Skip to content

Commit

Permalink
Sync from server repo (d78ecff9c2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Spilchen committed Sep 28, 2023
1 parent f79171f commit 06edc0a
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 126 deletions.
25 changes: 20 additions & 5 deletions commands/cmd_start_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ type CmdStartDB struct {
CmdBase
startDBOptions *vclusterops.VStartDatabaseOptions

Force *bool // force cleanup to start the database
AllowFallbackKeygen *bool // Generate spread encryption key from Vertica. Use under support guidance only
IgnoreClusterLease *bool // ignore the cluster lease in communal storage
Unsafe *bool // Start database unsafely, skipping recovery.
Fast *bool // Attempt fast startup database
Force *bool // force cleanup to start the database
AllowFallbackKeygen *bool // Generate spread encryption key from Vertica. Use under support guidance only
IgnoreClusterLease *bool // ignore the cluster lease in communal storage
Unsafe *bool // Start database unsafely, skipping recovery.
Fast *bool // Attempt fast startup database
communalStorageParams *string // raw input from user, need further processing
}

func makeCmdStartDB() *CmdStartDB {
Expand Down Expand Up @@ -54,6 +55,10 @@ func makeCmdStartDB() *CmdStartDB {
// eon flags
newCmd.isEon = newCmd.parser.Bool("eon-mode", false, util.GetEonFlagMsg("Indicate if the database is an Eon database."+
" Use it when you do not trust "+vclusterops.ConfigFileName))
startDBOptions.CommunalStorageLocation = newCmd.parser.String("communal-storage-location", "",
util.GetEonFlagMsg("Location of communal storage"))
newCmd.communalStorageParams = newCmd.parser.String("communal-storage-params", "", util.GetOptionalFlagMsg(
"Comma-separated list of NAME=VALUE pairs for communal storage parameters"))

// hidden options
// TODO: the following options will be processed later
Expand Down Expand Up @@ -105,6 +110,16 @@ func (c *CmdStartDB) Parse(inputArgv []string) error {

func (c *CmdStartDB) validateParse() error {
vlog.LogInfo("[%s] Called validateParse()", c.CommandType())

// check the format of communal storage params string, and parse it into configParams
communalStorageParams, err := util.ParseConfigParams(*c.communalStorageParams)
if err != nil {
return err
}
if communalStorageParams != nil {
c.startDBOptions.CommunalStorageParameters = communalStorageParams
}

return c.ValidateParseBaseOptions(&c.startDBOptions.DatabaseOptions)
}

Expand Down
15 changes: 15 additions & 0 deletions vclusterops/coordinator_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

"github.com/vertica/vcluster/vclusterops/util"
"golang.org/x/exp/maps"
)

/* VCoordinationDatabase contains a copy of some of the CAT::Database
Expand Down Expand Up @@ -312,6 +313,20 @@ func (vdb *VCoordinationDatabase) GetAwsCredentialsFromEnv() error {
return nil
}

// filterPrimaryNodes will remove secondary nodes from vdb
func (vdb *VCoordinationDatabase) filterPrimaryNodes() {
primaryHostNodeMap := makeVHostNodeMap()

for h, vnode := range vdb.HostNodeMap {
if vnode.IsPrimary {
primaryHostNodeMap[h] = vnode
}
}
vdb.HostNodeMap = primaryHostNodeMap

vdb.HostList = maps.Keys(vdb.HostNodeMap)
}

/* VCoordinationNode contains a copy of the some of CAT::Node information
* from the database catalog (visible in the vs_nodes table). It is similar
* to the admintools VNode object.
Expand Down
2 changes: 0 additions & 2 deletions vclusterops/create_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type VCreateDatabaseOptions struct {
LicensePathOnNode *string // required to be a fully qualified path
// part 2: eon db info
ShardCount *int
CommunalStorageLocation *string
CommunalStorageParamsPath *string
DepotSize *string // like 10G
GetAwsCredentialsFromEnv *bool
Expand Down Expand Up @@ -81,7 +80,6 @@ func (opt *VCreateDatabaseOptions) SetDefaultValues() {

// eon db Info
opt.ShardCount = new(int)
opt.CommunalStorageLocation = new(string)
opt.CommunalStorageParamsPath = new(string)
opt.DepotSize = new(string)
opt.GetAwsCredentialsFromEnv = new(bool)
Expand Down
126 changes: 73 additions & 53 deletions vclusterops/nma_download_file_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type NMADownloadFileOp struct {
newNodes []string
displayOnly bool
ignoreClusterLease bool
forRevive bool
}

type downloadFileRequestData struct {
Expand Down Expand Up @@ -83,15 +84,13 @@ func (e *ReviveDBNodeCountMismatchError) Error() string {
}

func makeNMADownloadFileOp(newNodes []string, sourceFilePath, destinationFilePath, catalogPath string,
communalStorageParameters map[string]string, vdb *VCoordinationDatabase, displayOnly, ignoreClusterLease bool) (NMADownloadFileOp, error) {
communalStorageParameters map[string]string, vdb *VCoordinationDatabase) (NMADownloadFileOp, error) {
op := NMADownloadFileOp{}
op.name = "NMADownloadFileOp"
initiator := getInitiator(newNodes)
op.hosts = []string{initiator}
op.vdb = vdb
op.newNodes = newNodes
op.displayOnly = displayOnly
op.ignoreClusterLease = ignoreClusterLease

// make https json data
op.hostRequestBodyMap = make(map[string]string)
Expand All @@ -113,6 +112,20 @@ func makeNMADownloadFileOp(newNodes []string, sourceFilePath, destinationFilePat
return op, nil
}

func makeNMADownloadFileOpForRevive(newNodes []string, sourceFilePath, destinationFilePath, catalogPath string,
communalStorageParameters map[string]string, vdb *VCoordinationDatabase, displayOnly, ignoreClusterLease bool) (NMADownloadFileOp, error) {
op, err := makeNMADownloadFileOp(newNodes, sourceFilePath, destinationFilePath,
catalogPath, communalStorageParameters, vdb)
if err != nil {
return op, err
}
op.displayOnly = displayOnly
op.ignoreClusterLease = ignoreClusterLease
op.forRevive = true

return op, nil
}

func (op *NMADownloadFileOp) setupClusterHTTPRequest(hosts []string) error {
op.clusterHTTPRequest = ClusterHTTPRequest{}
op.clusterHTTPRequest.RequestCollection = make(map[string]HostHTTPRequest)
Expand Down Expand Up @@ -190,7 +203,7 @@ func (op *NMADownloadFileOp) processResult(execContext *OpEngineExecContext) err
}

// for --display-only, we only need the file content
if op.displayOnly {
if op.displayOnly && op.forRevive {
execContext.dbInfo = response.FileContent
return nil
}
Expand All @@ -203,60 +216,27 @@ func (op *NMADownloadFileOp) processResult(execContext *OpEngineExecContext) err
break
}

err = op.clusterLeaseCheck(descFileContent.ClusterLeaseExpiration)
if err != nil {
allErrs = errors.Join(allErrs, err)
break
}

if len(descFileContent.NodeList) != len(op.newNodes) {
err := &ReviveDBNodeCountMismatchError{
ReviveDBStep: op.name,
FailureHost: host,
NumOfNewNodes: len(op.newNodes),
NumOfOldNodes: len(descFileContent.NodeList),
}
allErrs = errors.Join(allErrs, err)
break
}

// save descFileContent in vdb
op.vdb.HostNodeMap = makeVHostNodeMap()
for _, node := range descFileContent.NodeList {
op.vdb.HostList = append(op.vdb.HostList, node.Address)
vNode := MakeVCoordinationNode()
vNode.Name = node.Name
vNode.Address = node.Address
vNode.IsPrimary = node.IsPrimary

// remove suffix "/Catalog" from node catalog path
// e.g. /data/test_db/v_test_db_node0002_catalog/Catalog -> /data/test_db/v_test_db_node0002_catalog
if filepath.Base(node.CatalogPath) == catalogSuffix {
vNode.CatalogPath = filepath.Dir(node.CatalogPath)
} else {
vNode.CatalogPath = node.CatalogPath
if op.forRevive {
err = op.clusterLeaseCheck(descFileContent.ClusterLeaseExpiration)
if err != nil {
allErrs = errors.Join(allErrs, err)
break
}

for _, storage := range descFileContent.StorageLocations {
// when storage name contains the node name, we know this storage is for that node
// an example of storage name: "__location_1_v_test_db_node0001"
// this will filter out communal storage location
if strings.Contains(storage.Name, node.Name) {
// we separate depot path and other storage locations
if storage.Usage == depotStorageType {
vNode.DepotPath = storage.Path
} else {
vNode.StorageLocations = append(vNode.StorageLocations, storage.Path)
// we store the user storage location for later prepare directory use
if storage.Usage == userStorageType {
vNode.UserStorageLocations = append(vNode.UserStorageLocations, storage.Path)
}
}
if len(descFileContent.NodeList) != len(op.newNodes) {
err := &ReviveDBNodeCountMismatchError{
ReviveDBStep: op.name,
FailureHost: host,
NumOfNewNodes: len(op.newNodes),
NumOfOldNodes: len(descFileContent.NodeList),
}
allErrs = errors.Join(allErrs, err)
break
}

op.vdb.HostNodeMap[node.Address] = &vNode
}

// save descFileContent in vdb
op.buildVDBFromClusterConfig(descFileContent)
return nil
}

Expand All @@ -267,6 +247,46 @@ func (op *NMADownloadFileOp) processResult(execContext *OpEngineExecContext) err
return appendHTTPSFailureError(allErrs)
}

// buildVDBFromClusterConfig can build a vdb using cluster_config.json
func (op *NMADownloadFileOp) buildVDBFromClusterConfig(descFileContent fileContent) {
op.vdb.HostNodeMap = makeVHostNodeMap()
for _, node := range descFileContent.NodeList {
op.vdb.HostList = append(op.vdb.HostList, node.Address)
vNode := MakeVCoordinationNode()
vNode.Name = node.Name
vNode.Address = node.Address
vNode.IsPrimary = node.IsPrimary

// remove suffix "/Catalog" from node catalog path
// e.g. /data/test_db/v_test_db_node0002_catalog/Catalog -> /data/test_db/v_test_db_node0002_catalog
if filepath.Base(node.CatalogPath) == catalogSuffix {
vNode.CatalogPath = filepath.Dir(node.CatalogPath)
} else {
vNode.CatalogPath = node.CatalogPath
}

for _, storage := range descFileContent.StorageLocations {
// when storage name contains the node name, we know this storage is for that node
// an example of storage name: "__location_1_v_test_db_node0001"
// this will filter out communal storage location
if strings.Contains(storage.Name, node.Name) {
// we separate depot path and other storage locations
if storage.Usage == depotStorageType {
vNode.DepotPath = storage.Path
} else {
vNode.StorageLocations = append(vNode.StorageLocations, storage.Path)
// we store the user storage location for later prepare directory use
if storage.Usage == userStorageType {
vNode.UserStorageLocations = append(vNode.UserStorageLocations, storage.Path)
}
}
}
}

op.vdb.HostNodeMap[node.Address] = &vNode
}
}

func (op *NMADownloadFileOp) clusterLeaseCheck(clusterLeaseExpiration string) error {
if op.ignoreClusterLease {
vlog.LogPrintWarningln("Skipping cluster lease check")
Expand Down
73 changes: 55 additions & 18 deletions vclusterops/re_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (opt *VReIPOptions) validateParseOptions() error {
return err
}

if *opt.CommunalStorageLocation != "" {
return util.ValidateCommunalStorageLocation(*opt.CommunalStorageLocation)
}

return opt.ValidateBaseOptions("re_ip")
}

Expand Down Expand Up @@ -116,8 +120,26 @@ func (vcc *VClusterCommands) VReIP(options *VReIPOptions) error {
return err
}

var pVDB *VCoordinationDatabase
// retrieve database information from cluster_config.json for EON databases
if options.IsEon.ToBool() {
if *options.CommunalStorageLocation != "" {
vdb, e := options.getVDBWhenDBIsDown()
if e != nil {
return e
}
pVDB = &vdb
} else {
// When communal storage location is missing, we only log a debug message
// because re-ip only fails in between revive_db and first start_db.
// We should not ran re-ip in that case because revive_db has already done the re-ip work.
vlog.LogDebug("communal storage location is not specified for an eon database," +
" re_ip after revive_db could fail because we cannot retrieve the correct database information")
}
}

// produce re-ip instructions
instructions, err := vcc.produceReIPInstructions(options)
instructions, err := vcc.produceReIPInstructions(options, pVDB)
if err != nil {
vlog.LogPrintError("fail to produce instructions, %v", err)
return err
Expand All @@ -144,7 +166,7 @@ func (vcc *VClusterCommands) VReIP(options *VReIPOptions) error {
// - Read database info from catalog editor
// (now we should know which hosts have the latest catalog)
// - Run re-ip on the target nodes
func (vcc *VClusterCommands) produceReIPInstructions(options *VReIPOptions) ([]ClusterOp, error) {
func (vcc *VClusterCommands) produceReIPInstructions(options *VReIPOptions, vdb *VCoordinationDatabase) ([]ClusterOp, error) {
var instructions []ClusterOp

if len(options.ReIPList) == 0 {
Expand All @@ -163,29 +185,44 @@ func (vcc *VClusterCommands) produceReIPInstructions(options *VReIPOptions) ([]C
}
nmaNetworkProfileOp := makeNMANetworkProfileOp(newAddresses)

// build a VCoordinationDatabase (vdb) object by reading the nodes' information from /v1/nodes
vdb := VCoordinationDatabase{}
nmaGetNodesInfoOp := makeNMAGetNodesInfoOp(hosts, *options.DBName, *options.CatalogPrefix, &vdb)
instructions = append(instructions,
&nmaHealthOp,
&nmaVerticaVersionOp,
&nmaNetworkProfileOp,
)

// read catalog editor to get hosts with latest catalog
nmaReadCatEdOp, err := makeNMAReadCatalogEditorOp(&vdb)
if err != nil {
return instructions, err
vdbWithPrimaryNodes := new(VCoordinationDatabase)
// When we cannot get db info from cluster_config.json, we will fetch it from NMA /nodes endpoint.
if vdb == nil {
vdb = new(VCoordinationDatabase)
nmaGetNodesInfoOp := makeNMAGetNodesInfoOp(options.Hosts, *options.DBName, *options.CatalogPrefix, vdb)
// read catalog editor to get hosts with latest catalog
nmaReadCatEdOp, err := makeNMAReadCatalogEditorOp(vdb)
if err != nil {
return instructions, err
}
instructions = append(instructions,
&nmaGetNodesInfoOp,
&nmaReadCatEdOp,
)
} else {
// use a copy of vdb because we want to keep secondary nodes in vdb for next nmaReIPOP
*vdbWithPrimaryNodes = *vdb
vdbWithPrimaryNodes.filterPrimaryNodes()
// read catalog editor to get hosts with latest catalog
nmaReadCatEdOp, err := makeNMAReadCatalogEditorOp(vdbWithPrimaryNodes)
if err != nil {
return instructions, err
}
instructions = append(instructions, &nmaReadCatEdOp)
}

// re-ip
// at this stage the re-ip info should either by provided by
// the re-ip file (for vcluster CLI) or the Kubernetes operator
nmaReIPOP := makeNMAReIPOp(options.ReIPList, &vdb)
nmaReIPOP := makeNMAReIPOp(options.ReIPList, vdb)

instructions = append(instructions,
&nmaHealthOp,
&nmaVerticaVersionOp,
&nmaNetworkProfileOp,
&nmaGetNodesInfoOp,
&nmaReadCatEdOp,
&nmaReIPOP,
)
instructions = append(instructions, &nmaReIPOP)

return instructions, nil
}
Expand Down
Loading

0 comments on commit 06edc0a

Please sign in to comment.