diff --git a/commands/cluster_command_launcher.go b/commands/cluster_command_launcher.go index 35b0b10..7fe5cf4 100644 --- a/commands/cluster_command_launcher.go +++ b/commands/cluster_command_launcher.go @@ -33,14 +33,16 @@ const defaultExecutablePath = "/opt/vertica/bin/vcluster" const CLIVersion = "1.2.0" const vclusterLogPathEnv = "VCLUSTER_LOG_PATH" -const vclusterKeyPathEnv = "VCLUSTER_KEY_PATH" -const vclusterCertPathEnv = "VCLUSTER_CERT_PATH" +const vclusterKeyFileEnv = "VCLUSTER_KEY_FILE" +const vclusterCertFileEnv = "VCLUSTER_CERT_FILE" // *Flag is for the flag name, *Key is for viper key name // They are bound together const ( dbNameFlag = "db-name" dbNameKey = "dbName" + dbUserFlag = "db-user" + dbUserKey = "dbUser" hostsFlag = "hosts" hostsKey = "hosts" catalogPathFlag = "catalog-path" @@ -59,10 +61,10 @@ const ( configParamKey = "configParam" logPathFlag = "log-path" logPathKey = "logPath" - keyPathFlag = "key-path" - keyPathKey = "keyPath" - certPathFlag = "cert-path" - certPathKey = "certPath" + keyFileFlag = "key-file" + keyFileKey = "keyFile" + certFileFlag = "cert-file" + certFileKey = "certFile" passwordFlag = "password" passwordKey = "password" passwordFileFlag = "password-file" @@ -79,9 +81,26 @@ const ( sandboxFlag = "sandbox" ) +// Flag and key for database replication +const ( + targetDBNameFlag = "target-db-name" + targetDBNameKey = "targetDBName" + targetHostFlag = "target-hosts" + targetHostKey = "targetHosts" + targetUserNameFlag = "target-db-user" + targetUserNameKey = "targetDBUser" + targetPasswordFileFlag = "target-password-file" + targetPasswordFileKey = "targetPasswordFile" + targetConnFlag = "target-conn" + targetConnKey = "targetConn" + sourceTLSConfigFlag = "source-tlsconfig" + sourceTLSConfigKey = "sourceTLSConfig" +) + // flags to viper key map var flagKeyMap = map[string]string{ dbNameFlag: dbNameKey, + dbUserFlag: dbUserKey, hostsFlag: hostsKey, catalogPathFlag: catalogPathKey, depotPathFlag: depotPathKey, @@ -91,14 +110,19 @@ var flagKeyMap = map[string]string{ eonModeFlag: eonModeKey, configParamFlag: configParamKey, logPathFlag: logPathKey, - keyPathFlag: keyPathKey, - certPathFlag: certPathKey, + keyFileFlag: keyFileKey, + certFileFlag: certFileKey, passwordFlag: passwordKey, passwordFileFlag: passwordFileKey, readPasswordFromPromptFlag: readPasswordFromPromptKey, configFlag: configKey, verboseFlag: verboseKey, outputFileFlag: outputFileKey, + targetDBNameFlag: targetDBNameKey, + targetHostFlag: targetHostKey, + targetUserNameFlag: targetUserNameKey, + targetPasswordFileFlag: targetPasswordFileKey, + sourceTLSConfigFlag: sourceTLSConfigKey, } const ( @@ -108,6 +132,8 @@ const ( manageConfigSubCmd = "manage_config" configRecoverSubCmd = "recover" configShowSubCmd = "show" + replicationSubCmd = "replication" + startReplicationSubCmd = "start" listAllNodesSubCmd = "list_allnodes" startDBSubCmd = "start_db" dropDBSubCmd = "drop_db" @@ -130,8 +156,8 @@ const ( type cmdGlobals struct { verbose bool file *os.File - keyPath string - certPath string + keyFile string + certFile string } var ( @@ -223,10 +249,10 @@ func setDBOptionsUsingViper(flag string) error { dbOptions.ConfigurationParameters = viper.GetStringMapString(configParamKey) case logPathFlag: *dbOptions.LogPath = viper.GetString(logPathKey) - case keyPathFlag: - globals.keyPath = viper.GetString(keyPathKey) - case certPathFlag: - globals.certPath = viper.GetString(certPathKey) + case keyFileFlag: + globals.keyFile = viper.GetString(keyFileKey) + case certFileFlag: + globals.certFile = viper.GetString(certFileKey) case verboseFlag: globals.verbose = viper.GetBool(verboseKey) default: @@ -244,12 +270,12 @@ func configViper(cmd *cobra.Command, flagsInConfig []string) error { // log-path is a flag that all the subcommands need flagsInConfig = append(flagsInConfig, logPathFlag) - // cert-path and key-path are not available for + // cert-file and key-file are not available for // - manage_config // - manage_config show if cmd.CalledAs() != manageConfigSubCmd && cmd.CalledAs() != configShowSubCmd { - flagsInConfig = append(flagsInConfig, certPathFlag, keyPathFlag) + flagsInConfig = append(flagsInConfig, certFileFlag, keyFileFlag) } // bind viper keys to cobra flags @@ -268,13 +294,13 @@ func configViper(cmd *cobra.Command, flagsInConfig []string) error { if err != nil { return fmt.Errorf("fail to bind viper key %q to environment variable %q: %w", logPathKey, vclusterLogPathEnv, err) } - err = viper.BindEnv(keyPathKey, vclusterKeyPathEnv) + err = viper.BindEnv(keyFileKey, vclusterKeyFileEnv) if err != nil { - return fmt.Errorf("fail to bind viper key %q to environment variable %q: %w", keyPathKey, vclusterKeyPathEnv, err) + return fmt.Errorf("fail to bind viper key %q to environment variable %q: %w", keyFileKey, vclusterKeyFileEnv, err) } - err = viper.BindEnv(certPathKey, vclusterCertPathEnv) + err = viper.BindEnv(certFileKey, vclusterCertFileEnv) if err != nil { - return fmt.Errorf("fail to bind viper key %q to environment variable %q: %w", certPathKey, vclusterCertPathEnv, err) + return fmt.Errorf("fail to bind viper key %q to environment variable %q: %w", certFileKey, vclusterCertFileEnv, err) } // load db options from config file to viper @@ -389,6 +415,17 @@ func OldMakeBasicCobraCmd(i cmdInterface, use, short, long string) *cobra.Comman return cmd } +// makeSimpleCobraCmd can make a simple cobra command for some vcluster commands +// such as replication and manage_config +func makeSimpleCobraCmd(use, short, long string) *cobra.Command { + return &cobra.Command{ + Use: use, + Short: short, + Long: long, + Args: cobra.NoArgs, + } +} + // constructCmds returns a list of commands that will be executed // by the cluster command launcher. func constructCmds() []*cobra.Command { @@ -416,6 +453,7 @@ func constructCmds() []*cobra.Command { // others makeCmdScrutinize(), makeCmdManageConfig(), + makeCmdReplication(), } } diff --git a/commands/cmd_base.go b/commands/cmd_base.go index 96eb492..799ccb9 100644 --- a/commands/cmd_base.go +++ b/commands/cmd_base.go @@ -108,26 +108,26 @@ func (c *CmdBase) setCommonFlags(cmd *cobra.Command, flags []string) { false, "Show the details of VCluster run in the console", ) - // keyPath and certPath are flags that all subcommands require, + // keyFile and certFile are flags that all subcommands require, // except for manage_config and `manage_config show` if cmd.Name() != manageConfigSubCmd && // VER-92992: remove this line once manage_config is not runnable cmd.Name() != configShowSubCmd { cmd.Flags().StringVar( - &globals.keyPath, - keyPathFlag, + &globals.keyFile, + keyFileFlag, "", "Path to the key file", ) - markFlagsFileName(cmd, map[string][]string{keyPathFlag: {"key"}}) + markFlagsFileName(cmd, map[string][]string{keyFileFlag: {"key"}}) cmd.Flags().StringVar( - &globals.certPath, - certPathFlag, + &globals.certFile, + certFileFlag, "", "Path to the cert file", ) - markFlagsFileName(cmd, map[string][]string{certPathFlag: {"pem", "crt"}}) - cmd.MarkFlagsRequiredTogether(keyPathFlag, certPathFlag) + markFlagsFileName(cmd, map[string][]string{certFileFlag: {"pem", "crt"}}) + cmd.MarkFlagsRequiredTogether(keyFileFlag, certFileFlag) } if util.StringInArray(outputFileFlag, flags) { cmd.Flags().StringVarP( @@ -138,6 +138,14 @@ func (c *CmdBase) setCommonFlags(cmd *cobra.Command, flags []string) { "Write output to this file instead of stdout", ) } + if util.StringInArray(dbUserFlag, flags) { + cmd.Flags().StringVar( + dbOptions.UserName, + dbUserFlag, + "", + "The username for connecting to the database", + ) + } } // setConfigFlags sets the config flag as well as all the common flags that @@ -291,27 +299,35 @@ func (c *CmdBase) setDBPassword(opt *vclusterops.DatabaseOptions) error { return nil } + password, err := c.passwordFileHelper(c.passwordFile) + if err != nil { + return err + } + *opt.Password = password + return nil +} + +func (c *CmdBase) passwordFileHelper(passwordFile string) (string, error) { if c.passwordFile == "" { - return fmt.Errorf("password file path is empty") + return "", fmt.Errorf("password file path is empty") } // hyphen(`-`) is used to indicate that input should come // from stdin rather than from a file - if c.passwordFile == "-" { + if passwordFile == "-" { password, err := readFromStdin() if err != nil { - return err - } - *opt.Password = strings.TrimSuffix(password, "\n") - } else { - // Read password from file - passwordBytes, err := os.ReadFile(c.passwordFile) - if err != nil { - return fmt.Errorf("error reading password from file %q: %w", c.passwordFile, err) + return "", err } - // Convert bytes to string, removing any newline characters - *opt.Password = strings.TrimSuffix(string(passwordBytes), "\n") + return strings.TrimSuffix(password, "\n"), nil } - return nil + + // Read password from file + passwordBytes, err := os.ReadFile(passwordFile) + if err != nil { + return "", fmt.Errorf("error reading password from file %q: %w", passwordFile, err) + } + // Convert bytes to string, removing any newline characters + return strings.TrimSuffix(string(passwordBytes), "\n"), nil } // usePassword returns true if at least one of the password @@ -349,15 +365,15 @@ func (c *CmdBase) initCmdOutputFile() (*os.File, error) { // getCertFilesFromPaths will update cert and key file from cert path options func (c *CmdBase) getCertFilesFromCertPaths(opt *vclusterops.DatabaseOptions) error { - if globals.certPath != "" { - certData, err := os.ReadFile(globals.certPath) + if globals.certFile != "" { + certData, err := os.ReadFile(globals.certFile) if err != nil { return fmt.Errorf("failed to read certificate file, details %w", err) } opt.Cert = string(certData) } - if globals.keyPath != "" { - keyData, err := os.ReadFile(globals.keyPath) + if globals.keyFile != "" { + keyData, err := os.ReadFile(globals.keyFile) if err != nil { return fmt.Errorf("failed to read private key file, details %w", err) } diff --git a/commands/cmd_re_ip.go b/commands/cmd_re_ip.go index 33b4ff7..a30d145 100644 --- a/commands/cmd_re_ip.go +++ b/commands/cmd_re_ip.go @@ -16,6 +16,8 @@ package commands import ( + "fmt" + "github.com/spf13/cobra" "github.com/vertica/vcluster/vclusterops" "github.com/vertica/vcluster/vclusterops/vlog" @@ -66,7 +68,7 @@ Examples: ) // common db flags - newCmd.setCommonFlags(cmd, []string{dbNameFlag, hostsFlag, catalogPathFlag}) + newCmd.setCommonFlags(cmd, []string{dbNameFlag, hostsFlag, catalogPathFlag, configFlag}) // local flags newCmd.setLocalFlags(cmd) @@ -112,16 +114,62 @@ func (c *CmdReIP) validateParse(logger vlog.Printer) error { func (c *CmdReIP) Run(vcc vclusterops.ClusterCommands) error { vcc.LogInfo("Called method Run()") - err := vcc.VReIP(c.reIPOptions) + + options := c.reIPOptions + + // load config info from the YAML config file + canUpdateConfig := true + dbConfig, err := readConfig() + if err != nil { + vcc.LogInfo("fail to read config file: %v", err) + canUpdateConfig = false + } + + // VER-92369 should clean up the block below + // as the GetDBConfig function will be removed + config, err := options.GetDBConfig(vcc) + if err != nil { + return err + } + options.Config = config + + err = vcc.VReIP(options) if err != nil { vcc.LogError(err, "fail to re-ip") return err } vcc.PrintInfo("Re-ip is successfully completed") + + // update config file after running re_ip + if canUpdateConfig { + c.UpdateConfig(dbConfig) + err = dbConfig.write(options.ConfigPath) + if err != nil { + fmt.Printf("Warning: fail to update config file, details %v\n", err) + } + } + return nil } +// UpdateConfig will update node addresses in the config object after re_ip +func (c *CmdReIP) UpdateConfig(dbConfig *DatabaseConfig) { + nodeNameToAddress := make(map[string]string) + for _, reIPInfo := range c.reIPOptions.ReIPList { + if reIPInfo.TargetAddress != "" { + nodeNameToAddress[reIPInfo.NodeName] = reIPInfo.TargetAddress + } + } + + for _, n := range dbConfig.Nodes { + newAddress, ok := nodeNameToAddress[n.Name] + if ok { + n.Address = newAddress + } + } +} + // SetDatabaseOptions will assign a vclusterops.DatabaseOptions instance to the one in CmdReIP func (c *CmdReIP) SetDatabaseOptions(opt *vclusterops.DatabaseOptions) { c.reIPOptions.DatabaseOptions = *opt diff --git a/commands/cmd_replication.go b/commands/cmd_replication.go new file mode 100644 index 0000000..a4daca5 --- /dev/null +++ b/commands/cmd_replication.go @@ -0,0 +1,30 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package commands + +import ( + "github.com/spf13/cobra" +) + +func makeCmdReplication() *cobra.Command { + cmd := makeSimpleCobraCmd( + replicationSubCmd, + "Handle database replication", + `This subcommand is used to start or show the status of database replication.`) + + cmd.AddCommand(makeCmdStartReplication()) + return cmd +} diff --git a/commands/cmd_start_db.go b/commands/cmd_start_db.go index f534cac..7283611 100644 --- a/commands/cmd_start_db.go +++ b/commands/cmd_start_db.go @@ -174,6 +174,8 @@ func (c *CmdStartDB) Run(vcc vclusterops.ClusterCommands) error { options := c.startDBOptions + // VER-92369 should clean up the block below + // as the GetDBConfig function will be removed // load vdb info from the YAML config file // get config from vertica_cluster.yaml config, err := options.GetDBConfig(vcc) @@ -190,10 +192,8 @@ func (c *CmdStartDB) Run(vcc vclusterops.ClusterCommands) error { vcc.PrintInfo("Successfully start the database %s", *options.DBName) - // update config file to fill nodes' subcluster information - if config == nil { - vcc.PrintWarning("cannot update config file as %s does not exist", options.ConfigPath) - } else { + // for Eon database, update config file to fill nodes' subcluster information + if options.OldIsEon.ToBool() { // write db info to vcluster config file err := writeConfig(vdb, vcc.GetLog()) if err != nil { diff --git a/commands/cmd_start_replication.go b/commands/cmd_start_replication.go new file mode 100644 index 0000000..1a5660e --- /dev/null +++ b/commands/cmd_start_replication.go @@ -0,0 +1,221 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package commands + +import ( + "fmt" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/vertica/vcluster/vclusterops" + "github.com/vertica/vcluster/vclusterops/util" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +/* CmdStartReplication + * + * Implements ClusterCommand interface + */ +type CmdStartReplication struct { + startRepOptions *vclusterops.VReplicationDatabaseOptions + CmdBase + targetPasswordFile string + targetConnPath string +} + +func makeCmdStartReplication() *cobra.Command { + newCmd := &CmdStartReplication{} + opt := vclusterops.VReplicationDatabaseFactory() + newCmd.startRepOptions = &opt + + cmd := makeBasicCobraCmd( + newCmd, + startReplicationSubCmd, + "Start database replication", + `This subcommand starts a database replication. + +This subcommand copies table or schema data directly from one Eon Mode database's communal storage +to another. + +The --target-conn option serves as a collection file for gathering necessary target +information for replication. You need to run vcluster manage_connection to generate this +connection file in order to use this option. + +Examples: + # Start database replication with config and connection file + vcluster replication start --config /opt/vertica/config/vertica_cluster.yaml \ + --target-conn /opt/vertica/config/target_connection.yaml + + # Start database replication with user input and connection file + vcluster replication start --db-name test_db --hosts 10.20.30.40 \ + --target-conn /opt/vertica/config/target_connection.yaml + + # Start database replication with config and connection file + # tls option and tls-based authentication + vcluster replication start --config /opt/vertica/config/vertica_cluster.yaml \ + --key-file /path/to/key-file --cert-file /path/to/cert-file \ + --target-conn /opt/vertica/config/target_connection.yaml --source-tlsconfig test_tlsconfig + + # Start database replication with user input + # option and password-based authentication + vcluster replication start --db-name test_db --db-user dbadmin --hosts 10.20.30.40 --target-db-name platform_db \ + --target-hosts 10.20.30.43 --password-file /path/to/password-file --target-db-user dbadmin \ + --target-password-file /path/to/password-file +`, + []string{dbNameFlag, hostsFlag, ipv6Flag, configFlag, passwordFlag, dbUserFlag, eonModeFlag}, + ) + + // local flags + newCmd.setLocalFlags(cmd) + + // hide eon mode flag since we expect it to come from config file, not from user input + hideLocalFlags(cmd, []string{eonModeFlag}) + return cmd +} + +// setLocalFlags will set the local flags the command has +func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { + cmd.Flags().StringVar( + &c.startRepOptions.TargetDB, + targetDBNameFlag, + "", + "The target database that we will replicate to", + ) + cmd.Flags().StringSliceVar( + &c.startRepOptions.TargetHosts, + targetHostFlag, + []string{}, + "Comma-separated list of hosts in target database") + cmd.Flags().StringVar( + &c.startRepOptions.TargetUserName, + targetUserNameFlag, + "", + "The username for connecting to the target database", + ) + cmd.Flags().StringVar( + &c.startRepOptions.SourceTLSConfig, + sourceTLSConfigFlag, + "", + "The TLS configuration to use when connecting to the target database "+ + ", must exist in the source database", + ) + cmd.Flags().StringVar( + &c.targetConnPath, + targetConnFlag, + "", + "Path to the target connection file") + markFlagsFileName(cmd, map[string][]string{configFlag: {"yaml"}}) + + // password flags + cmd.Flags().StringVar( + &c.targetPasswordFile, + targetPasswordFileFlag, + "", + "Path to the file to read the password for target database. "+ + "If - is passed, the password is read from stdin", + ) +} + +func (c *CmdStartReplication) Parse(inputArgv []string, logger vlog.Printer) error { + c.argv = inputArgv + logger.LogMaskedArgParse(c.argv) + + // for some options, we do not want to use their default values, + // if they are not provided in cli, + // reset the value of those options to nil + c.OldResetUserInputOptions() + + // replication only works for an Eon db + // When eon mode cannot be found in config file, we set its value to true + if !viper.IsSet(eonModeKey) { + c.startRepOptions.IsEon = true + } + + return c.validateParse(logger) +} + +// all validations of the arguments should go in here +func (c *CmdStartReplication) validateParse(logger vlog.Printer) error { + logger.Info("Called validateParse()") + err := c.getCertFilesFromCertPaths(&c.startRepOptions.DatabaseOptions) + if err != nil { + return err + } + err = c.parseTargetHostList() + if err != nil { + return err + } + + err = c.parseTargetPassword() + if err != nil { + return err + } + + err = c.ValidateParseBaseOptions(&c.startRepOptions.DatabaseOptions) + if err != nil { + return err + } + + return c.setDBPassword(&c.startRepOptions.DatabaseOptions) +} + +func (c *CmdStartReplication) parseTargetHostList() error { + if len(c.startRepOptions.TargetHosts) > 0 { + err := util.ParseHostList(&c.startRepOptions.TargetHosts) + if err != nil { + return fmt.Errorf("must specify at least one target host to replicate") + } + } + return nil +} + +func (c *CmdStartReplication) parseTargetPassword() error { + options := c.startRepOptions + if !c.usePassword() { + // reset password option to nil if password is not provided in cli + options.TargetPassword = nil + return nil + } + if c.startRepOptions.TargetPassword == nil { + options.TargetPassword = new(string) + } + + password, err := c.passwordFileHelper(c.targetPasswordFile) + if err != nil { + return err + } + *options.TargetPassword = password + return nil +} + +func (c *CmdStartReplication) Run(vcc vclusterops.ClusterCommands) error { + vcc.LogInfo("Called method Run()") + + options := c.startRepOptions + + err := vcc.VReplicateDatabase(options) + if err != nil { + vcc.LogError(err, "fail to replicate to database", "targetDB", options.TargetDB) + return err + } + vcc.PrintInfo("Successfully replicate to database %s", options.TargetDB) + return nil +} + +// SetDatabaseOptions will assign a vclusterops.DatabaseOptions instance +func (c *CmdStartReplication) SetDatabaseOptions(opt *vclusterops.DatabaseOptions) { + c.startRepOptions.DatabaseOptions = *opt +} diff --git a/commands/user_input_test.go b/commands/user_input_test.go index 6e9c22f..f5400dc 100644 --- a/commands/user_input_test.go +++ b/commands/user_input_test.go @@ -74,3 +74,28 @@ func TestManageConfig(t *testing.T) { err = simulateVClusterCli("vcluster manage_config show recover") assert.ErrorContains(t, err, `unknown command "recover" for "vcluster manage_config show"`) } + +func TestManageReplication(t *testing.T) { + // vcluster replication should succeed and show help message + err := simulateVClusterCli("vcluster replication") + assert.NoError(t, err) + + err = simulateVClusterCli("vcluster replication start test") + assert.ErrorContains(t, err, `unknown command "test" for "vcluster replication start"`) +} + +func TestStartReplication(t *testing.T) { + // vcluster replication start should succeed + // since there is no op for this subcommand + err := simulateVClusterCli("vcluster replication start") + assert.NoError(t, err) + + var passwordFilePath = os.TempDir() + "/password.txt" + tempConfig, _ := os.Create(passwordFilePath) + tempConfig.Close() + defer os.Remove(passwordFilePath) + err = simulateVClusterCli("vcluster replication start --db-name platform_test_db --hosts" + + " 192.168.1.101 --target-db-name test_db --target-hosts 192.168.1.103 --target-password-file " + passwordFilePath + + " --password-file " + passwordFilePath) + assert.NoError(t, err) +} diff --git a/commands/vcluster_config.go b/commands/vcluster_config.go index 605445b..f5c0df0 100644 --- a/commands/vcluster_config.go +++ b/commands/vcluster_config.go @@ -273,6 +273,28 @@ func backupConfigFile(configFilePath string, logger vlog.Printer) error { return nil } +// read reads information from configFilePath to a DatabaseConfig object. +// It returns any read error encountered. +func readConfig() (dbConfig *DatabaseConfig, err error) { + configFilePath := dbOptions.ConfigPath + + if configFilePath == "" { + return nil, fmt.Errorf("no config file provided") + } + configBytes, err := os.ReadFile(configFilePath) + if err != nil { + return nil, fmt.Errorf("fail to read config file, details: %w", err) + } + + var config Config + err = yaml.Unmarshal(configBytes, &config) + if err != nil { + return nil, fmt.Errorf("fail to unmarshal config file, details: %w", err) + } + + return &config.Database, nil +} + // write writes configuration information to configFilePath. It returns // any write error encountered. The viper in-built write function cannot // work well(the order of keys cannot be customized) so we used yaml.Marshal() diff --git a/vclusterops/cluster_op.go b/vclusterops/cluster_op.go index 614c019..7f04a32 100644 --- a/vclusterops/cluster_op.go +++ b/vclusterops/cluster_op.go @@ -500,6 +500,7 @@ type ClusterCommands interface { VStartDatabase(options *VStartDatabaseOptions) (vdbPtr *VCoordinationDatabase, err error) VStartNodes(options *VStartNodesOptions) error VStopDatabase(options *VStopDatabaseOptions) error + VReplicateDatabase(options *VReplicationDatabaseOptions) error VFetchCoordinationDatabase(options *VFetchCoordinationDatabaseOptions) (VCoordinationDatabase, error) VUnsandbox(options *VUnsandboxOptions) error VStopSubcluster(options *VStopSubclusterOptions) error diff --git a/vclusterops/cluster_op_engine_context.go b/vclusterops/cluster_op_engine_context.go index 2849288..b0b65cc 100644 --- a/vclusterops/cluster_op_engine_context.go +++ b/vclusterops/cluster_op_engine_context.go @@ -23,6 +23,7 @@ type opEngineExecContext struct { nmaVDatabase nmaVDatabase upHosts []string // a sorted host list that contains all up nodes nodesInfo []NodeInfo + scNodesInfo []NodeInfo // a node list contains all nodes in a subcluster // This field is specifically used for sandboxing // as sandboxing requires all nodes in the subcluster to be sandboxed to be UP. diff --git a/vclusterops/helpers.go b/vclusterops/helpers.go index a06ddaf..6d4d1a9 100644 --- a/vclusterops/helpers.go +++ b/vclusterops/helpers.go @@ -112,10 +112,7 @@ type nodeStateInfo struct { } func (node *nodeStateInfo) asNodeInfo() (n NodeInfo, err error) { - n.Address = node.Address - n.Name = node.Name - n.State = node.State - n.CatalogPath = node.CatalogPath + n = node.asNodeInfoWoVer() // version can be, eg, v24.0.0- or v23.4.0-- including a hotfix or daily build date verWithHotfix := 3 verWithoutHotfix := 2 @@ -131,6 +128,15 @@ func (node *nodeStateInfo) asNodeInfo() (n NodeInfo, err error) { return } +// asNodeInfoWoVer will create a NodeInfo with empty Version and Revision +func (node *nodeStateInfo) asNodeInfoWoVer() (n NodeInfo) { + n.Address = node.Address + n.Name = node.Name + n.State = node.State + n.CatalogPath = node.CatalogPath + return +} + type nodesStateInfo struct { NodeList []*nodeStateInfo `json:"node_list"` } diff --git a/vclusterops/https_get_up_nodes_op.go b/vclusterops/https_get_up_nodes_op.go index bfcaefb..987d903 100644 --- a/vclusterops/https_get_up_nodes_op.go +++ b/vclusterops/https_get_up_nodes_op.go @@ -154,6 +154,7 @@ func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) err downHosts := []string{} sandboxInfo := make(map[string]string) upScNodes := mapset.NewSet[NodeInfo]() + scNodes := mapset.NewSet[NodeInfo]() for host, result := range op.clusterHTTPRequest.ResultCollection { op.logResponse(host, result) if !result.isPassing() { @@ -190,7 +191,7 @@ func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) err } // collect all the up hosts - err = op.collectUpHosts(nodesStates, host, upHosts, upScInfo, sandboxInfo, upScNodes) + err = op.collectUpHosts(nodesStates, host, upHosts, upScInfo, sandboxInfo, upScNodes, scNodes) if err != nil { allErrs = errors.Join(allErrs, err) return allErrs @@ -205,6 +206,7 @@ func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) err } } execContext.nodesInfo = upScNodes.ToSlice() + execContext.scNodesInfo = scNodes.ToSlice() execContext.upHostsToSandboxes = sandboxInfo ignoreErrors := op.processHostLists(upHosts, upScInfo, exceptionHosts, downHosts, sandboxInfo, execContext) if ignoreErrors { @@ -302,8 +304,8 @@ func (op *httpsGetUpNodesOp) validateHosts(nodesStates nodesStateInfo) error { return nil } -func (op *httpsGetUpNodesOp) collectUpHosts(nodesStates nodesStateInfo, host string, - upHosts mapset.Set[string], upScInfo, sandboxInfo map[string]string, upScNodes mapset.Set[NodeInfo]) (err error) { +func (op *httpsGetUpNodesOp) collectUpHosts(nodesStates nodesStateInfo, host string, upHosts mapset.Set[string], + upScInfo, sandboxInfo map[string]string, upScNodes, scNodes mapset.Set[NodeInfo]) (err error) { upMainNodeFound := false foundSC := false for _, node := range nodesStates.NodeList { @@ -324,14 +326,26 @@ func (op *httpsGetUpNodesOp) collectUpHosts(nodesStates nodesStateInfo, host str upMainNodeFound = true } } - if op.scName == node.Subcluster { - op.sandbox = node.Sandbox - var n NodeInfo + } + if op.scName == node.Subcluster { + op.sandbox = node.Sandbox + var n NodeInfo + // collect info for "UP" and "DOWN" nodes, ignore "UNKNOWN" nodes here + // because we want to avoid getting duplicate nodes' info. For a sandbox node, + // we will get two duplicate NodeInfo entries if we do not ignore "UNKNOWN" nodes: + // one with state "UNKNOWN" from main cluster, and the other with state "UP" + // from sandboxes. + if node.State == util.NodeUpState { if n, err = node.asNodeInfo(); err != nil { op.logger.PrintError("[%s] %s", op.name, err.Error()) } else { upScNodes.Add(n) + scNodes.Add(n) } + } else if node.State == util.NodeDownState { + // for "DOWN" node, we cannot get its version from https response + n = node.asNodeInfoWoVer() + scNodes.Add(n) } } } diff --git a/vclusterops/https_poll_subcluster_node_state_op.go b/vclusterops/https_poll_subcluster_node_state_op.go index 7788710..6b11e93 100644 --- a/vclusterops/https_poll_subcluster_node_state_op.go +++ b/vclusterops/https_poll_subcluster_node_state_op.go @@ -101,10 +101,8 @@ func (op *httpsPollSubclusterNodeStateOp) setupClusterHTTPRequest(hosts []string func (op *httpsPollSubclusterNodeStateOp) prepare(execContext *opEngineExecContext) error { // We need to ensure that the https request to fetch the node state goes to the sandboxed node // because the main cluster will report the status of sandboxed nodes as "UNKNOWN". - for host, sc := range execContext.upScInfo { - if sc == op.scName { - op.hosts = append(op.hosts, host) - } + for _, vnode := range execContext.scNodesInfo { + op.hosts = append(op.hosts, vnode.Address) } execContext.dispatcher.setup(op.hosts) return op.setupClusterHTTPRequest(op.hosts) diff --git a/vclusterops/https_unsandbox_subcluster_op.go b/vclusterops/https_unsandbox_subcluster_op.go index f6ad168..bc1f054 100644 --- a/vclusterops/https_unsandbox_subcluster_op.go +++ b/vclusterops/https_unsandbox_subcluster_op.go @@ -73,27 +73,28 @@ func (op *httpsUnsandboxingOp) setupRequestBody() error { } func (op *httpsUnsandboxingOp) prepare(execContext *opEngineExecContext) error { - var hosts []string var mainHost string if len(execContext.upHostsToSandboxes) == 0 { return fmt.Errorf(`[%s] Cannot find any up hosts in OpEngineExecContext`, op.name) } - // use shortlisted hosts to execute https post request, this host/hosts will be the initiator + // use an UP host in main cluster to execute the https post request for h, sb := range execContext.upHostsToSandboxes { if sb == "" { mainHost = h - } else { - hosts = append(hosts, h) + break } } - hosts = append(hosts, mainHost) + if mainHost == "" { + return fmt.Errorf(`[%s] Cannot find any up hosts of main cluster in OpEngineExecContext`, op.name) + } + op.hosts = []string{mainHost} err := op.setupRequestBody() if err != nil { return err } - execContext.dispatcher.setup(hosts) + execContext.dispatcher.setup(op.hosts) - return op.setupClusterHTTPRequest(hosts) + return op.setupClusterHTTPRequest(op.hosts) } func (op *httpsUnsandboxingOp) execute(execContext *opEngineExecContext) error { diff --git a/vclusterops/nma_delete_dir_op.go b/vclusterops/nma_delete_dir_op.go index 465b68c..d3a3681 100644 --- a/vclusterops/nma_delete_dir_op.go +++ b/vclusterops/nma_delete_dir_op.go @@ -103,10 +103,13 @@ func (op *nmaDeleteDirectoriesOp) setupClusterHTTPRequest(hosts []string) error func (op *nmaDeleteDirectoriesOp) prepare(execContext *opEngineExecContext) error { if op.sandbox { + if len(execContext.scNodesInfo) == 0 { + return fmt.Errorf(`[%s] Cannot find any node information of target subcluster in OpEngineExecContext`, op.name) + } op.hosts = []string{} op.hostRequestBodyMap = make(map[string]string) - for _, node := range execContext.nodesInfo { + for _, node := range execContext.scNodesInfo { p := deleteDirParams{} p.Directories = append(p.Directories, node.CatalogPath) p.ForceDelete = true diff --git a/vclusterops/nma_start_node_op.go b/vclusterops/nma_start_node_op.go index 08eb5d1..4221853 100644 --- a/vclusterops/nma_start_node_op.go +++ b/vclusterops/nma_start_node_op.go @@ -76,7 +76,10 @@ func (op *nmaStartNodeOp) updateRequestBody(execContext *opEngineExecContext) er } } } else { - for _, vnode := range execContext.nodesInfo { + if len(execContext.scNodesInfo) == 0 { + return fmt.Errorf(`[%s] Cannot find any node information of target subcluster in OpEngineExecContext`, op.name) + } + for _, vnode := range execContext.scNodesInfo { op.hosts = append(op.hosts, vnode.Address) hoststartCommand, ok := execContext.startupCommandMap[vnode.Name] if ok { diff --git a/vclusterops/nma_vertica_version_op.go b/vclusterops/nma_vertica_version_op.go index 3b24347..58d27c5 100644 --- a/vclusterops/nma_vertica_version_op.go +++ b/vclusterops/nma_vertica_version_op.go @@ -93,12 +93,18 @@ func (op *nmaVerticaVersionOp) setupClusterHTTPRequest(hosts []string) error { return nil } -func (op *nmaVerticaVersionOp) prepareSandboxVers(execContext *opEngineExecContext) { +func (op *nmaVerticaVersionOp) prepareSandboxVers(execContext *opEngineExecContext) error { // Add current unsandboxed sc hosts - for _, node := range execContext.nodesInfo { + if len(execContext.scNodesInfo) == 0 { + return fmt.Errorf(`[%s] Cannot find any node information of target subcluster in OpEngineExecContext`, op.name) + } + for _, node := range execContext.scNodesInfo { op.hosts = append(op.hosts, node.Address) } // Add Up main cluster hosts + if len(execContext.upHostsToSandboxes) == 0 { + return fmt.Errorf(`[%s] Cannot find any up hosts in OpEngineExecContext`, op.name) + } for h, sb := range execContext.upHostsToSandboxes { if sb == "" { op.hosts = append(op.hosts, h) @@ -110,6 +116,8 @@ func (op *nmaVerticaVersionOp) prepareSandboxVers(execContext *opEngineExecConte for _, host := range op.hosts { op.SCToHostVersionMap[sc][host] = "" } + + return nil } func (op *nmaVerticaVersionOp) prepare(execContext *opEngineExecContext) error { /* @@ -126,7 +134,10 @@ func (op *nmaVerticaVersionOp) prepare(execContext *opEngineExecContext) error { * */ if op.sandbox { - op.prepareSandboxVers(execContext) + err := op.prepareSandboxVers(execContext) + if err != nil { + return err + } } else if len(op.hosts) == 0 { if op.vdb != nil { // db is up diff --git a/vclusterops/re_ip.go b/vclusterops/re_ip.go index e43fc54..ab84998 100644 --- a/vclusterops/re_ip.go +++ b/vclusterops/re_ip.go @@ -124,14 +124,37 @@ func (vcc VClusterCommands) VReIP(options *VReIPOptions) error { * - Give the instructions to the VClusterOpEngine to run */ - err := options.validateAnalyzeOptions(vcc.Log) + // set db name and hosts + err := options.setDBNameAndHosts() + if err != nil { + return err + } + + // set catalog prefix + options.CatalogPrefix, err = options.getCatalogPrefix(options.Config) + if err != nil { + return err + } + + isEon, err := options.isEonMode(options.Config) + if err != nil { + return err + } + if isEon { + options.CommunalStorageLocation, err = options.getCommunalStorageLocation(options.Config) + if err != nil { + return err + } + } + + err = options.validateAnalyzeOptions(vcc.Log) if err != nil { return err } var pVDB *VCoordinationDatabase // retrieve database information from cluster_config.json for Eon databases - if options.OldIsEon.ToBool() { + if isEon { if *options.CommunalStorageLocation != "" { vdb, e := options.getVDBWhenDBIsDown(vcc) if e != nil { diff --git a/vclusterops/replication.go b/vclusterops/replication.go new file mode 100644 index 0000000..990d2d7 --- /dev/null +++ b/vclusterops/replication.go @@ -0,0 +1,126 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "fmt" + + "github.com/vertica/vcluster/vclusterops/util" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +type VReplicationDatabaseOptions struct { + /* part 1: basic db info */ + DatabaseOptions + + /* part 2: replication info */ + TargetHosts []string + TargetDB string + TargetUserName string + TargetPassword *string + SourceTLSConfig string +} + +func VReplicationDatabaseFactory() VReplicationDatabaseOptions { + opt := VReplicationDatabaseOptions{} + // set default values to the params + opt.setDefaultValues() + return opt +} + +func (opt *VReplicationDatabaseOptions) validateEonOptions(_ vlog.Printer) error { + if !opt.IsEon { + return fmt.Errorf("replication is only supported in Eon mode") + } + return nil +} + +func (opt *VReplicationDatabaseOptions) validateParseOptions(logger vlog.Printer) error { + err := opt.validateEonOptions(logger) + if err != nil { + return err + } + return opt.validateBaseOptions(commandReplicationStart, logger) +} + +// analyzeOptions will modify some options based on what is chosen +func (opt *VReplicationDatabaseOptions) analyzeOptions() (err error) { + if len(opt.TargetHosts) > 0 { + // resolve RawHosts to be IP addresses + opt.TargetHosts, err = util.ResolveRawHostsToAddresses(opt.TargetHosts, opt.OldIpv6.ToBool()) + if err != nil { + return err + } + } + // we analyze host names when it is set in user input, otherwise we use hosts in yaml config + if len(opt.RawHosts) > 0 { + // resolve RawHosts to be IP addresses + hostAddresses, err := util.ResolveRawHostsToAddresses(opt.RawHosts, opt.OldIpv6.ToBool()) + if err != nil { + return err + } + opt.Hosts = hostAddresses + } + return nil +} + +func (opt *VReplicationDatabaseOptions) validateAnalyzeOptions(logger vlog.Printer) error { + if err := opt.validateParseOptions(logger); err != nil { + return err + } + return opt.analyzeOptions() +} + +// VReplicateDatabase can copy all table data and metadata from this cluster to another +func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOptions) error { + /* + * - Produce Instructions + * - Create a VClusterOpEngine + * - Give the instructions to the VClusterOpEngine to run + */ + + // validate and analyze options + err := options.validateAnalyzeOptions(vcc.Log) + if err != nil { + return err + } + + // produce database replication instructions + instructions, err := vcc.produceDBReplicationInstructions(options) + if err != nil { + return fmt.Errorf("fail to produce instructions, %w", err) + } + + // create a VClusterOpEngine, and add certs to the engine + certs := httpsCerts{key: options.Key, cert: options.Cert, caCert: options.CaCert} + clusterOpEngine := makeClusterOpEngine(instructions, &certs) + + // give the instructions to the VClusterOpEngine to run + runError := clusterOpEngine.run(vcc.Log) + if runError != nil { + return fmt.Errorf("fail to replicate database: %w", runError) + } + + return nil +} + +// The generated instructions will later perform the following operations necessary +// for a successful replication. Temporarily, There is no-op for this subcommand. +// We will implement the ops to do the actual replication in VER-92706 +func (vcc VClusterCommands) produceDBReplicationInstructions(_ *VReplicationDatabaseOptions) ([]clusterOp, error) { + var instructions []clusterOp + return instructions, nil +} diff --git a/vclusterops/sandbox.go b/vclusterops/sandbox.go index 8f2cd20..64233cc 100644 --- a/vclusterops/sandbox.go +++ b/vclusterops/sandbox.go @@ -115,8 +115,8 @@ func (vcc *VClusterCommands) produceSandboxSubclusterInstructions(options *VSand username := *options.UserName // Get all up nodes - httpsGetUpNodesOp, err := makeHTTPSGetUpNodesOp(*options.DBName, options.Hosts, - usePassword, username, options.Password, SandboxCmd) + httpsGetUpNodesOp, err := makeHTTPSGetUpScNodesOp(*options.DBName, options.Hosts, + usePassword, username, options.Password, SandboxCmd, *options.SCName) if err != nil { return instructions, err } diff --git a/vclusterops/start_db.go b/vclusterops/start_db.go index c678f23..ce9a75a 100644 --- a/vclusterops/start_db.go +++ b/vclusterops/start_db.go @@ -347,7 +347,7 @@ func (vcc VClusterCommands) produceStartDBInstructions(options *VStartDatabaseOp ) if options.OldIsEon.ToBool() { - httpsSyncCatalogOp, err := makeHTTPSSyncCatalogOp(options.Hosts, true, *options.UserName, options.Password, StartDBSyncCat) + httpsSyncCatalogOp, err := makeHTTPSSyncCatalogOp(options.Hosts, options.usePassword, *options.UserName, options.Password, StartDBSyncCat) if err != nil { return instructions, err } diff --git a/vclusterops/start_node.go b/vclusterops/start_node.go index d65af6c..e89632c 100644 --- a/vclusterops/start_node.go +++ b/vclusterops/start_node.go @@ -342,7 +342,8 @@ func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartN ) if vdb.IsEon { - httpsSyncCatalogOp, err := makeHTTPSSyncCatalogOp(options.Hosts, true, *options.UserName, options.Password, StartNodeSyncCat) + httpsSyncCatalogOp, err := makeHTTPSSyncCatalogOp(options.Hosts, options.usePassword, *options.UserName, + options.Password, StartNodeSyncCat) if err != nil { return instructions, err } diff --git a/vclusterops/unsandbox.go b/vclusterops/unsandbox.go index d8dbbc0..9bb2c90 100644 --- a/vclusterops/unsandbox.go +++ b/vclusterops/unsandbox.go @@ -30,6 +30,8 @@ type VUnsandboxOptions struct { SCRawHosts []string // if restart the subcluster after unsandboxing it, the default value of it is true RestartSC bool + // if any node in the target subcluster is up. This is for internal use only. + hasUpNodeInSC bool } func VUnsandboxOptionsFactory() VUnsandboxOptions { @@ -126,6 +128,11 @@ func (vcc *VClusterCommands) unsandboxPreCheck(vdb *VCoordinationDatabase, optio return &SubclusterNotSandboxedError{SCName: *options.SCName} } sandboxedHosts = append(sandboxedHosts, vnode.Address) + // when the node state is not "DOWN" ("UP" or "UNKNOWN"), we consider + // the node is running + if vnode.State != util.NodeDownState { + options.hasUpNodeInSC = true + } } } @@ -149,8 +156,9 @@ func (vcc *VClusterCommands) unsandboxPreCheck(vdb *VCoordinationDatabase, optio // for a successful unsandbox_subcluster: // - Get UP nodes through HTTPS call, if any node is UP then the DB is UP and ready for running unsandboxing operation // Also get up nodes from fellow subclusters in the same sandbox. Also get all UP nodes info in the given subcluster -// - Stop the up subcluster hosts -// - Poll for stopped hosts to be down +// - If the subcluster is UP +// 1. Stop the up subcluster hosts +// 2. Poll for stopped hosts to be down // - Run unsandboxing for the user provided subcluster using the selected initiator host(s). // - Remove catalog dirs from unsandboxed hosts // - VCluster CLI will restart the unsandboxed hosts using below instructions, but k8s operator will skip the restart process @@ -179,19 +187,27 @@ func (vcc *VClusterCommands) produceUnsandboxSCInstructions(options *VUnsandboxO if err != nil { return instructions, err } + instructions = append(instructions, &httpsGetUpNodesOp) + + if options.hasUpNodeInSC { + // Stop the nodes in the subcluster that is to be unsandboxed + httpsStopNodeOp, e := makeHTTPSStopNodeOp(usePassword, username, options.Password, + nil) + if e != nil { + return instructions, e + } - // Stop the nodes in the subcluster that is to be unsandboxed - httpsStopNodeOp, err := makeHTTPSStopNodeOp(usePassword, username, options.Password, - nil) - if err != nil { - return instructions, err - } + // Poll for nodes down + httpsPollScDown, e := makeHTTPSPollSubclusterNodeStateDownOp(*options.SCName, + usePassword, username, options.Password) + if e != nil { + return instructions, e + } - // Poll for nodes down - httpsPollScDown, err := makeHTTPSPollSubclusterNodeStateDownOp(*options.SCName, - usePassword, username, options.Password) - if err != nil { - return instructions, err + instructions = append(instructions, + &httpsStopNodeOp, + &httpsPollScDown, + ) } // Run Unsandboxing @@ -208,9 +224,6 @@ func (vcc *VClusterCommands) produceUnsandboxSCInstructions(options *VUnsandboxO } instructions = append(instructions, - &httpsGetUpNodesOp, - &httpsStopNodeOp, - &httpsPollScDown, &httpsUnsandboxSubclusterOp, &nmaDeleteDirsOp, ) diff --git a/vclusterops/vcluster_database_options.go b/vclusterops/vcluster_database_options.go index affe7fa..37bed54 100644 --- a/vclusterops/vcluster_database_options.go +++ b/vclusterops/vcluster_database_options.go @@ -108,6 +108,7 @@ const ( commandShowRestorePoints = "show_restore_points" commandInstallPackages = "install_packages" commandConfigRecover = "manage_config_recover" + commandReplicationStart = "replication_start" ) func DatabaseOptionsFactory() DatabaseOptions {