Skip to content

Commit

Permalink
Sync from server repo (cbf78ae4e5a)
Browse files Browse the repository at this point in the history
  • Loading branch information
releng committed Nov 21, 2024
1 parent cf50448 commit caf19d2
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 21 deletions.
1 change: 1 addition & 0 deletions vclusterops/cluster_op_engine_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type opEngineExecContext struct {
networkProfiles map[string]networkProfile
nmaVDatabase nmaVDatabase
upHosts []string // a sorted host list that contains all up nodes
computeHosts []string // a sorted host list that contains all up (COMPUTE) compute nodes
nodesInfo []NodeInfo
scNodesInfo []NodeInfo // a node list contains all nodes in a subcluster

Expand Down
10 changes: 9 additions & 1 deletion vclusterops/coordinator_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,16 @@ func (vdb *VCoordinationDatabase) addNode(vnode *VCoordinationNode) error {
// in all clusters (main and sandboxes)
func (vdb *VCoordinationDatabase) addHosts(hosts []string, scName string,
existingHostNodeMap vHostNodeMap) error {
totalHostCount := len(hosts) + len(existingHostNodeMap)
totalHostCount := len(hosts) + len(existingHostNodeMap) + len(vdb.UnboundNodes)
nodeNameToHost := genNodeNameToHostMap(existingHostNodeMap)
// The GenVNodeName(...) function below will generate node names based on nodeNameToHost and totalHostCount.
// If a name already exists, it won't be re-generated.
// In this case, we need to add unbound node names into this map too.
// Otherwise, the new nodes will reuse the existing unbound node names, then make a clash later on.
for _, vnode := range vdb.UnboundNodes {
nodeNameToHost[vnode.Name] = vnode.Address
}

for _, host := range hosts {
vNode := makeVCoordinationNode()
name, ok := util.GenVNodeName(nodeNameToHost, vdb.Name, totalHostCount)
Expand Down
3 changes: 3 additions & 0 deletions vclusterops/fetch_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (vcc VClusterCommands) VFetchCoordinationDatabase(options *VFetchCoordinati
}

for h, n := range nmaVDB.HostNodeMap {
if h == util.UnboundedIPv4 || h == util.UnboundedIPv6 {
continue
}
vnode, ok := vdb.HostNodeMap[h]
if !ok {
return vdb, fmt.Errorf("host %s is not found in the vdb object", h)
Expand Down
6 changes: 6 additions & 0 deletions vclusterops/https_check_subcluster_sandbox_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package vclusterops
import (
"errors"
"fmt"

"github.com/vertica/vcluster/vclusterops/util"
)

type httpsCheckSubclusterSandboxOp struct {
Expand Down Expand Up @@ -60,6 +62,10 @@ func (op *httpsCheckSubclusterSandboxOp) setupClusterHTTPRequest(hosts []string)
}

func (op *httpsCheckSubclusterSandboxOp) prepare(execContext *opEngineExecContext) error {
if execContext.computeHosts != nil {
op.hosts = util.SliceDiff(op.hosts, execContext.computeHosts)
}

execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(op.hosts)
Expand Down
30 changes: 20 additions & 10 deletions vclusterops/https_get_up_nodes_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (op *httpsGetUpNodesOp) execute(execContext *opEngineExecContext) error {
func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) error {
var allErrs error
upHosts := mapset.NewSet[string]()
computeHosts := mapset.NewSet[string]()
upScInfo := make(map[string]string)
exceptionHosts := []string{}
downHosts := []string{}
Expand All @@ -148,8 +149,9 @@ func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) err
op.logResponse(host, result)
if !result.isPassing() {
allErrs = errors.Join(allErrs, result.err)
if result.isUnauthorizedRequest() || result.isInternalError() {
// Authentication error and any unexpected internal server error
if result.isUnauthorizedRequest() || result.isInternalError() || result.hasPreconditionFailed() {
// Authentication error and any unexpected internal server error, plus compute nodes or nodes
// that haven't joined the cluster yet
exceptionHosts = append(exceptionHosts, host)
continue
}
Expand All @@ -167,16 +169,15 @@ func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) err
continue
}

if op.cmdType == StopDBCmd || op.cmdType == StopSubclusterCmd {
err = op.validateHosts(nodesStates)
if err != nil {
allErrs = errors.Join(allErrs, err)
break
}
// For certain commands, check hosts in input against those reported from endpoint
err = op.validateHosts(nodesStates)
if err != nil {
allErrs = errors.Join(allErrs, err)
break
}

// Collect all the up hosts
err = op.collectUpHosts(nodesStates, host, upHosts, upScInfo, sandboxInfo, upScNodes, scNodes)
err = op.collectUpHosts(nodesStates, host, upHosts, computeHosts, upScInfo, sandboxInfo, upScNodes, scNodes)
if err != nil {
allErrs = errors.Join(allErrs, err)
return allErrs
Expand All @@ -190,6 +191,7 @@ func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) err
break
}
}
execContext.computeHosts = computeHosts.ToSlice()
execContext.nodesInfo = upScNodes.ToSlice()
execContext.scNodesInfo = scNodes.ToSlice()
execContext.upHostsToSandboxes = sandboxInfo
Expand Down Expand Up @@ -275,6 +277,10 @@ func (op *httpsGetUpNodesOp) processHostLists(upHosts mapset.Set[string], upScIn

// validateHosts can validate if hosts in user input matches the ones in GET /nodes response
func (op *httpsGetUpNodesOp) validateHosts(nodesStates nodesStateInfo) error {
// only needed for the following commands
if !(op.cmdType == StopDBCmd || op.cmdType == StopSubclusterCmd) {
return nil
}
var dbHosts []string
dbUnexpected := false
unexpectedDBName := ""
Expand Down Expand Up @@ -310,7 +316,7 @@ func (op *httpsGetUpNodesOp) checkUpHostEligible(node *nodeStateInfo) bool {
return true
}

func (op *httpsGetUpNodesOp) collectUpHosts(nodesStates nodesStateInfo, host string, upHosts mapset.Set[string],
func (op *httpsGetUpNodesOp) collectUpHosts(nodesStates nodesStateInfo, host string, upHosts, computeHosts mapset.Set[string],
upScInfo, sandboxInfo map[string]string, upScNodes, scNodes mapset.Set[NodeInfo]) (err error) {
foundSC := false
for _, node := range nodesStates.NodeList {
Expand All @@ -333,6 +339,10 @@ func (op *httpsGetUpNodesOp) collectUpHosts(nodesStates nodesStateInfo, host str
}
}

if node.State == util.NodeComputeState {
computeHosts.Add(node.Address)
}

if op.scName == node.Subcluster {
op.sandbox = node.Sandbox
if node.IsPrimary {
Expand Down
6 changes: 4 additions & 2 deletions vclusterops/nma_download_file_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ type fileContent struct {
Path string `json:"path"`
Usage int `json:"usage"`
} `json:"StorageLocation"`
Sandbox string
}

func (op *nmaDownloadFileOp) processResult(execContext *opEngineExecContext) error {
Expand Down Expand Up @@ -278,7 +279,7 @@ func (op *nmaDownloadFileOp) processResult(execContext *opEngineExecContext) err
}

// save descFileContent in vdb
return op.buildVDBFromClusterConfig(descFileContent)
return op.buildVDBFromClusterConfig(&descFileContent)
}

httpsErr := errors.Join(fmt.Errorf("[%s] HTTPS call failed on host %s", op.name, host), result.err)
Expand All @@ -299,13 +300,14 @@ func filterPrimaryNodes(descFileContent *fileContent) {
}

// buildVDBFromClusterConfig can build a vdb using cluster_config.json
func (op *nmaDownloadFileOp) buildVDBFromClusterConfig(descFileContent fileContent) error {
func (op *nmaDownloadFileOp) buildVDBFromClusterConfig(descFileContent *fileContent) error {
op.vdb.HostNodeMap = makeVHostNodeMap()
for _, node := range descFileContent.NodeList {
vNode := makeVCoordinationNode()
vNode.Name = node.Name
vNode.Address = node.Address
vNode.IsPrimary = node.IsPrimary
vNode.Sandbox = descFileContent.Sandbox

// remove suffix "/Catalog" from node catalog path
// e.g. /data/test_db/v_test_db_node0002_catalog/Catalog -> /data/test_db/v_test_db_node0002_catalog
Expand Down
10 changes: 4 additions & 6 deletions vclusterops/start_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,12 @@ func (vcc VClusterCommands) VStartDatabase(options *VStartDatabaseOptions) (vdbP
// VER-93369 may improve this if the CLI knows which nodes are primary
// from the config file
var vdb VCoordinationDatabase
// retrieve database information from cluster_config.json for Eon databases,
// skip this step for starting a sandbox because cluster_config.json does not
// contain accurate info of nodes in a sandbox
if !options.HostsInSandbox && options.IsEon {
// retrieve database information from cluster_config.json for Eon databases
if options.IsEon {
const warningMsg = " for an Eon database, start_db after revive_db could fail " +
util.DBInfo
if options.CommunalStorageLocation != "" {
vdbNew, e := options.getVDBWhenDBIsDown(vcc)
vdbNew, e := options.getVDBFromSandboxWhenDBIsDown(vcc, options.Sandbox)
if e != nil {
// show a warning message if we cannot get VDB from a down database
vcc.Log.PrintWarning(util.CommStorageFail + warningMsg)
Expand Down Expand Up @@ -173,7 +171,7 @@ func (vcc VClusterCommands) VStartDatabase(options *VStartDatabaseOptions) (vdbP
clusterOpEngine := makeClusterOpEngine(instructions, options)

// Give the instructions to the VClusterOpEngine to run
runError := clusterOpEngine.run(vcc.Log)
runError := clusterOpEngine.runInSandbox(vcc.Log, &vdb, options.Sandbox)
if runError != nil {
return nil, fmt.Errorf("fail to start database: %w", runError)
}
Expand Down
11 changes: 9 additions & 2 deletions vclusterops/vcluster_database_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,15 @@ func (opt *DatabaseOptions) normalizePaths() {
opt.DepotPrefix = util.GetCleanPath(opt.DepotPrefix)
}

// getVDBWhenDBIsDown can retrieve db configurations from NMA /nodes endpoint and cluster_config.json when db is down
// getVDBWhenDBIsDown can retrieve db configurations from the NMA /nodes endpoint and cluster_config.json when db is down
func (opt *DatabaseOptions) getVDBWhenDBIsDown(vcc VClusterCommands) (vdb VCoordinationDatabase, err error) {
return opt.getVDBFromSandboxWhenDBIsDown(vcc, util.MainClusterSandbox)
}

// getVDBFromSandboxWhenDBIsDown can retrieve db configurations about a given sandbox
// from the NMA /nodes endpoint and cluster_config.json when db is down
func (opt *DatabaseOptions) getVDBFromSandboxWhenDBIsDown(vcc VClusterCommands,
sandbox string) (vdb VCoordinationDatabase, err error) {
/*
* 1. Get node names for input hosts from NMA /nodes.
* 2. Get other node information for input hosts from cluster_config.json.
Expand Down Expand Up @@ -324,7 +331,7 @@ func (opt *DatabaseOptions) getVDBWhenDBIsDown(vcc VClusterCommands) (vdb VCoord
// step 2: get node details from cluster_config.json
vdb2 := VCoordinationDatabase{}
var instructions2 []clusterOp
currConfigFileSrcPath := opt.getCurrConfigFilePath(util.MainClusterSandbox)
currConfigFileSrcPath := opt.getCurrConfigFilePath(sandbox)
nmaDownLoadFileOp, err := makeNMADownloadFileOp(opt.Hosts, currConfigFileSrcPath, currConfigFileDestPath, catalogPath,
opt.ConfigurationParameters, &vdb2)
if err != nil {
Expand Down

0 comments on commit caf19d2

Please sign in to comment.