Skip to content

Commit

Permalink
Sync from server repo (29f7f85ff46)
Browse files Browse the repository at this point in the history
  • Loading branch information
releng committed Oct 22, 2024
1 parent 89b25bf commit 0b19368
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 29 deletions.
57 changes: 51 additions & 6 deletions commands/cmd_revive_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package commands

import (
"fmt"
"strconv"

"github.com/spf13/cobra"
Expand All @@ -42,8 +43,10 @@ func makeCmdReviveDB() *cobra.Command {
cmd := makeBasicCobraCmd(
newCmd,
reviveDBSubCmd,
"Revive or restores an Eon Mod database.",
`Revives or restores an Eon Mode database. You cannot revive sandboxes with this command.
"Revive or restores an Eon Mode database.",
`Revives or restores an Eon Mode database. In a cluster with sandboxes, the database can be revived
to main cluster by default or by using the arg --main-cluster-only. arg --sandbox <sandboxname> can
be used to revive database to given sandbox.
If access to communal storage requires access keys, you must provide the keys with the --config-param option.
Expand Down Expand Up @@ -128,6 +131,18 @@ func (c *CmdReviveDB) setLocalFlags(cmd *cobra.Command) {
"",
"The identifier of the restore point in the restore archive.",
)
cmd.Flags().StringVar(
&c.reviveDBOptions.Sandbox,
sandboxFlag,
"",
"Name of the sandbox to revive",
)
cmd.Flags().BoolVar(
&c.reviveDBOptions.MainCluster,
"main-cluster-only",
false,
"Revive the database on main cluster, but do not touch any of the sandboxes",
)
// only one of restore-point-index or restore-point-id will be required
cmd.MarkFlagsMutuallyExclusive("restore-point-index", "restore-point-id")
}
Expand Down Expand Up @@ -184,15 +199,45 @@ func (c *CmdReviveDB) Run(vcc vclusterops.ClusterCommands) error {

// write db info to vcluster config file
vdb.FirstStartAfterRevive = true
err = writeConfig(vdb, true /*forceOverwrite*/)
if err != nil {
vcc.DisplayWarning("Failed to write the configuration file: %s", err)

// Read the config file
dbConfig := MakeDatabaseConfig()
dbConfigPtr, configErr := readConfig()
if configErr != nil {
// config file does not exist, neither main cluster nor sandbox has been revived yet.
// overwrite the config file.
err = c.overwriteConfig(vdb)
if err != nil {
vcc.DisplayWarning(err.Error())
return nil
}
} else {
// config file already exists. This could happen if we have already partially revived the db (sandbox or main cluster).
// In this case, we update the existing config file instead of overwriting it.
dbConfig = *dbConfigPtr
updateConfig(vdb, &dbConfig)
writeErr := dbConfig.write(c.reviveDBOptions.ConfigPath, true /*forceOverwrite*/)
if writeErr != nil {
vcc.DisplayWarning("Fail to update config file: %s", writeErr)
return nil
}
err = c.writeConfigParam(c.reviveDBOptions.ConfigurationParameters, true /*forceOverwrite*/)
if err != nil {
vcc.DisplayWarning("Failed to write the configuration parameter file: %s", err)
}
}
return nil
}

// overwriteConfig writes the vcluster config file and the config parameter
// file from vdb, force-overwriting any existing files. It is used on revive
// when no config file exists yet (neither the main cluster nor a sandbox has
// been revived so far).
func (c *CmdReviveDB) overwriteConfig(vdb *vclusterops.VCoordinationDatabase) error {
	err := writeConfig(vdb, true /*forceOverwrite*/)
	if err != nil {
		return fmt.Errorf("failed to write the configuration file: %w", err)
	}
	// write config parameters to vcluster config param file
	err = c.writeConfigParam(c.reviveDBOptions.ConfigurationParameters, true /*forceOverwrite*/)
	if err != nil {
		return fmt.Errorf("failed to write the configuration parameter file: %w", err)
	}
	return nil
}
Expand Down
38 changes: 38 additions & 0 deletions commands/vcluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"os"
"path/filepath"
"sort"

"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -274,6 +275,43 @@ func buildNodeConfig(vnode *vclusterops.VCoordinationNode,
return nodeConfig
}

// updateNodeConfig refreshes the fields of an existing node config entry n
// from the corresponding vnode in vdb. When the database has no data prefix,
// the node's first storage location backfills an empty data path.
func updateNodeConfig(vnode *vclusterops.VCoordinationNode,
	vdb *vclusterops.VCoordinationDatabase, n *NodeConfig) {
	n.Address = vnode.Address
	n.Subcluster = vnode.Subcluster
	n.Sandbox = vnode.Sandbox
	n.CatalogPath = vnode.CatalogPath
	n.DepotPath = vnode.DepotPath
	// only fill in the data path when it is not already set and the db gives
	// us no data prefix to derive it from
	needDataPath := vdb.DataPrefix == "" && n.DataPath == ""
	if needDataPath && len(vnode.StorageLocations) > 0 {
		n.DataPath = vnode.StorageLocations[0]
	}
}

// updateConfig merges node information from vdb into dbConfig: nodes already
// present in the config are refreshed in place, unknown nodes are appended,
// and the resulting node list is sorted by name.
func updateConfig(vdb *vclusterops.VCoordinationDatabase, dbConfig *DatabaseConfig) {
	existing := make(map[string]*NodeConfig, len(dbConfig.Nodes))
	for _, nodeCfg := range dbConfig.Nodes {
		existing[nodeCfg.Name] = nodeCfg
	}

	var added []*NodeConfig
	for _, vnode := range vdb.HostNodeMap {
		if nodeCfg, found := existing[vnode.Name]; found {
			// refresh the entry we already track for this node
			updateNodeConfig(vnode, vdb, nodeCfg)
			continue
		}
		// first time we see this node: build a fresh entry for it
		fresh := buildNodeConfig(vnode, vdb)
		added = append(added, &fresh)
	}
	dbConfig.Nodes = append(dbConfig.Nodes, added...)

	// keep the node list in a deterministic order
	sort.Slice(dbConfig.Nodes, func(i, j int) bool {
		return dbConfig.Nodes[i].Name < dbConfig.Nodes[j].Name
	})
}

// read reads information from configFilePath to a DatabaseConfig object.
// It returns any read error encountered.
func readConfig() (dbConfig *DatabaseConfig, err error) {
Expand Down
16 changes: 15 additions & 1 deletion vclusterops/nma_load_remote_catalog_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"encoding/json"
"errors"
"fmt"

"github.com/vertica/vcluster/vclusterops/util"
)

type nmaLoadRemoteCatalogOp struct {
Expand All @@ -30,6 +32,7 @@ type nmaLoadRemoteCatalogOp struct {
timeout uint
primaryNodeCount uint
restorePoint *RestorePointPolicy
Sandbox string
}

type loadRemoteCatalogRequestData struct {
Expand All @@ -46,6 +49,7 @@ type loadRemoteCatalogRequestData struct {
RestorePointArchive string `json:"restore_point_archive,omitempty"`
RestorePointIndex int `json:"restore_point_index,omitempty"`
RestorePointID string `json:"restore_point_id,omitempty"`
Sandbox string `json:"sandbox,omitempty"`
}

func makeNMALoadRemoteCatalogOp(oldHosts []string, configurationParameters map[string]string,
Expand All @@ -57,7 +61,7 @@ func makeNMALoadRemoteCatalogOp(oldHosts []string, configurationParameters map[s
op.oldHosts = oldHosts
op.configurationParameters = configurationParameters
op.vdb = vdb
op.timeout = timeout
op.timeout = timeout // timeout 0 indicates wait forever
op.restorePoint = restorePoint

op.primaryNodeCount = 0
Expand All @@ -70,6 +74,13 @@ func makeNMALoadRemoteCatalogOp(oldHosts []string, configurationParameters map[s
return op
}

// makeNMALoadRemoteCatalogWithSandboxOp builds the same op as
// makeNMALoadRemoteCatalogOp, additionally recording the sandbox whose
// catalog should be loaded (used when reviving a database to a sandbox).
func makeNMALoadRemoteCatalogWithSandboxOp(oldHosts []string, configurationParameters map[string]string,
	vdb *VCoordinationDatabase, timeout uint, restorePoint *RestorePointPolicy, sandbox string) nmaLoadRemoteCatalogOp {
	baseOp := makeNMALoadRemoteCatalogOp(oldHosts, configurationParameters, vdb, timeout, restorePoint)
	baseOp.Sandbox = sandbox
	return baseOp
}

// make https json data
func (op *nmaLoadRemoteCatalogOp) setupRequestBody(execContext *opEngineExecContext) error {
if len(execContext.networkProfiles) != len(op.hosts) {
Expand Down Expand Up @@ -106,6 +117,9 @@ func (op *nmaLoadRemoteCatalogOp) setupRequestBody(execContext *opEngineExecCont
requestData.RestorePointIndex = op.restorePoint.Index
requestData.RestorePointID = op.restorePoint.ID
}
if op.Sandbox != util.MainClusterSandbox {
requestData.Sandbox = op.Sandbox
}

dataBytes, err := json.Marshal(requestData)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions vclusterops/nma_read_catalog_editor_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

"github.com/vertica/vcluster/rfc7807"
"github.com/vertica/vcluster/vclusterops/util"
"golang.org/x/exp/maps"
)

Expand All @@ -38,6 +39,7 @@ type nmaReadCatalogEditorOp struct {
hostsWithLatestCatalog []string
latestNmaVDB nmaVDatabase
bestHost string
sandbox string
}

// makeNMAReadCatalogEditorOpWithInitiator creates an op to read catalog editor info.
Expand All @@ -61,12 +63,13 @@ func makeNMAReadCatalogEditorOp(vdb *VCoordinationDatabase) (nmaReadCatalogEdito

func makeNMAReadCatalogEditorOpForStartDB(
vdb *VCoordinationDatabase,
firstStartAfterRevive bool) (nmaReadCatalogEditorOp, error) {
firstStartAfterRevive bool,
sandbox string) (nmaReadCatalogEditorOp, error) {
op, err := makeNMAReadCatalogEditorOpWithInitiator([]string{}, vdb)
if err != nil {
return op, err
}

op.sandbox = sandbox
op.firstStartAfterRevive = firstStartAfterRevive
return op, err
}
Expand Down Expand Up @@ -273,5 +276,10 @@ func (op *nmaReadCatalogEditorOp) finalize(execContext *opEngineExecContext) err
// save the latest nmaVDB to execContext
execContext.nmaVDatabase = op.latestNmaVDB
op.logger.PrintInfo("reporting results as obtained from the host [%s] ", op.bestHost)
// when starting sandboxes, we just need one passing result from a primary node
// and we return successfully once we have it.
if op.sandbox != util.MainClusterSandbox {
return nil
}
return op.allErrs
}
34 changes: 28 additions & 6 deletions vclusterops/revive_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type VReviveDatabaseOptions struct {
IgnoreClusterLease bool
// the restore policy
RestorePoint RestorePointPolicy
// Name of sandbox to revive
Sandbox string
// Revive db on main cluster only
MainCluster bool
}

type RestorePointPolicy struct {
Expand Down Expand Up @@ -260,7 +264,15 @@ func (vcc VClusterCommands) VReviveDatabase(options *VReviveDatabaseOptions) (db
if err != nil {
return dbInfo, &vdb, fmt.Errorf("fail to revive database %w", err)
}

nmaVDB := clusterOpEngine.execContext.nmaVDatabase
for h, vnode := range nmaVDB.HostNodeMap {
_, ok := vdb.HostNodeMap[h]
if !ok {
continue
}
vdb.HostNodeMap[h].Subcluster = vnode.Subcluster.Name
vdb.HostNodeMap[h].Sandbox = vnode.Subcluster.SandboxName
}
// fill vdb with VReviveDatabaseOptions information
vdb.Name = options.DBName
vdb.IsEon = true
Expand Down Expand Up @@ -299,7 +311,8 @@ func (vcc VClusterCommands) producePreReviveDBInstructions(options *VReviveDatab
)

// use current description file path as source file path
currConfigFileSrcPath := options.getCurrConfigFilePath()
currConfigFileSrcPath := ""
currConfigFileSrcPath = options.getCurrConfigFilePath(options.Sandbox)

if !options.isRestoreEnabled() {
// perform revive, either display-only or not
Expand Down Expand Up @@ -384,14 +397,18 @@ func (vcc VClusterCommands) produceReviveDBInstructions(options *VReviveDatabase
if err != nil {
return instructions, err
}

initiator := []string{}
// create a new HostNodeMap to prepare directories
hostNodeMap := makeVHostNodeMap()
// remove user storage locations from storage locations in every node
// user storage location will not be force deleted,
// and fail to create user storage location will not cause a failure of NMA /directories/prepare call.
// as a result, we separate user storage locations with other storage locations
for host, vnode := range newVDB.HostNodeMap {
if vnode.IsPrimary {
// whether reviving to main cluster or sandbox, the host node map would always have relevant cluster nodes
initiator = append(initiator, host)
}
userLocationSet := make(map[string]struct{})
for _, userLocation := range vnode.UserStorageLocations {
userLocationSet[userLocation] = struct{}{}
Expand All @@ -405,21 +422,26 @@ func (vcc VClusterCommands) produceReviveDBInstructions(options *VReviveDatabase
vnode.StorageLocations = newLocations
hostNodeMap[host] = vnode
}

// prepare all directories
nmaPrepareDirectoriesOp, err := makeNMAPrepareDirectoriesOp(hostNodeMap, options.ForceRemoval, true /*for db revive*/)
if err != nil {
return instructions, err
}

nmaNetworkProfileOp := makeNMANetworkProfileOp(options.Hosts)

nmaLoadRemoteCatalogOp := makeNMALoadRemoteCatalogOp(oldHosts, options.ConfigurationParameters,
&newVDB, options.LoadCatalogTimeout, &options.RestorePoint)
nmaLoadRemoteCatalogOp := makeNMALoadRemoteCatalogWithSandboxOp(oldHosts, options.ConfigurationParameters,
&newVDB, options.LoadCatalogTimeout, &options.RestorePoint, options.Sandbox)
nmaReadCatEdOp, err := makeNMAReadCatalogEditorOpWithInitiator(initiator, &newVDB)
if err != nil {
return instructions, err
}

instructions = append(instructions,
&nmaPrepareDirectoriesOp,
&nmaNetworkProfileOp,
&nmaLoadRemoteCatalogOp,
&nmaReadCatEdOp,
)

return instructions, nil
Expand Down
4 changes: 2 additions & 2 deletions vclusterops/start_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (vcc VClusterCommands) produceStartDBPreCheck(options *VStartDatabaseOption

// find latest catalog to use for removal of nodes not in the catalog
if trimHostList {
nmaReadCatalogEditorOp, err := makeNMAReadCatalogEditorOpForStartDB(vdb, options.FirstStartAfterRevive)
nmaReadCatalogEditorOp, err := makeNMAReadCatalogEditorOpForStartDB(vdb, options.FirstStartAfterRevive, options.Sandbox)
if err != nil {
return instructions, err
}
Expand All @@ -314,7 +314,7 @@ func (vcc VClusterCommands) produceStartDBInstructions(options *VStartDatabaseOp
var instructions []clusterOp

// vdb here should contain only primary nodes
nmaReadCatalogEditorOp, err := makeNMAReadCatalogEditorOpForStartDB(vdb, options.FirstStartAfterRevive)
nmaReadCatalogEditorOp, err := makeNMAReadCatalogEditorOpForStartDB(vdb, options.FirstStartAfterRevive, options.Sandbox)
if err != nil {
return instructions, err
}
Expand Down
19 changes: 14 additions & 5 deletions vclusterops/vcluster_database_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (opt *DatabaseOptions) getVDBWhenDBIsDown(vcc VClusterCommands) (vdb VCoord
// step 2: get node details from cluster_config.json
vdb2 := VCoordinationDatabase{}
var instructions2 []clusterOp
currConfigFileSrcPath := opt.getCurrConfigFilePath()
currConfigFileSrcPath := opt.getCurrConfigFilePath(util.MainClusterSandbox)
nmaDownLoadFileOp, err := makeNMADownloadFileOp(opt.Hosts, currConfigFileSrcPath, currConfigFileDestPath, catalogPath,
opt.ConfigurationParameters, &vdb2)
if err != nil {
Expand Down Expand Up @@ -365,11 +365,20 @@ func (opt *DatabaseOptions) getVDBWhenDBIsDown(vcc VClusterCommands) (vdb VCoord
return vdb, nil
}

// getCurrConfigFilePath can make the current description file path using db name and communal storage location in the options
func (opt *DatabaseOptions) getCurrConfigFilePath() string {
// description file will be in the location: {communal_storage_location}/metadata/{db_name}/cluster_config.json
// getCurrConfigFilePath can make the current description file path using the database (or sandbox) name and
// communal storage location in the options
func (opt *DatabaseOptions) getCurrConfigFilePath(sandbox string) string {
descriptor := opt.DBName
if sandbox != util.MainClusterSandbox {
descriptor = sandbox
}
// For main cluster or a cluster without sandboxes, description file will be in the location:
// {communal_storage_location}/metadata/{db_name}/cluster_config.json
// an example: s3://tfminio/test_loc/metadata/test_db/cluster_config.json
descriptionFilePath := filepath.Join(opt.CommunalStorageLocation, descriptionFileMetadataFolder, opt.DBName, descriptionFileName)
// For sandboxes, description file will be in the location:
// {communal_storage_location}/metadata/{sandbox_name}/cluster_config.json
// an example: s3://tfminio/test_loc/metadata/sand/cluster_config.json
descriptionFilePath := filepath.Join(opt.CommunalStorageLocation, descriptionFileMetadataFolder, descriptor, descriptionFileName)
// filepath.Join() will change "://" of the remote communal storage path to ":/"
// as a result, we need to change the separator back to url format
descriptionFilePath = strings.Replace(descriptionFilePath, ":/", "://", 1)
Expand Down
Loading

0 comments on commit 0b19368

Please sign in to comment.