diff --git a/README.md b/README.md index 34285f5..28d215c 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,14 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/vertica/vcluster.svg)](https://pkg.go.dev/github.com/vertica/vcluster) -This repository contains the vcluster-ops Go library and command-line interface to administer a Vertica cluster with a REST API. The REST API endpoints are exposed by the following services: +This repository contains the vcluster-ops Go library and command-line +interface to administer a Vertica cluster with a REST API. The REST API +endpoints are exposed by the following services: - Node Management Agent (NMA) - Embedded HTTPS service -This CLI tool combines REST calls to provide a coherent Go interface so that you can perform the following administrator operations: +This CLI tool combines REST calls to provide a coherent Go interface so that +you can perform the following administrator operations: - Create a database - Scale a cluster up and down - Restart a cluster @@ -58,9 +61,14 @@ directories in this project. ## Usage -Each source file in `vclusterops/` contains a `VOptions` struct with option fields that you can set for that operation, and a `VOptionsFactory` factory function that returns a struct with sensible option defaults. General database and authentication options are available in `DatabaseOptions` in `vclusterops/vcluster_database_options.go`. +Each source file in `vclusterops/` contains a `VOptions` struct +with option fields that you can set for that operation, and a `VOptionsFactory` +factory function that returns a struct with sensible option defaults. General +database and authentication options are available in `DatabaseOptions` in +`vclusterops/vcluster_database_options.go`. 
-The following example imports the `vclusterops` library, and then calls functions from `vclusterops/create_db.go` to create a database: +The following example imports the `vclusterops` library, and then calls +functions from `vclusterops/create_db.go` to create a database: ``` @@ -94,4 +102,5 @@ We can use similar way to set up and call other vcluster-ops commands. ## Licensing -vcluster is open source code and is under the Apache 2.0 license. Please see `LICENSE` for details. \ No newline at end of file +vcluster is open source and is under the Apache 2.0 license. Please see +`LICENSE` for details. diff --git a/commands/cluster_command_launcher.go b/commands/cluster_command_launcher.go index 23742a9..927c3a0 100644 --- a/commands/cluster_command_launcher.go +++ b/commands/cluster_command_launcher.go @@ -59,6 +59,8 @@ const ( dataPathKey = "dataPath" communalStorageLocationFlag = "communal-storage-location" communalStorageLocationKey = "communalStorageLocation" + archiveNameFlag = "archive-name" + archiveNameKey = "archiveName" ipv6Flag = "ipv6" ipv6Key = "ipv6" eonModeFlag = "eon-mode" @@ -156,6 +158,7 @@ var flagKeyMap = map[string]string{ verboseFlag: verboseKey, outputFileFlag: outputFileKey, sandboxFlag: sandboxKey, + archiveNameFlag: archiveNameKey, targetDBNameFlag: targetDBNameKey, targetHostsFlag: targetHostsKey, targetUserNameFlag: targetUserNameKey, @@ -213,8 +216,10 @@ const ( showRestorePointsSubCmd = "show_restore_points" installPkgSubCmd = "install_packages" // hidden Cmds (for internal testing only) - getDrainingStatusSubCmd = "get_draining_status" promoteSandboxSubCmd = "promote_sandbox" + createArchiveCmd = "create_archive" + saveRestorePointsSubCmd = "save_restore_point" + getDrainingStatusSubCmd = "get_draining_status" ) // cmdGlobals holds global variables shared by multiple @@ -580,6 +585,8 @@ func constructCmds() []*cobra.Command { // hidden cmds (for internal testing only) makeCmdGetDrainingStatus(), makeCmdPromoteSandbox(), + 
makeCmdCreateArchive(), + makeCmdSaveRestorePoint(), } } diff --git a/commands/cmd_create_archive.go b/commands/cmd_create_archive.go new file mode 100644 index 0000000..6a08f29 --- /dev/null +++ b/commands/cmd_create_archive.go @@ -0,0 +1,176 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package commands + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/vertica/vcluster/vclusterops" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +/* CmdCreateArchive + * + * Parses arguments to create-archive and calls + * the high-level function for create-archive. + * + * Implements ClusterCommand interface + */ + +type CmdCreateArchive struct { + CmdBase + createArchiveOptions *vclusterops.VCreateArchiveOptions +} + +func makeCmdCreateArchive() *cobra.Command { + newCmd := &CmdCreateArchive{} + opt := vclusterops.VCreateArchiveFactory() + newCmd.createArchiveOptions = &opt + + cmd := makeBasicCobraCmd( + newCmd, + createArchiveCmd, + "Create an archive in a given archive name and number.", + `Create an archive in a given archive name and number. 
+ +Examples: + # Create an archive in a given archive name + vcluster create_archive --db-name DBNAME --archive-name ARCHIVE_ONE + + # Create an archive in a given archive name and number of restore points (default 0, unlimited) + vcluster create_archive --db-name DBNAME --archive-name ARCHIVE_ONE \ + --num-restore-points 6 + + # Create an archive in main cluster with user input password + vcluster create_archive --db-name DBNAME --archive-name ARCHIVE_ONE \ + --hosts 10.20.30.40,10.20.30.41,10.20.30.42 --password "PASSWORD" + + # Create an archive for a sandbox + vcluster create_archive --db-name DBNAME --archive-name ARCHIVE_ONE \ + --sandbox SANDBOX_ONE --password "PASSWORD" + +`, + []string{dbNameFlag, configFlag, passwordFlag, + hostsFlag, ipv6Flag, eonModeFlag}, + ) + + // local flags + newCmd.setLocalFlags(cmd) + + // require archive-name + markFlagsRequired(cmd, archiveNameFlag) + + // hide this subcommand + cmd.Hidden = true + + return cmd +} + +// setLocalFlags will set the local flags the command has +func (c *CmdCreateArchive) setLocalFlags(cmd *cobra.Command) { + cmd.Flags().StringVar( + &c.createArchiveOptions.ArchiveName, + archiveNameFlag, + "", + "The name of archive to be created.", + ) + cmd.Flags().IntVar( + &c.createArchiveOptions.NumRestorePoint, + "num-restore-points", + vclusterops.CreateArchiveDefaultNumRestore, + "Maximum number of restore points that archive can contain. "+ + "If you provide 0, the number of restore points will be unlimited. "+ + "By default, the value is 0. 
Negative number is disallowed.", + ) + cmd.Flags().StringVar( + &c.createArchiveOptions.Sandbox, + sandboxFlag, + "", + "The name of target sandbox", + ) +} + +func (c *CmdCreateArchive) Parse(inputArgv []string, logger vlog.Printer) error { + c.argv = inputArgv + logger.LogArgParse(&c.argv) + + // for some options, we do not want to use their default values, + // if they are not provided in cli, + // reset the value of those options to nil + c.ResetUserInputOptions(&c.createArchiveOptions.DatabaseOptions) + + // create_archive only works for an Eon db so we assume the user always runs this subcommand + // on an Eon db. When Eon mode cannot be found in config file, we set its value to true. + if !viper.IsSet(eonModeKey) { + c.createArchiveOptions.IsEon = true + } + + return c.validateParse(logger) +} + +// all validations of the arguments should go in here +func (c *CmdCreateArchive) validateParse(logger vlog.Printer) error { + logger.Info("Called validateParse()") + + err := c.ValidateParseBaseOptions(&c.createArchiveOptions.DatabaseOptions) + if err != nil { + return err + } + + err = c.setConfigParam(&c.createArchiveOptions.DatabaseOptions) + if err != nil { + return err + } + + if !c.usePassword() { + err = c.getCertFilesFromCertPaths(&c.createArchiveOptions.DatabaseOptions) + if err != nil { + return err + } + } + + err = c.setDBPassword(&c.createArchiveOptions.DatabaseOptions) + if err != nil { + return err + } + + return nil +} + +func (c *CmdCreateArchive) Analyze(logger vlog.Printer) error { + logger.Info("Called method Analyze()") + return nil +} + +func (c *CmdCreateArchive) Run(vcc vclusterops.ClusterCommands) error { + vcc.LogInfo("Called method Run()") + + options := c.createArchiveOptions + + err := vcc.VCreateArchive(options) + if err != nil { + vcc.LogError(err, "failed to create archive", "archiveName", options.ArchiveName) + return err + } + + vcc.DisplayInfo("Successfully created archive: %s", options.ArchiveName) + return nil +} + +// 
SetDatabaseOptions will assign a vclusterops.DatabaseOptions instance to the one in CmdCreateArchive +func (c *CmdCreateArchive) SetDatabaseOptions(opt *vclusterops.DatabaseOptions) { + c.createArchiveOptions.DatabaseOptions = *opt +} diff --git a/commands/cmd_create_db.go b/commands/cmd_create_db.go index a7164a5..a869996 100644 --- a/commands/cmd_create_db.go +++ b/commands/cmd_create_db.go @@ -275,7 +275,6 @@ func (c *CmdCreateDB) Run(vcc vclusterops.ClusterCommands) error { vcc.V(1).Info("Called method Run()") vdb, createError := vcc.VCreateDatabase(c.createDBOptions) if createError != nil { - vcc.LogError(createError, "Failed to create the database.") return createError } diff --git a/commands/cmd_save_restore_point.go b/commands/cmd_save_restore_point.go new file mode 100644 index 0000000..c3c6fc7 --- /dev/null +++ b/commands/cmd_save_restore_point.go @@ -0,0 +1,161 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package commands + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/vertica/vcluster/vclusterops" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +/* CmdSaveRestorePoint + * + * Parses arguments to save-restore-points and calls + * the high-level function for save-restore-points. 
+ * + * Implements ClusterCommand interface + */ + +type CmdSaveRestorePoint struct { + CmdBase + saveRestoreOptions *vclusterops.VSaveRestorePointOptions +} + +func makeCmdSaveRestorePoint() *cobra.Command { + newCmd := &CmdSaveRestorePoint{} + opt := vclusterops.VSaveRestorePointFactory() + newCmd.saveRestoreOptions = &opt + + cmd := makeBasicCobraCmd( + newCmd, + saveRestorePointsSubCmd, + "Save a restore point in a given archive.", + `Save a restore point in a given archive. + +Examples: + # Save restore point in a given archive with user input + vcluster save_restore_point --db-name test_db \ + --archive-name ARCHIVE_ONE + + # Save restore point for a sandbox + vcluster save_restore_point --db-name test_db \ + --archive-name ARCHIVE_ONE --sandbox SANDBOX_ONE + +`, + []string{dbNameFlag, hostsFlag, passwordFlag, + ipv6Flag, configFlag, eonModeFlag}, + ) + + // local flags + newCmd.setLocalFlags(cmd) + + // require db-name and archive-name + markFlagsRequired(cmd, dbNameFlag, archiveNameFlag) + + // hide this subcommand + cmd.Hidden = true + + return cmd +} + +// setLocalFlags will set the local flags the command has +func (c *CmdSaveRestorePoint) setLocalFlags(cmd *cobra.Command) { + cmd.Flags().StringVar( + &c.saveRestoreOptions.ArchiveName, + archiveNameFlag, + "", + "Collection of restore points that belong to a certain archive.", + ) + cmd.Flags().StringVar( + &c.saveRestoreOptions.Sandbox, + sandboxFlag, + "", + "The name of target sandbox", + ) +} + +func (c *CmdSaveRestorePoint) Parse(inputArgv []string, logger vlog.Printer) error { + c.argv = inputArgv + logger.LogArgParse(&c.argv) + + // for some options, we do not want to use their default values, + // if they are not provided in cli, + // reset the value of those options to nil + c.ResetUserInputOptions(&c.saveRestoreOptions.DatabaseOptions) + + // save_restore_point only works for an Eon db so we assume the user always runs this subcommand + // on an Eon db. 
When Eon mode cannot be found in config file, we set its value to true. + if !viper.IsSet(eonModeKey) { + c.saveRestoreOptions.IsEon = true + } + + return c.validateParse(logger) +} + +// all validations of the arguments should go in here +func (c *CmdSaveRestorePoint) validateParse(logger vlog.Printer) error { + logger.Info("Called validateParse()") + + err := c.ValidateParseBaseOptions(&c.saveRestoreOptions.DatabaseOptions) + if err != nil { + return err + } + + if !c.usePassword() { + err = c.getCertFilesFromCertPaths(&c.saveRestoreOptions.DatabaseOptions) + if err != nil { + return err + } + } + + err = c.setConfigParam(&c.saveRestoreOptions.DatabaseOptions) + if err != nil { + return err + } + + err = c.setDBPassword(&c.saveRestoreOptions.DatabaseOptions) + if err != nil { + return err + } + + return nil +} + +func (c *CmdSaveRestorePoint) Analyze(logger vlog.Printer) error { + logger.Info("Called method Analyze()") + return nil +} + +func (c *CmdSaveRestorePoint) Run(vcc vclusterops.ClusterCommands) error { + vcc.LogInfo("Called method Run()") + + options := c.saveRestoreOptions + + err := vcc.VSaveRestorePoint(options) + if err != nil { + vcc.LogError(err, "failed to save restore points", "DBName", options.DBName) + return err + } + + vcc.DisplayInfo("Successfully saved restore points in database %s", options.DBName) + return nil +} + +// SetDatabaseOptions will assign a vclusterops.DatabaseOptions instance to the one in CmdSaveRestorePoint +func (c *CmdSaveRestorePoint) SetDatabaseOptions(opt *vclusterops.DatabaseOptions) { + c.saveRestoreOptions.DatabaseOptions = *opt +} diff --git a/commands/cmd_start_replication.go b/commands/cmd_start_replication.go index ba84f90..421dea5 100644 --- a/commands/cmd_start_replication.go +++ b/commands/cmd_start_replication.go @@ -136,7 +136,8 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { "[Required] The absolute path to the connection file created with the create_connection command, "+ "containing 
the database name, hosts, and password (if any) for the target database. "+ "Alternatively, you can provide this information manually with --target-db-name, "+ - "--target-hosts, and --target-password-file") + "--target-hosts, and --target-password-file", + ) cmd.Flags().BoolVar( &c.startRepOptions.Async, asyncFlag, @@ -145,7 +146,7 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { "Default value is false.", ) cmd.Flags().StringVar( - &c.startRepOptions.ObjectName, + &c.startRepOptions.TableOrSchemaName, tableOrSchemaNameFlag, "", "(only async replication)The object name we want to copy from the source side. The available"+ @@ -274,12 +275,18 @@ func (c *CmdStartReplication) Run(vcc vclusterops.ClusterCommands) error { options := c.startRepOptions - err := vcc.VReplicateDatabase(options) + transactionID, err := vcc.VReplicateDatabase(options) if err != nil { vcc.LogError(err, "failed to replicate to database", "targetDB", options.TargetDB) return err } - vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB) + + if options.Async { + vcc.DisplayInfo("Successfully started replication to database %s. 
Transaction ID: %d", options.TargetDB, transactionID) + } else { + vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB) + } + return nil } diff --git a/vclusterops/add_node.go b/vclusterops/add_node.go index cf3d7c8..2ef79e8 100644 --- a/vclusterops/add_node.go +++ b/vclusterops/add_node.go @@ -409,7 +409,8 @@ func (vcc VClusterCommands) produceAddNodeInstructions(vdb *VCoordinationDatabas produceTransferConfigOps(&instructions, nil, vdb.HostList, - vdb /*db configurations retrieved from a running db*/) + vdb, /*db configurations retrieved from a running db*/ + nil /*Sandbox name*/) nmaStartNewNodesOp := makeNMAStartNodeOpWithVDB(newHosts, options.StartUpConf, vdb) httpsPollNodeStateOp, err := makeHTTPSPollNodeStateOp(newHosts, usePassword, username, password, options.TimeOut) diff --git a/vclusterops/cluster_op.go b/vclusterops/cluster_op.go index 8693902..84e08f0 100644 --- a/vclusterops/cluster_op.go +++ b/vclusterops/cluster_op.go @@ -72,10 +72,11 @@ const ( ) const ( - SuccessCode = 200 - MultipleChoiceCode = 300 - UnauthorizedCode = 401 - InternalErrorCode = 500 + SuccessCode = 200 + MultipleChoiceCode = 300 + UnauthorizedCode = 401 + PreconditionFailedCode = 412 + InternalErrorCode = 500 ) // hostHTTPResult is used to save result of an Adapter's sendRequest(...) function @@ -97,13 +98,17 @@ const respSuccStatusCode = 0 // The HTTP response with a 401 status code can have several scenarios: // 1. Wrong password // 2. Wrong certificate -// 3. The local node has not yet joined the cluster; the HTTP server will accept connections once the node joins the cluster. 
-// HTTPCheckDBRunningOp in create_db need to check all scenarios to see any HTTP running -// For HTTPSPollNodeStateOp in start_db, it requires only handling the first and second scenarios +// HTTPCheckDBRunningOp in create_db and HTTPSPollNodeStateOp in start_db need to handle these scenarios func (hostResult *hostHTTPResult) isUnauthorizedRequest() bool { return hostResult.statusCode == UnauthorizedCode } +// The HTTP response with a 412 may happen if +// the local node has not yet joined the cluster; the HTTP server will accept connections once the node joins the cluster. +func (hostResult *hostHTTPResult) hasPreconditionFailed() bool { + return hostResult.statusCode == PreconditionFailedCode +} + // isSuccess returns true if status code is 200 func (hostResult *hostHTTPResult) isSuccess() bool { return hostResult.statusCode == SuccessCode @@ -129,7 +134,8 @@ func (hostResult *hostHTTPResult) isInternalError() bool { } func (hostResult *hostHTTPResult) isHTTPRunning() bool { - if hostResult.isPassing() || hostResult.isUnauthorizedRequest() || hostResult.isInternalError() { + if hostResult.isPassing() || hostResult.isUnauthorizedRequest() || + hostResult.isInternalError() || hostResult.hasPreconditionFailed() { return true } return false @@ -544,6 +550,7 @@ type ClusterCommands interface { VAlterSubclusterType(options *VAlterSubclusterTypeOptions) error VCheckVClusterServerPid(options *VCheckVClusterServerPidOptions) ([]string, error) VCreateDatabase(options *VCreateDatabaseOptions) (VCoordinationDatabase, error) + VCreateArchive(options *VCreateArchiveOptions) error VDropDatabase(options *VDropDatabaseOptions) error VFetchCoordinationDatabase(options *VFetchCoordinationDatabaseOptions) (VCoordinationDatabase, error) VFetchNodesDetails(options *VFetchNodesDetailsOptions) (NodesDetails, error) @@ -556,11 +563,12 @@ type ClusterCommands interface { VRemoveNode(options *VRemoveNodeOptions) (VCoordinationDatabase, error) VRemoveSubcluster(removeScOpt 
*VRemoveScOptions) (VCoordinationDatabase, error) VRenameSubcluster(options *VRenameSubclusterOptions) error - VReplicateDatabase(options *VReplicationDatabaseOptions) error + VReplicateDatabase(options *VReplicationDatabaseOptions) (int64, error) VReviveDatabase(options *VReviveDatabaseOptions) (dbInfo string, vdbPtr *VCoordinationDatabase, err error) VSandbox(options *VSandboxOptions) error VScrutinize(options *VScrutinizeOptions) error VShowRestorePoints(options *VShowRestorePointsOptions) (restorePoints []RestorePoint, err error) + VSaveRestorePoint(options *VSaveRestorePointOptions) (err error) VStartDatabase(options *VStartDatabaseOptions) (vdbPtr *VCoordinationDatabase, err error) VStartNodes(options *VStartNodesOptions) error VStartSubcluster(startScOpt *VStartScOptions) error diff --git a/vclusterops/cmd_type.go b/vclusterops/cmd_type.go index 005c671..25189ec 100644 --- a/vclusterops/cmd_type.go +++ b/vclusterops/cmd_type.go @@ -20,6 +20,7 @@ const ( SandboxSCCmd UnsandboxSCCmd ShowRestorePointsCmd + SaveRestorePointsCmd InstallPackagesCmd ConfigRecoverCmd GetDrainingStatusCmd @@ -40,6 +41,7 @@ const ( AddNodeSyncCat StartNodeSyncCat RemoveNodeSyncCat + CreateArchiveCmd PollSubclusterStateCmd ) @@ -60,6 +62,7 @@ var cmdStringMap = map[CmdType]string{ SandboxSCCmd: "sandbox_subcluster", UnsandboxSCCmd: "unsandbox_subcluster", ShowRestorePointsCmd: "show_restore_points", + SaveRestorePointsCmd: "save_restore_point", InstallPackagesCmd: "install_packages", ConfigRecoverCmd: "manage_config_recover", GetDrainingStatusCmd: "get_draining_status", @@ -79,6 +82,7 @@ var cmdStringMap = map[CmdType]string{ AddNodeSyncCat: "add_node_sync_cat", StartNodeSyncCat: "start_node_sync_cat", RemoveNodeSyncCat: "remove_node_sync_cat", + CreateArchiveCmd: "create_archive", PollSubclusterStateCmd: "poll_subcluster_state", } diff --git a/vclusterops/coordinator_database.go b/vclusterops/coordinator_database.go index b5644d9..839458c 100644 --- 
a/vclusterops/coordinator_database.go +++ b/vclusterops/coordinator_database.go @@ -317,6 +317,30 @@ func (vdb *VCoordinationDatabase) filterPrimaryNodes() { vdb.HostList = maps.Keys(vdb.HostNodeMap) } +// Update and limit the hostlist based on status and sandbox info +// If sandbox provided, pick up sandbox up hosts and return. Else return up hosts. +func (vdb *VCoordinationDatabase) filterUpHostlist(inputHosts []string, sandbox string) []string { + var clusterHosts []string + var upSandboxHosts []string + + for _, h := range inputHosts { + vnode, ok := vdb.HostNodeMap[h] + if !ok { + // host address not found in vdb, skip it + continue + } + if vnode.Sandbox == "" && vnode.State == util.NodeUpState { + clusterHosts = append(clusterHosts, vnode.Address) + } else if vnode.Sandbox == sandbox && vnode.State == util.NodeUpState { + upSandboxHosts = append(upSandboxHosts, vnode.Address) + } + } + if sandbox == "" { + return clusterHosts + } + return upSandboxHosts +} + // hostIsUp returns true if the host is up func (vdb *VCoordinationDatabase) hostIsUp(hostName string) bool { return vdb.HostNodeMap[hostName].State == util.NodeUpState diff --git a/vclusterops/create_archive.go b/vclusterops/create_archive.go new file mode 100644 index 0000000..cb2a640 --- /dev/null +++ b/vclusterops/create_archive.go @@ -0,0 +1,195 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package vclusterops + +import ( + "fmt" + + "github.com/vertica/vcluster/vclusterops/util" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +const CreateArchiveDefaultNumRestore = 0 + +type VCreateArchiveOptions struct { + DatabaseOptions + + // Required arguments + ArchiveName string + // Optional arguments + NumRestorePoint int + Sandbox string +} + +func VCreateArchiveFactory() VCreateArchiveOptions { + options := VCreateArchiveOptions{} + // set default values to the params + options.setDefaultValues() + + return options +} + +func (options *VCreateArchiveOptions) setDefaultValues() { + options.DatabaseOptions.setDefaultValues() +} + +func (options *VCreateArchiveOptions) validateRequiredOptions(logger vlog.Printer) error { + err := options.validateEonOptions(logger) + if err != nil { + return err + } + err = options.validateBaseOptions(CreateArchiveCmd, logger) + if err != nil { + return err + } + if options.ArchiveName == "" { + return fmt.Errorf("must specify an archive name") + } + + err = util.ValidateArchiveName(options.ArchiveName) + if err != nil { + return err + } + return nil +} + +func (options *VCreateArchiveOptions) validateExtraOptions() error { + if options.NumRestorePoint < 0 { + return fmt.Errorf("number of restore points must be greater than or equal to 0") + } + if options.Sandbox != "" { + return util.ValidateSandboxName(options.Sandbox) + } + return nil +} + +func (options *VCreateArchiveOptions) validateEonOptions(_ vlog.Printer) error { + if !options.IsEon { + return fmt.Errorf("create archive is only supported in Eon mode") + } + return nil +} + +func (options *VCreateArchiveOptions) validateParseOptions(log vlog.Printer) error { + // validate required parameters + err := options.validateRequiredOptions(log) + if err != nil { + return err + } + + err = options.validateEonOptions(log) + if err != nil { + return err + } + + err = options.validateAuthOptions(CreateArchiveCmd.CmdString(), log) + if err != nil { + return err + } + + // validate 
all other params + err = options.validateExtraOptions() + if err != nil { + return err + } + return nil +} + +// analyzeOptions will modify some options based on what is chosen +func (options *VCreateArchiveOptions) analyzeOptions() (err error) { + // we analyze host names when it is set in user input, otherwise we use hosts in yaml config + if len(options.RawHosts) > 0 { + // resolve RawHosts to be IP addresses + hostAddresses, err := util.ResolveRawHostsToAddresses(options.RawHosts, options.IPv6) + if err != nil { + return err + } + options.Hosts = hostAddresses + } + return nil +} + +func (options *VCreateArchiveOptions) validateAnalyzeOptions(log vlog.Printer) error { + if err := options.validateParseOptions(log); err != nil { + return err + } + if err := options.analyzeOptions(); err != nil { + return err + } + if err := options.setUsePassword(log); err != nil { + return err + } + return options.validateUserName(log) +} + +func (vcc VClusterCommands) VCreateArchive(options *VCreateArchiveOptions) error { + /* + * - Produce Instructions + * - Create a VClusterOpEngine + * - Give the instructions to the VClusterOpEngine to run + */ + + // validate and analyze options + err := options.validateAnalyzeOptions(vcc.Log) + if err != nil { + return err + } + + // produce create archive instructions + instructions, err := vcc.produceCreateArchiveInstructions(options) + if err != nil { + return fmt.Errorf("fail to produce instructions, %w", err) + } + + // create a VClusterOpEngine, and add certs to the engine + clusterOpEngine := makeClusterOpEngine(instructions, options) + + // give the instructions to the VClusterOpEngine to run + runError := clusterOpEngine.run(vcc.Log) + if runError != nil { + return fmt.Errorf("fail to create archive: %w", runError) + } + return nil +} + +// The generated instructions will later perform the following operations necessary +// for a successful create_archive: +// - Retrieve VDB from HTTP endpoints +// - Run create archive query +func 
(vcc *VClusterCommands) produceCreateArchiveInstructions(options *VCreateArchiveOptions) ([]clusterOp, error) { + var instructions []clusterOp + vdb := makeVCoordinationDatabase() + + err := vcc.getVDBFromRunningDBIncludeSandbox(&vdb, &options.DatabaseOptions, util.MainClusterSandbox) + if err != nil { + return instructions, err + } + // get up hosts + hosts := options.Hosts + // Trim host list + hosts = vdb.filterUpHostlist(hosts, options.Sandbox) + bootstrapHost := []string{getInitiator(hosts)} + + httpsCreateArchiveOp, err := makeHTTPSCreateArchiveOp(bootstrapHost, options.usePassword, + options.UserName, options.Password, options.ArchiveName, options.NumRestorePoint) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &httpsCreateArchiveOp) + return instructions, nil +} diff --git a/vclusterops/create_db.go b/vclusterops/create_db.go index 03d4992..5a4a41b 100644 --- a/vclusterops/create_db.go +++ b/vclusterops/create_db.go @@ -295,6 +295,7 @@ func (vcc VClusterCommands) VCreateDatabase(options *VCreateDatabaseOptions) (VC vdb := makeVCoordinationDatabase() err := vdb.setFromCreateDBOptions(options, vcc.Log) if err != nil { + vcc.Log.Error(err, "fail to create database") return vdb, err } // produce instructions @@ -489,7 +490,8 @@ func (vcc VClusterCommands) produceCreateDBWorkerNodesInstructions( &instructions, bootstrapHost, vdb.HostList, - vdb /*db configurations retrieved from a running db*/) + vdb, /*db configurations retrieved from a running db*/ + nil /*sandbox name*/) nmaStartNewNodesOp := makeNMAStartNodeOpWithVDB(newNodeHosts, options.StartUpConf, vdb) instructions = append(instructions, &nmaStartNewNodesOp) } diff --git a/vclusterops/fetch_node_state.go b/vclusterops/fetch_node_state.go index 45d9646..657fdea 100644 --- a/vclusterops/fetch_node_state.go +++ b/vclusterops/fetch_node_state.go @@ -16,6 +16,9 @@ type VFetchNodeStateOptions struct { GetVersion bool SkipDownDatabase bool + + // only use this if 
options.RawHosts contains only sandboxed nodes + SandboxedNodesOnly bool } func VFetchNodeStateOptionsFactory() VFetchNodeStateOptions { @@ -72,7 +75,12 @@ func (vcc VClusterCommands) VFetchNodeState(options *VFetchNodeStateOptions) ([] // this vdb is used to fetch node version var vdb VCoordinationDatabase - err = vcc.getVDBFromRunningDBIncludeSandbox(&vdb, &options.DatabaseOptions, util.MainClusterSandbox) + + if options.SandboxedNodesOnly || util.IsK8sEnvironment() { + err = vcc.getVDBFromRunningDBIncludeSandbox(&vdb, &options.DatabaseOptions, util.MainClusterSandbox) + } else { + err = vcc.getVDBFromMainRunningDBContainsSandbox(&vdb, &options.DatabaseOptions) + } if err != nil { vcc.Log.PrintInfo("Error from vdb build: %s", err.Error()) @@ -91,7 +99,13 @@ func (vcc VClusterCommands) VFetchNodeState(options *VFetchNodeStateOptions) ([] return vcc.fetchNodeStateFromDownDB(options) } - // produce list_all_nodes instructions + nodeStates := buildNodeStateList(&vdb, false /*forDownDatabase*/) + // return the result if no need to get version info + if !options.GetVersion { + return nodeStates, nil + } + + // produce instructions to fill node information instructions, err := vcc.produceListAllNodesInstructions(options, &vdb) if err != nil { return nil, fmt.Errorf("fail to produce instructions, %w", err) @@ -102,7 +116,6 @@ func (vcc VClusterCommands) VFetchNodeState(options *VFetchNodeStateOptions) ([] // give the instructions to the VClusterOpEngine to run runError := clusterOpEngine.run(vcc.Log) - nodeStates := clusterOpEngine.execContext.nodesInfo if runError == nil { // fill node version for i, nodeInfo := range nodeStates { @@ -116,34 +129,9 @@ func (vcc VClusterCommands) VFetchNodeState(options *VFetchNodeStateOptions) ([] nodeInfo.Address) } } - - return nodeStates, nil - } - - // error out in case of wrong certificate or password - if len(clusterOpEngine.execContext.hostsWithWrongAuth) > 0 { - return nodeStates, - fmt.Errorf("wrong certificate or password on 
hosts %v", clusterOpEngine.execContext.hostsWithWrongAuth) - } - - // if failed to get node info from a running database, - // we will try to get it by reading catalog editor - upNodeCount := 0 - for _, n := range nodeStates { - if n.State == util.NodeUpState { - upNodeCount++ - } } - if upNodeCount == 0 { - if options.SkipDownDatabase { - return []NodeInfo{}, rfc7807.New(rfc7807.FetchDownDatabase) - } - - return vcc.fetchNodeStateFromDownDB(options) - } - - return nodeStates, runError + return nodeStates, nil } func (vcc VClusterCommands) fetchNodeStateFromDownDB(options *VFetchNodeStateOptions) ([]NodeInfo, error) { @@ -163,18 +151,7 @@ func (vcc VClusterCommands) fetchNodeStateFromDownDB(options *VFetchNodeStateOpt return nodeStates, err } - for _, h := range vdb.HostList { - var nodeInfo NodeInfo - n := vdb.HostNodeMap[h] - nodeInfo.Address = n.Address - nodeInfo.Name = n.Name - nodeInfo.CatalogPath = n.CatalogPath - nodeInfo.Subcluster = n.Subcluster - nodeInfo.IsPrimary = n.IsPrimary - nodeInfo.Version = n.Version - nodeInfo.State = util.NodeDownState - nodeStates = append(nodeStates, nodeInfo) - } + nodeStates = buildNodeStateList(&vdb, true /*forDownDatabase*/) return nodeStates, nil } @@ -186,73 +163,64 @@ func (vcc VClusterCommands) produceListAllNodesInstructions( vdb *VCoordinationDatabase) ([]clusterOp, error) { var instructions []clusterOp - // get hosts - hosts := options.Hosts - - // validate user name - usePassword := false - if options.Password != nil { - usePassword = true - err := options.validateUserName(vcc.Log) - if err != nil { - return instructions, err - } - } - nmaHealthOp := makeNMAHealthOpSkipUnreachable(options.Hosts) nmaReadVerticaVersionOp := makeNMAReadVerticaVersionOp(vdb) - // Trim host list - hosts = options.updateHostlist(vcc, vdb, hosts) - - httpsCheckNodeStateOp, err := makeHTTPSCheckNodeStateOp(hosts, - usePassword, options.UserName, options.Password) - if err != nil { - return instructions, err - } - if options.GetVersion { 
instructions = append(instructions, &nmaHealthOp, &nmaReadVerticaVersionOp) } - instructions = append(instructions, - &httpsCheckNodeStateOp, - ) - return instructions, nil } -// Update and limit the hostlist based on status and sandbox info -// Note: if we have any UP main cluster host in the input list, the trimmed hostlist would always contain -// -// only main cluster UP hosts. -func (options *VFetchNodeStateOptions) updateHostlist(vcc VClusterCommands, vdb *VCoordinationDatabase, inputHosts []string) []string { - var mainClusterHosts []string - var upSandboxHosts []string - - for _, h := range inputHosts { - vnode, ok := vdb.HostNodeMap[h] - if !ok { - // host address not found in vdb, skip it - continue +func buildNodeStateList(vdb *VCoordinationDatabase, forDownDatabase bool) []NodeInfo { + var nodeStates []NodeInfo + + // a map from a subcluster name to whether it is primary + // Context: if a node is primary, the subcluster it belongs to is a primary subcluster. + // If any of the nodes are down in such a primary subcluster, HTTPSUpdateNodeStateOp cannot correctly + // update its IsPrimary value, because this op sends request to each host. + // We use the following scMap to check whether any node is primary in each subcluster, + // then update other nodes' IsPrimary value in this subcluster. 
+ scMap := make(map[string]bool) + + for _, h := range vdb.HostList { + var nodeInfo NodeInfo + n := vdb.HostNodeMap[h] + nodeInfo.Address = n.Address + nodeInfo.CatalogPath = n.CatalogPath + nodeInfo.IsPrimary = n.IsPrimary + nodeInfo.Name = n.Name + nodeInfo.Sandbox = n.Sandbox + if forDownDatabase { + nodeInfo.State = util.NodeDownState + } else { + nodeInfo.State = n.State } - if vnode.Sandbox == "" && (vnode.State == util.NodeUpState || vnode.State == util.NodeUnknownState) { - mainClusterHosts = append(mainClusterHosts, vnode.Address) - } else if vnode.State == util.NodeUpState { - upSandboxHosts = append(upSandboxHosts, vnode.Address) + nodeInfo.Subcluster = n.Subcluster + nodeInfo.Version = n.Version + + nodeStates = append(nodeStates, nodeInfo) + + if !forDownDatabase { + if isPrimary, exists := scMap[n.Subcluster]; exists { + scMap[n.Subcluster] = isPrimary || n.IsPrimary + } else { + scMap[n.Subcluster] = n.IsPrimary + } } } - if len(mainClusterHosts) > 0 { - vcc.Log.PrintWarning("Main cluster UP node found in host list. The status would be fetched from a main cluster host!") - return mainClusterHosts - } - if len(upSandboxHosts) > 0 { - vcc.Log.PrintWarning("Only sandboxed UP nodes found in host list. The status would be fetched from a sandbox host!") - return upSandboxHosts + + // update IsPrimary of the nodes for running database + if !forDownDatabase { + for i := 0; i < len(nodeStates); i++ { + nodeInfo := nodeStates[i] + scName := nodeInfo.Subcluster + nodeStates[i].IsPrimary = scMap[scName] + } } - // We do not have an up host, so better try with complete input hostlist - return inputHosts + return nodeStates } diff --git a/vclusterops/helpers.go b/vclusterops/helpers.go index 33f3736..e225b3e 100644 --- a/vclusterops/helpers.go +++ b/vclusterops/helpers.go @@ -36,15 +36,15 @@ const ( // produceTransferConfigOps generates instructions to transfert some config // files from a sourceConfig node to target nodes. 
func produceTransferConfigOps(instructions *[]clusterOp, sourceConfigHost, - targetHosts []string, vdb *VCoordinationDatabase) { + targetHosts []string, vdb *VCoordinationDatabase, sandbox *string) { var verticaConfContent string nmaDownloadVerticaConfigOp := makeNMADownloadConfigOp( - "NMADownloadVerticaConfigOp", sourceConfigHost, "config/vertica", &verticaConfContent, vdb) + "NMADownloadVerticaConfigOp", sourceConfigHost, "config/vertica", &verticaConfContent, vdb, sandbox) nmaUploadVerticaConfigOp := makeNMAUploadConfigOp( "NMAUploadVerticaConfigOp", sourceConfigHost, targetHosts, "config/vertica", &verticaConfContent, vdb) var spreadConfContent string nmaDownloadSpreadConfigOp := makeNMADownloadConfigOp( - "NMADownloadSpreadConfigOp", sourceConfigHost, "config/spread", &spreadConfContent, vdb) + "NMADownloadSpreadConfigOp", sourceConfigHost, "config/spread", &spreadConfContent, vdb, sandbox) nmaUploadSpreadConfigOp := makeNMAUploadConfigOp( "NMAUploadSpreadConfigOp", sourceConfigHost, targetHosts, "config/spread", &spreadConfContent, vdb) *instructions = append(*instructions, diff --git a/vclusterops/https_check_node_state_op.go b/vclusterops/https_check_node_state_op.go deleted file mode 100644 index 46adb6a..0000000 --- a/vclusterops/https_check_node_state_op.go +++ /dev/null @@ -1,138 +0,0 @@ -/* - (c) Copyright [2023-2024] Open Text. - Licensed under the Apache License, Version 2.0 (the "License"); - You may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
-*/ - -package vclusterops - -import ( - "errors" - "fmt" - - "github.com/vertica/vcluster/vclusterops/util" -) - -type httpsCheckNodeStateOp struct { - opBase - opHTTPSBase -} - -func makeHTTPSCheckNodeStateOp(hosts []string, - useHTTPPassword bool, - userName string, - httpsPassword *string, -) (httpsCheckNodeStateOp, error) { - op := httpsCheckNodeStateOp{} - op.name = "HTTPCheckNodeStateOp" - op.description = "Check node state from running database" - // The hosts are the ones we are going to talk to. - // They can be a subset of the actual host information that we return, - // as if any of the hosts is responsive, spread can give us the info of all nodes - op.hosts = hosts - op.useHTTPPassword = useHTTPPassword - - err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName) - if err != nil { - return op, err - } - - op.userName = userName - op.httpsPassword = httpsPassword - return op, nil -} - -func (op *httpsCheckNodeStateOp) setupClusterHTTPRequest(hosts []string) error { - for _, host := range hosts { - httpRequest := hostHTTPRequest{} - httpRequest.Method = GetMethod - httpRequest.buildHTTPSEndpoint("nodes") - if op.useHTTPPassword { - httpRequest.Password = op.httpsPassword - httpRequest.Username = op.userName - } - op.clusterHTTPRequest.RequestCollection[host] = httpRequest - } - - return nil -} - -func (op *httpsCheckNodeStateOp) prepare(execContext *opEngineExecContext) error { - execContext.dispatcher.setup(op.hosts) - - return op.setupClusterHTTPRequest(op.hosts) -} - -func (op *httpsCheckNodeStateOp) execute(execContext *opEngineExecContext) error { - if err := op.runExecute(execContext); err != nil { - return err - } - - return op.processResult(execContext) -} - -func (op *httpsCheckNodeStateOp) processResult(execContext *opEngineExecContext) error { - var allErrs error - respondingNodeCount := 0 - - for host, result := range op.clusterHTTPRequest.ResultCollection { - op.logResponse(host, result) - - if 
result.isUnauthorizedRequest() { - op.logger.PrintError("[%s] unauthorized request: %s", op.name, result.content) - execContext.hostsWithWrongAuth = append(execContext.hostsWithWrongAuth, host) - // return here because we assume that - // we will get the same error across other nodes - allErrs = errors.Join(allErrs, result.err) - return allErrs - } - - if !result.isPassing() { - // for any error, we continue to the next node - if result.isInternalError() { - op.logger.PrintError("[%s] internal error of the /nodes endpoint: %s", op.name, result.content) - // At internal error originated from the server, so its a - // response, just not a successful one. - respondingNodeCount++ - } - allErrs = errors.Join(allErrs, result.err) - continue - } - - // parse the /nodes endpoint response - respondingNodeCount++ - nodesStates := nodesStateInfo{} - err := op.parseAndCheckResponse(host, result.content, &nodesStates) - if err != nil { - err = fmt.Errorf("[%s] fail to parse result on host %s: %w", - op.name, host, err) - allErrs = errors.Join(allErrs, err) - continue - } - - nodesInfo := nodesInfo{} - for _, node := range nodesStates.NodeList { - n := node.asNodeInfoWithoutVer() - nodesInfo.NodeList = append(nodesInfo.NodeList, n) - } - // successful case, write the result into exec context - execContext.nodesInfo = nodesInfo.NodeList - op.logger.PrintInfo("reporting results as obtained from the host [%s] ", host) - return nil - } - - return allErrs -} - -func (op *httpsCheckNodeStateOp) finalize(_ *opEngineExecContext) error { - return nil -} diff --git a/vclusterops/https_create_archive_op.go b/vclusterops/https_create_archive_op.go new file mode 100644 index 0000000..343e281 --- /dev/null +++ b/vclusterops/https_create_archive_op.go @@ -0,0 +1,152 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/vertica/vcluster/vclusterops/util" +) + +type httpsCreateArchiveOp struct { + opBase + opHTTPSBase + ArchiveName string + NumRestorePoints int + hostRequestBodyMap map[string]string +} + +type createArchiveRequestData struct { + NumRestorePoints int `json:"num_restore_points,omitempty"` +} + +func (op *httpsCreateArchiveOp) setupRequestBody(hosts []string) error { + op.hostRequestBodyMap = make(map[string]string) + + for _, host := range hosts { + createArchiveData := createArchiveRequestData{} + if op.NumRestorePoints != CreateArchiveDefaultNumRestore { + createArchiveData.NumRestorePoints = op.NumRestorePoints + } + dataBytes, err := json.Marshal(createArchiveData) + if err != nil { + return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + + op.hostRequestBodyMap[host] = string(dataBytes) + } + + return nil +} + +// makeHTTPSCreateArchiveOp makes an op that calls the vertica-http service to create an archive for the database +func makeHTTPSCreateArchiveOp(hosts []string, useHTTPPassword bool, userName string, + httpsPassword *string, archiveName string, numRestorePoints int, +) (httpsCreateArchiveOp, error) { + op := httpsCreateArchiveOp{} + op.name = "HTTPSCreateArchiveOp" + op.description = "Create archive for database" + op.hosts = hosts + op.useHTTPPassword = useHTTPPassword + if useHTTPPassword { + err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName) + if err != nil { + return op, err + } + op.userName = 
userName + op.httpsPassword = httpsPassword + } + op.ArchiveName = archiveName + op.NumRestorePoints = numRestorePoints + return op, nil +} + +func (op *httpsCreateArchiveOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildHTTPSEndpoint(util.ArchiveEndpoint + "/" + op.ArchiveName) + if op.useHTTPPassword { + httpRequest.Password = op.httpsPassword + httpRequest.Username = op.userName + } + httpRequest.RequestData = op.hostRequestBodyMap[host] + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *httpsCreateArchiveOp) prepare(execContext *opEngineExecContext) error { + err := op.setupRequestBody(op.hosts) + if err != nil { + return err + } + execContext.dispatcher.setup(op.hosts) + + return op.setupClusterHTTPRequest(op.hosts) +} + +func (op *httpsCreateArchiveOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *httpsCreateArchiveOp) processResult(_ *opEngineExecContext) error { + var allErrs error + + // every host needs to have a successful result, otherwise we fail this op + // because we want the archive created successfully on all hosts + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + + if result.isUnauthorizedRequest() { + return fmt.Errorf("[%s] wrong password/certificate for https service on host %s", + op.name, host) + } + + if !result.isPassing() { + allErrs = errors.Join(allErrs, result.err) + // not break here because we want to log all the failed nodes + continue + } + + /* decode the json-format response + The successful response object will be a dictionary like below: + { + "detail": "" + } + + */ + _, err := op.parseAndCheckMapResponse(host, result.content) + if err != nil { + err = fmt.Errorf(`[%s] fail to parse result 
on host %s, details: %w`, op.name, host, err) + allErrs = errors.Join(allErrs, err) + // not break here because we want to log all the failed nodes + continue + } + } + return allErrs +} + +func (op *httpsCreateArchiveOp) finalize(_ *opEngineExecContext) error { + return nil +} diff --git a/vclusterops/https_get_nodes_info_op.go b/vclusterops/https_get_nodes_info_op.go index 66e462b..3eb9065 100644 --- a/vclusterops/https_get_nodes_info_op.go +++ b/vclusterops/https_get_nodes_info_op.go @@ -113,6 +113,13 @@ func (op *httpsGetNodesInfoOp) processResult(_ *opEngineExecContext) error { for host, result := range op.clusterHTTPRequest.ResultCollection { op.logResponse(host, result) + // A host may have precondition failed, such as + // "Local node has not joined cluster yet, HTTP server will accept connections when the node has joined the cluster" + // In this case, we skip using the information from that host + if result.hasPreconditionFailed() { + continue + } + if result.isUnauthorizedRequest() { detail := fmt.Sprintf("[%s] wrong password/certificate for https service on host %s", op.name, host) diff --git a/vclusterops/https_startup_command_op.go b/vclusterops/https_startup_command_op.go index 2a9c432..e4cb5d4 100644 --- a/vclusterops/https_startup_command_op.go +++ b/vclusterops/https_startup_command_op.go @@ -129,13 +129,23 @@ func (op *httpsStartUpCommandOp) prepare(execContext *opEngineExecContext) error } } else { var primaryUpHosts []string + var upHosts []string for host, vnode := range op.vdb.HostNodeMap { - if vnode.IsPrimary && vnode.State == util.NodeUpState && vnode.Sandbox == op.sandbox { - primaryUpHosts = append(primaryUpHosts, host) - break + // If we do not find a primary up host in the same cluster(or sandbox), try to find a secondary up host + if vnode.State == util.NodeUpState && vnode.Sandbox == op.sandbox { + if vnode.IsPrimary { + primaryUpHosts = append(primaryUpHosts, host) + break + } + upHosts = append(upHosts, host) } } - op.hosts = 
primaryUpHosts + if len(primaryUpHosts) > 0 { + op.hosts = primaryUpHosts + } else { + op.logger.Info("could not find any primary UP nodes, considering secondary UP nodes.") + op.hosts = []string{upHosts[0]} + } } execContext.dispatcher.setup(op.hosts) diff --git a/vclusterops/https_stop_db_op.go b/vclusterops/https_stop_db_op.go index 37e102b..3821e7a 100644 --- a/vclusterops/https_stop_db_op.go +++ b/vclusterops/https_stop_db_op.go @@ -21,6 +21,7 @@ import ( "regexp" "strconv" + mapset "github.com/deckarep/golang-set/v2" "github.com/vertica/vcluster/vclusterops/util" ) @@ -87,6 +88,7 @@ func (op *httpsStopDBOp) prepare(execContext *opEngineExecContext) error { sandboxOnly := false var mainHost string var hosts []string + sandboxes := mapset.NewSet[string]() for h, sb := range execContext.upHostsToSandboxes { if sb == op.sandbox && sb != "" { // stop db only on sandbox @@ -96,7 +98,8 @@ func (op *httpsStopDBOp) prepare(execContext *opEngineExecContext) error { } if sb == "" { mainHost = h - } else { + } else if !sandboxes.Contains(sb) { + sandboxes.Add(sb) hosts = append(hosts, h) } } @@ -124,6 +127,7 @@ func (op *httpsStopDBOp) execute(execContext *opEngineExecContext) error { func (op *httpsStopDBOp) processResult(_ *opEngineExecContext) error { var allErrs error re := regexp.MustCompile(`Set subcluster \(.*\) to draining state.*`) + regHang := regexp.MustCompile(`context\s+deadline\s+exceeded\s+\(Client\.Timeout\s+exceeded\s+while\s+awaiting\s+headers\)`) for host, result := range op.clusterHTTPRequest.ResultCollection { op.logResponse(host, result) @@ -135,6 +139,11 @@ func (op *httpsStopDBOp) processResult(_ *opEngineExecContext) error { } if !result.isPassing() { allErrs = errors.Join(allErrs, result.err) + if regHang.MatchString(result.err.Error()) { + err := fmt.Errorf("hint: use NMA endpoint /v1/vertica-process/signal?signal_type=kill to terminate a hanging Vertica " + + "process on the failed host") + allErrs = errors.Join(allErrs, err) + } continue } 
diff --git a/vclusterops/https_update_node_state_op.go b/vclusterops/https_update_node_state_op.go index 401f9ba..a8a8703 100644 --- a/vclusterops/https_update_node_state_op.go +++ b/vclusterops/https_update_node_state_op.go @@ -87,6 +87,19 @@ func (op *httpsUpdateNodeStateOp) processResult(execContext *opEngineExecContext for host, result := range op.clusterHTTPRequest.ResultCollection { op.logResponse(host, result) + // A host may have precondition failed, such as + // "Local node has not joined cluster yet, HTTP server will accept connections when the node has joined the cluster" + // In this case, we mark the node status as UNKNOWN + if result.hasPreconditionFailed() { + vnode, ok := op.vdb.HostNodeMap[host] + if !ok { + return fmt.Errorf("cannot find host %s in vdb", host) + } + vnode.State = util.NodeUnknownState + + continue + } + if result.isUnauthorizedRequest() { op.logger.PrintError("[%s] unauthorized request: %s", op.name, result.content) execContext.hostsWithWrongAuth = append(execContext.hostsWithWrongAuth, host) @@ -124,6 +137,7 @@ func (op *httpsUpdateNodeStateOp) processResult(execContext *opEngineExecContext return fmt.Errorf("cannot find host %s in vdb", host) } vnode.State = nodeInfo.State + vnode.IsPrimary = nodeInfo.IsPrimary } else { // if the result format is wrong on any of the hosts, we should throw an error return fmt.Errorf(util.NodeInfoCountMismatch, op.name, len(nodesInformation.NodeList), host) diff --git a/vclusterops/nma_download_config.go b/vclusterops/nma_download_config.go index e929989..c4b0181 100644 --- a/vclusterops/nma_download_config.go +++ b/vclusterops/nma_download_config.go @@ -33,6 +33,7 @@ type nmaDownloadConfigOp struct { endpoint string fileContent *string vdb *VCoordinationDatabase + sandbox *string } func makeNMADownloadConfigOp( @@ -41,6 +42,7 @@ func makeNMADownloadConfigOp( endpoint string, fileContent *string, vdb *VCoordinationDatabase, + sandbox *string, ) nmaDownloadConfigOp { op := nmaDownloadConfigOp{} 
op.name = opName @@ -53,7 +55,7 @@ func makeNMADownloadConfigOp( } op.fileContent = fileContent op.vdb = vdb - + op.sandbox = sandbox return op } @@ -115,17 +117,29 @@ func (op *nmaDownloadConfigOp) prepare(execContext *opEngineExecContext) error { // For startNodes, If the sourceConfigHost input is a nil value, we find any UP primary nodes as source host to update the host input. // we update the catalogPathMap for next download operation's steps from node information by using HTTPS /v1/nodes var primaryUpHosts []string + var upHosts []string for host, vnode := range op.vdb.HostNodeMap { - if vnode.IsPrimary && vnode.State == util.NodeUpState { - primaryUpHosts = append(primaryUpHosts, host) - op.catalogPathMap[host] = getCatalogPath(vnode.CatalogPath) - break + if vnode.State == util.NodeUpState { + // If we do not find a primary up host in the same cluster(or sandbox), try to find a secondary up host + if vnode.IsPrimary { + primaryUpHosts = append(primaryUpHosts, host) + op.catalogPathMap[host] = getCatalogPath(vnode.CatalogPath) + break + } else if op.sandbox != nil && vnode.Sandbox == *op.sandbox { + upHosts = append(upHosts, host) + op.catalogPathMap[host] = getCatalogPath(vnode.CatalogPath) + } } } if len(primaryUpHosts) == 0 { - return fmt.Errorf("could not find any primary up nodes") + op.logger.Info("could not find any primary UP nodes, considering secondary UP nodes.") + if len(upHosts) == 0 { + return fmt.Errorf("could not find any up nodes") + } + op.hosts = []string{upHosts[0]} + } else { + op.hosts = primaryUpHosts } - op.hosts = primaryUpHosts } execContext.dispatcher.setup(op.hosts) diff --git a/vclusterops/nma_poll_replication_status.go b/vclusterops/nma_poll_replication_status.go new file mode 100644 index 0000000..0029987 --- /dev/null +++ b/vclusterops/nma_poll_replication_status.go @@ -0,0 +1,182 @@ +/* + (c) Copyright [2023-2024] Open Text. 
+ Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + "slices" + + "github.com/vertica/vcluster/vclusterops/util" +) + +type nmaPollReplicationStatusOp struct { + opBase + TargetDatabaseOptions + hostRequestBodyMap map[string]string + sandbox string + vdb *VCoordinationDatabase + existingTransactionIDs *[]int64 + newTransactionID *int64 +} + +func makeNMAPollReplicationStatusOp(targetDBOpt *TargetDatabaseOptions, targetUsePassword bool, + sandbox string, vdb *VCoordinationDatabase, existingTransactionIDs *[]int64, newTransactionID *int64) (nmaPollReplicationStatusOp, error) { + op := nmaPollReplicationStatusOp{} + op.name = "NMAPollReplicationStatusOp" + op.description = "Retrieve asynchronous replication transaction ID" + op.TargetDB = targetDBOpt.TargetDB + op.TargetHosts = targetDBOpt.TargetHosts + op.sandbox = sandbox + op.vdb = vdb + op.existingTransactionIDs = existingTransactionIDs + op.newTransactionID = newTransactionID + + if targetUsePassword { + err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, targetDBOpt.TargetUserName) + if err != nil { + return op, err + } + op.TargetUserName = targetDBOpt.TargetUserName + op.TargetPassword = targetDBOpt.TargetPassword + } + + return op, nil +} + +func (op *nmaPollReplicationStatusOp) updateRequestBody(hosts []string) error { + op.hostRequestBodyMap = make(map[string]string) + + for _, host := range hosts { + requestData := 
nmaReplicationStatusRequestData{} + requestData.DBName = op.TargetDB + requestData.ExcludedTransactionIDs = *op.existingTransactionIDs + requestData.GetTransactionIDsOnly = true + requestData.TransactionID = 0 + requestData.UserName = op.TargetUserName + requestData.Password = op.TargetPassword + + dataBytes, err := json.Marshal(requestData) + if err != nil { + return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + + op.hostRequestBodyMap[host] = string(dataBytes) + } + + return nil +} + +func (op *nmaPollReplicationStatusOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("replicate/status") + httpRequest.RequestData = op.hostRequestBodyMap[host] + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *nmaPollReplicationStatusOp) prepare(execContext *opEngineExecContext) error { + err := op.updateRequestBody(op.TargetHosts) + if err != nil { + return err + } + + execContext.dispatcher.setup(op.TargetHosts) + + return op.setupClusterHTTPRequest(op.TargetHosts) +} + +func (op *nmaPollReplicationStatusOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaPollReplicationStatusOp) finalize(_ *opEngineExecContext) error { + return nil +} + +func (op *nmaPollReplicationStatusOp) processResult(execContext *opEngineExecContext) error { + err := pollState(op, execContext) + if err != nil { + return fmt.Errorf("error polling replication status, %w", err) + } + + return nil +} + +func (op *nmaPollReplicationStatusOp) getPollingTimeout() int { + return OneMinute +} + +func (op *nmaPollReplicationStatusOp) shouldStopPolling() (bool, error) { + var allErrs error + + for host, result := range op.clusterHTTPRequest.ResultCollection { + 
op.logResponse(host, result) + + if result.isUnauthorizedRequest() { + return true, fmt.Errorf("[%s] wrong certificate for NMA service on host %s", + op.name, host) + } + + if !result.isPassing() { + allErrs = errors.Join(allErrs, result.err) + continue + } + + responseObj := []replicationStatusResponse{} + err := op.parseAndCheckResponse(host, result.content, &responseObj) + if err != nil { + return true, errors.Join(allErrs, err) + } + + // We should only receive 1 new transaction ID. + // More than 1 means multiple replication jobs were started at the same time. + // If this happens, we can't determine which transaction ID belongs to which job. + if len(responseObj) > 1 { + return true, errors.Join(allErrs, fmt.Errorf("[%s] expects one transaction ID but retrieved %d: %+v", + op.name, len(responseObj), responseObj)) + } + + // Stop polling if NMA responds with a single new transaction ID + if len(responseObj) == 1 { + newTransactionID := responseObj[0].TransactionID + + // The transaction ID should be new, i.e. not in the list of existing transaction IDs + if slices.Contains(*op.existingTransactionIDs, newTransactionID) { + return true, errors.Join(allErrs, fmt.Errorf("[%s] transaction ID already exists %d", + op.name, newTransactionID)) + } + + *op.newTransactionID = newTransactionID + return true, nil + } + + // If we're here, we've successfully received a status from one of the target hosts and there are no new transaction IDs + // Keep polling in this case + return false, nil + } + + return true, allErrs +} diff --git a/vclusterops/nma_replication_start.go b/vclusterops/nma_replication_start.go new file mode 100644 index 0000000..a050794 --- /dev/null +++ b/vclusterops/nma_replication_start.go @@ -0,0 +1,154 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/vertica/vcluster/vclusterops/util" +) + +type nmaReplicationStartOp struct { + opBase + nmaStartReplicationRequestData + hostRequestBodyMap map[string]string + sandbox string + vdb *VCoordinationDatabase +} + +func makeNMAReplicationStartOp(sourceHosts []string, + sourceUsePassword bool, targetUsePassword bool, + replicationRequestData *nmaStartReplicationRequestData, + vdb *VCoordinationDatabase) (nmaReplicationStartOp, error) { + op := nmaReplicationStartOp{} + op.name = "NMAReplicationStartOp" + op.description = "Start asynchronous database replication" + op.hosts = sourceHosts + op.nmaStartReplicationRequestData = *replicationRequestData + op.vdb = vdb + + if sourceUsePassword { + err := util.ValidateUsernameAndPassword(op.name, sourceUsePassword, replicationRequestData.Username) + if err != nil { + return op, err + } + op.Username = replicationRequestData.Username + op.Password = replicationRequestData.Password + } + if targetUsePassword { + err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, replicationRequestData.TargetUserName) + if err != nil { + return op, err + } + op.TargetUserName = replicationRequestData.TargetUserName + op.TargetPassword = replicationRequestData.TargetPassword + } + + return op, nil +} + +type nmaStartReplicationRequestData struct { + DBName string `json:"dbname"` + ExcludePattern string `json:"exclude_pattern,omitempty"` + IncludePattern string `json:"include_pattern,omitempty"` + TableOrSchemaName string 
`json:"table_or_schema_name,omitempty"` + Username string `json:"username"` + Password *string `json:"password"` + TargetDBName string `json:"target_dbname"` + TargetHost string `json:"target_hostname"` + TargetNamespace string `json:"target_namespace,omitempty"` + TargetUserName string `json:"target_username,omitempty"` + TargetPassword *string `json:"target_password,omitempty"` + TLSConfig string `json:"tls_config,omitempty"` +} + +func (op *nmaReplicationStartOp) updateRequestBody(hosts []string) error { + op.hostRequestBodyMap = make(map[string]string) + + for _, host := range hosts { + dataBytes, err := json.Marshal(op.nmaStartReplicationRequestData) + if err != nil { + return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + + op.hostRequestBodyMap[host] = string(dataBytes) + } + + return nil +} + +func (op *nmaReplicationStartOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("replicate/start") + httpRequest.RequestData = op.hostRequestBodyMap[host] + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *nmaReplicationStartOp) prepare(execContext *opEngineExecContext) error { + sourceHost, err := getInitiatorHostForReplication(op.name, op.sandbox, op.hosts, op.vdb) + if err != nil { + return err + } + // use first up host to execute https post request + op.hosts = sourceHost + + err = op.updateRequestBody(op.hosts) + if err != nil { + return err + } + + execContext.dispatcher.setup(op.hosts) + + return op.setupClusterHTTPRequest(op.hosts) +} + +func (op *nmaReplicationStartOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaReplicationStartOp) finalize(_ *opEngineExecContext) error { + return nil +} + +func (op 
*nmaReplicationStartOp) processResult(_ *opEngineExecContext) error { + var allErrs error + + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + + if result.isUnauthorizedRequest() { + return fmt.Errorf("[%s] wrong certificate for NMA service on host %s", + op.name, host) + } + + if !result.isPassing() { + allErrs = errors.Join(allErrs, result.err) + } + } + + return allErrs +} diff --git a/vclusterops/nma_replication_status.go b/vclusterops/nma_replication_status.go new file mode 100644 index 0000000..daf6656 --- /dev/null +++ b/vclusterops/nma_replication_status.go @@ -0,0 +1,167 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/vertica/vcluster/vclusterops/util" +) + +type nmaReplicationStatusOp struct { + opBase + nmaReplicationStatusRequestData + TargetHosts []string + hostRequestBodyMap map[string]string + sandbox string + vdb *VCoordinationDatabase + transactionIDs *[]int64 +} + +func makeNMAReplicationStatusOp(targetHosts []string, targetUsePassword bool, + replicationStatusData *nmaReplicationStatusRequestData, sandbox string, vdb *VCoordinationDatabase, + transactionIDs *[]int64) (nmaReplicationStatusOp, error) { + op := nmaReplicationStatusOp{} + op.name = "NMAReplicationStatusOp" + op.description = "Get asynchronous replication status" + op.TargetHosts = targetHosts + op.nmaReplicationStatusRequestData = *replicationStatusData + op.sandbox = sandbox + op.vdb = vdb + op.transactionIDs = transactionIDs + + if targetUsePassword { + err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, replicationStatusData.UserName) + if err != nil { + return op, err + } + op.UserName = replicationStatusData.UserName + op.Password = replicationStatusData.Password + } + + return op, nil +} + +type nmaReplicationStatusRequestData struct { + DBName string `json:"dbname"` + ExcludedTransactionIDs []int64 `json:"excluded_txn_ids,omitempty"` + GetTransactionIDsOnly bool `json:"get_txn_ids_only,omitempty"` + TransactionID int64 `json:"txn_id,omitempty"` + UserName string `json:"username"` + Password *string `json:"password"` +} + +func (op *nmaReplicationStatusOp) updateRequestBody(hosts []string) error { + op.hostRequestBodyMap = make(map[string]string) + + for _, host := range hosts { + dataBytes, err := json.Marshal(op.nmaReplicationStatusRequestData) + if err != nil { + return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + + op.hostRequestBodyMap[host] = string(dataBytes) + } + + return nil +} + +func (op 
*nmaReplicationStatusOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("replicate/status") + httpRequest.RequestData = op.hostRequestBodyMap[host] + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *nmaReplicationStatusOp) prepare(execContext *opEngineExecContext) error { + err := op.updateRequestBody(op.TargetHosts) + if err != nil { + return err + } + + execContext.dispatcher.setup(op.TargetHosts) + + return op.setupClusterHTTPRequest(op.TargetHosts) +} + +func (op *nmaReplicationStatusOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaReplicationStatusOp) finalize(_ *opEngineExecContext) error { + return nil +} + +type replicationStatusResponse struct { + EndTime string `json:"end_time"` + NodeName string `json:"node_name"` + OpName string `json:"op_name"` + SentBytes int64 `json:"sent_bytes"` + StartTime string `json:"start_time"` + Status string `json:"status"` + TotalBytes int64 `json:"total_bytes"` + TransactionID int64 `json:"txn_id"` +} + +func (op *nmaReplicationStatusOp) processResult(_ *opEngineExecContext) error { + var allErrs error + + transactionIDs := mapset.NewSet[int64]() + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + + if result.isUnauthorizedRequest() { + return fmt.Errorf("[%s] wrong certificate for NMA service on host %s", + op.name, host) + } + + if !result.isPassing() { + allErrs = errors.Join(allErrs, result.err) + continue + } + + responseObj := []replicationStatusResponse{} + err := op.parseAndCheckResponse(host, result.content, &responseObj) + if err != nil { + allErrs = errors.Join(allErrs, err) + continue + } + + // Get a list of transaction IDs. 
This can be used to retrieve a transaction ID when replication is started + for _, replicationStatus := range responseObj { + transactionIDs.Add(replicationStatus.TransactionID) + } + + // If we're here, we've successfully received a status from one of the target hosts. + // We don't need to check responses from other hosts as they should be the same + *op.transactionIDs = transactionIDs.ToSlice() + return nil + } + + return allErrs +} diff --git a/vclusterops/nma_save_restore_points_op.go b/vclusterops/nma_save_restore_points_op.go new file mode 100644 index 0000000..a549226 --- /dev/null +++ b/vclusterops/nma_save_restore_points_op.go @@ -0,0 +1,132 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package vclusterops + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/vertica/vcluster/vclusterops/vlog" +) + +type nmaSaveRestorePointsOp struct { + opBase + dbName string + username string + archiveName string + sandbox string +} + +type saveRestorePointsRequestData struct { + DBName string `json:"dbname"` + ArchiveName string `json:"archive_name"` + UserName string `json:"username"` +} + +// This op is used to save restore points in a database +func makeNMASaveRestorePointsOp(logger vlog.Printer, hosts []string, dbName, username string, + archiveName string, sandbox string) nmaSaveRestorePointsOp { + return nmaSaveRestorePointsOp{ + opBase: opBase{ + name: "NMASaveRestorePointsOp", + description: "Run save restore point query", + logger: logger.WithName("NMASaveRestorePointsOp"), + hosts: hosts, + }, + dbName: dbName, + username: username, + archiveName: archiveName, + sandbox: sandbox, + } +} + +// make https json data +func (op *nmaSaveRestorePointsOp) setupRequestBody() (map[string]string, error) { + hostRequestBodyMap := make(map[string]string, len(op.hosts)) + for _, host := range op.hosts { + requestData := saveRestorePointsRequestData{} + requestData.DBName = op.dbName + requestData.ArchiveName = op.archiveName + requestData.UserName = op.username + + dataBytes, err := json.Marshal(requestData) + if err != nil { + return nil, fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err) + } + hostRequestBodyMap[host] = string(dataBytes) + } + return hostRequestBodyMap, nil +} + +func (op *nmaSaveRestorePointsOp) setupClusterHTTPRequest(hostRequestBodyMap map[string]string) error { + for host, requestBody := range hostRequestBodyMap { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("restore-points/save") + httpRequest.RequestData = requestBody + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + return nil +} + +func (op 
*nmaSaveRestorePointsOp) prepare(execContext *opEngineExecContext) error { + hostRequestBody, err := op.setupRequestBody() + if err != nil { + return err + } + execContext.dispatcher.setup(op.hosts) + return op.setupClusterHTTPRequest(hostRequestBody) +} + +func (op *nmaSaveRestorePointsOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaSaveRestorePointsOp) finalize(_ *opEngineExecContext) error { + return nil +} + +/* +Sample response from the NMA restore-points endpoint: +RespStr: "" (status code:200) +*/ +func (op *nmaSaveRestorePointsOp) processResult(_ *opEngineExecContext) error { + var allErrs error + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + if result.isUnauthorizedRequest() { + return fmt.Errorf("[%s] wrong certificate for NMA service on host %s", + op.name, host) + } + if result.isPassing() { + var responseObj RestorePoint + err := op.parseAndCheckResponse(host, result.content, &responseObj) + if err != nil { + allErrs = errors.Join(allErrs, err) + continue + } + op.logger.PrintInfo("OP Name: [%s], response: %v", op.name, result.content) + return nil + } + allErrs = errors.Join(allErrs, result.err) + } + return allErrs +} diff --git a/vclusterops/nma_show_restore_points_op.go b/vclusterops/nma_show_restore_points_op.go index 318682c..74c8e27 100644 --- a/vclusterops/nma_show_restore_points_op.go +++ b/vclusterops/nma_show_restore_points_op.go @@ -188,7 +188,6 @@ func (op *nmaShowRestorePointsOp) processResult(execContext *opEngineExecContext allErrs = errors.Join(allErrs, err) continue } - op.logger.PrintInfo("[%s] response: %v", op.name, result.content) execContext.restorePoints = responseObj return nil diff --git a/vclusterops/remove_node.go b/vclusterops/remove_node.go index 5210986..e198b30 100644 --- a/vclusterops/remove_node.go +++ b/vclusterops/remove_node.go 
@@ -298,7 +298,7 @@ func getMainClusterNodes(vdb *VCoordinationDatabase, options *VRemoveNodeOptions hostsAfterRemoval := util.SliceDiff(vdb.HostList, options.HostsToRemove) for _, host := range hostsAfterRemoval { vnode := vdb.HostNodeMap[host] - if vnode.Sandbox == "" { + if vnode.Sandbox == "" && vnode.State == util.NodeUpState { *mainClusterNodes = append(*mainClusterNodes, vnode.Name) } } diff --git a/vclusterops/replication.go b/vclusterops/replication.go index f0dbced..b7d8d26 100644 --- a/vclusterops/replication.go +++ b/vclusterops/replication.go @@ -30,6 +30,13 @@ type TargetDatabaseOptions struct { TargetPassword *string } +type ReplicationOptions struct { + TableOrSchemaName string + IncludePattern string + ExcludePattern string + TargetNamespace string +} + type VReplicationDatabaseOptions struct { /* part 1: basic db info */ DatabaseOptions @@ -39,10 +46,7 @@ type VReplicationDatabaseOptions struct { SourceTLSConfig string SandboxName string Async bool - ObjectName string - IncludePattern string - ExcludePattern string - TargetNamespace string + ReplicationOptions } func VReplicationDatabaseFactory() VReplicationDatabaseOptions { @@ -99,8 +103,8 @@ func (options *VReplicationDatabaseOptions) validateExtraOptions() error { } func (options *VReplicationDatabaseOptions) validateFineGrainedReplicationOptions() error { - if options.ObjectName != "" { - err := util.ValidateQualifiedObjectNamePattern(options.ObjectName, false) + if options.TableOrSchemaName != "" { + err := util.ValidateQualifiedObjectNamePattern(options.TableOrSchemaName, false) if err != nil { return err } @@ -191,7 +195,7 @@ func (options *VReplicationDatabaseOptions) validateAnalyzeOptions(logger vlog.P } // VReplicateDatabase can copy all table data and metadata from this cluster to another -func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOptions) error { +func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOptions) (int64, error) { /* 
* - Produce Instructions * - Create a VClusterOpEngine @@ -201,20 +205,22 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti // validate and analyze options err := options.validateAnalyzeOptions(vcc.Log) if err != nil { - return err + return 0, err } // retrieve information from the database to accurately determine the state of each node in both the main cluster and a given sandbox vdb := makeVCoordinationDatabase() err = vcc.getVDBFromRunningDBIncludeSandbox(&vdb, &options.DatabaseOptions, options.SandboxName) if err != nil { - return err + return 0, err } + asyncReplicationTransactionID := new(int64) + // produce database replication instructions - instructions, err := vcc.produceDBReplicationInstructions(options, &vdb) + instructions, err := vcc.produceDBReplicationInstructions(options, &vdb, asyncReplicationTransactionID) if err != nil { - return fmt.Errorf("fail to produce instructions, %w", err) + return 0, fmt.Errorf("fail to produce instructions, %w", err) } // create a VClusterOpEngine, and add certs to the engine @@ -229,9 +235,9 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti "2. set EnableConnectCredentialForwarding to True in source database using vsql " + "3. 
configure a Trust Authentication in target database using vsql") } - return fmt.Errorf("fail to replicate database: %w", runError) + return 0, fmt.Errorf("fail to replicate database: %w", runError) } - return nil + return *asyncReplicationTransactionID, nil } // The generated instructions will later perform the following operations necessary @@ -240,7 +246,7 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti // - Check Vertica versions // - Replicate database func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicationDatabaseOptions, - vdb *VCoordinationDatabase) ([]clusterOp, error) { + vdb *VCoordinationDatabase, asyncReplicationTransactionID *int64) ([]clusterOp, error) { var instructions []clusterOp // need username for https operations in source database @@ -263,30 +269,78 @@ func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicati vcc.Log.Info("Current target username", "username", options.TargetUserName) } - httpsDisallowMultipleNamespacesOp, err := makeHTTPSDisallowMultipleNamespacesOp(options.Hosts, - options.usePassword, options.UserName, options.Password, options.SandboxName, vdb) - if err != nil { - return instructions, err - } + nmaHealthOp := makeNMAHealthOp(append(options.Hosts, options.TargetHosts...)) + + initiatorTargetHost := getInitiator(options.TargetHosts) + if options.Async { + transactionIDs := &[]int64{} + + nmaReplicationStatusData := nmaReplicationStatusRequestData{} + nmaReplicationStatusData.DBName = options.TargetDB + nmaReplicationStatusData.ExcludedTransactionIDs = []int64{} + nmaReplicationStatusData.GetTransactionIDsOnly = true + nmaReplicationStatusData.TransactionID = 0 + nmaReplicationStatusData.UserName = options.TargetUserName + nmaReplicationStatusData.Password = options.TargetPassword + + nmaReplicationStatusOp, err := makeNMAReplicationStatusOp(options.TargetHosts, targetUsePassword, + &nmaReplicationStatusData, options.SandboxName, vdb, 
transactionIDs) + if err != nil { + return instructions, err + } - nmaHealthOp := makeNMAHealthOp(options.Hosts) + nmaReplicationData := nmaStartReplicationRequestData{} + nmaReplicationData.DBName = options.DBName + nmaReplicationData.ExcludePattern = options.ExcludePattern + nmaReplicationData.IncludePattern = options.IncludePattern + nmaReplicationData.TableOrSchemaName = options.TableOrSchemaName + nmaReplicationData.Username = options.UserName + nmaReplicationData.Password = options.Password + nmaReplicationData.TargetDBName = options.TargetDB + nmaReplicationData.TargetHost = initiatorTargetHost + nmaReplicationData.TargetNamespace = options.TargetNamespace + nmaReplicationData.TargetUserName = options.TargetUserName + nmaReplicationData.TargetPassword = options.TargetPassword + nmaReplicationData.TLSConfig = options.SourceTLSConfig + + nmaStartReplicationOp, err := makeNMAReplicationStartOp(options.Hosts, options.usePassword, targetUsePassword, + &nmaReplicationData, vdb) + if err != nil { + return instructions, err + } - // require to have the same vertica version - nmaVerticaVersionOp := makeNMACheckVerticaVersionOp(options.Hosts, true, true /*IsEon*/) + nmaPollReplicationStatusOp, err := makeNMAPollReplicationStatusOp(&options.TargetDatabaseOptions, targetUsePassword, + options.SandboxName, vdb, transactionIDs, asyncReplicationTransactionID) + if err != nil { + return instructions, err + } - initiatorTargetHost := getInitiator(options.TargetHosts) - httpsStartReplicationOp, err := makeHTTPSStartReplicationOp(options.DBName, options.Hosts, options.usePassword, - options.UserName, options.Password, targetUsePassword, &options.TargetDatabaseOptions, initiatorTargetHost, - options.SourceTLSConfig, options.SandboxName, vdb) - if err != nil { - return instructions, err + instructions = append(instructions, + &nmaHealthOp, + &nmaReplicationStatusOp, + &nmaStartReplicationOp, + &nmaPollReplicationStatusOp, + ) + } else { + httpsDisallowMultipleNamespacesOp, err 
:= makeHTTPSDisallowMultipleNamespacesOp(options.Hosts, + options.usePassword, options.UserName, options.Password, options.SandboxName, vdb) + if err != nil { + return instructions, err + } + + httpsStartReplicationOp, err := makeHTTPSStartReplicationOp(options.DBName, options.Hosts, options.usePassword, + options.UserName, options.Password, targetUsePassword, &options.TargetDatabaseOptions, initiatorTargetHost, + options.SourceTLSConfig, options.SandboxName, vdb) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &nmaHealthOp, + &httpsDisallowMultipleNamespacesOp, + &httpsStartReplicationOp, + ) } - instructions = append(instructions, - &nmaHealthOp, - &nmaVerticaVersionOp, - &httpsDisallowMultipleNamespacesOp, - &httpsStartReplicationOp, - ) return instructions, nil } diff --git a/vclusterops/revive_db_test.go b/vclusterops/revive_db_test.go index 9a7068a..67dbd11 100644 --- a/vclusterops/revive_db_test.go +++ b/vclusterops/revive_db_test.go @@ -45,7 +45,8 @@ func TestFindSpecifiedRestorePoint(t *testing.T) { options.RestorePoint.ID = expectedID _, err = options.findSpecifiedRestorePoint(allRestorePoints) expectedErr := fmt.Errorf("found 2 restore points instead of 1: " + - "[{Archive:archive1 ID:id3 Index:2 Timestamp: VerticaVersion:} {Archive:archive1 ID:id3 Index:3 Timestamp: VerticaVersion:}]") + "[{Archive:archive1 ID:id3 Index:2 Timestamp: VerticaVersion:} " + + "{Archive:archive1 ID:id3 Index:3 Timestamp: VerticaVersion:}]") assert.EqualError(t, err, expectedErr.Error()) // Test case: No matching restore points found diff --git a/vclusterops/save_restore_points.go b/vclusterops/save_restore_points.go new file mode 100644 index 0000000..4cd7f6e --- /dev/null +++ b/vclusterops/save_restore_points.go @@ -0,0 +1,172 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "fmt" + + "github.com/vertica/vcluster/vclusterops/util" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +type VSaveRestorePointOptions struct { + DatabaseOptions + ArchiveName string + + // the name of the sandbox to target, if left empty the main cluster is assumed + Sandbox string +} + +func VSaveRestorePointFactory() VSaveRestorePointOptions { + options := VSaveRestorePointOptions{} + // set default values to the params + options.setDefaultValues() + return options +} + +func (options *VSaveRestorePointOptions) validateEonOptions(_ vlog.Printer) error { + if !options.IsEon { + return fmt.Errorf("save restore point is only supported in Eon mode") + } + return nil +} + +// Save restore impl +func (options *VSaveRestorePointOptions) validateRequiredOptions(logger vlog.Printer) error { + err := options.validateEonOptions(logger) + if err != nil { + return err + } + err = options.validateBaseOptions(SaveRestorePointsCmd, logger) + if err != nil { + return err + } + if options.ArchiveName == "" { + return fmt.Errorf("must specify an archive name") + } + err = util.ValidateArchiveName(options.ArchiveName) + if err != nil { + return err + } + return nil +} + +func (options *VSaveRestorePointOptions) validateExtraOptions() error { + if options.Sandbox != "" { + return util.ValidateSandboxName(options.Sandbox) + } + return nil +} + +func (options *VSaveRestorePointOptions) validateParseOptions(logger vlog.Printer) error { + // batch 1: validate required parameters + err := options.validateRequiredOptions(logger) + if err != 
nil { + return err + } + + // batch 2: validate all other params + err = options.validateExtraOptions() + if err != nil { + return err + } + return nil +} + +// analyzeOptions will modify some options based on what is chosen +func (options *VSaveRestorePointOptions) analyzeOptions() (err error) { + // we analyze host names when it is set in user input, otherwise we use hosts in yaml config + if len(options.RawHosts) > 0 { + // resolve RawHosts to be IP addresses + hostAddresses, err := util.ResolveRawHostsToAddresses(options.RawHosts, options.IPv6) + if err != nil { + return err + } + options.Hosts = hostAddresses + } + return nil +} + +func (options *VSaveRestorePointOptions) validateAnalyzeOptions(logger vlog.Printer) error { + if err := options.validateParseOptions(logger); err != nil { + return err + } + if err := options.validateUserName(logger); err != nil { + return err + } + return options.analyzeOptions() +} + +// VSaveRestorePoint can save restore point to a given archive +func (vcc VClusterCommands) VSaveRestorePoint(options *VSaveRestorePointOptions) (err error) { + /* + * - Produce Instructions + * - Create a VClusterOpEngine + * - Give the instructions to the VClusterOpEngine to run + */ + + // validate and analyze options + err = options.validateAnalyzeOptions(vcc.Log) + if err != nil { + return err + } + + // produce save restore points instructions + instructions, err := vcc.produceSaveRestorePointsInstructions(options) + if err != nil { + return fmt.Errorf("fail to produce instructions, %w", err) + } + + // create a VClusterOpEngine, and add certs to the engine + clusterOpEngine := makeClusterOpEngine(instructions, options) + + // give the instructions to the VClusterOpEngine to run + runError := clusterOpEngine.run(vcc.Log) + if runError != nil { + return fmt.Errorf("fail to save restore point: %w", runError) + } + return nil +} + +// The generated instructions will later perform the following operations necessary +// for a successful 
save_restore_point: +// - Retrieve VDB from HTTP endpoints +// - Check NMA connectivity +// - Run save restore points on the target node +func (vcc VClusterCommands) produceSaveRestorePointsInstructions(options *VSaveRestorePointOptions) ([]clusterOp, error) { + var instructions []clusterOp + vdb := makeVCoordinationDatabase() + + err := vcc.getVDBFromRunningDBIncludeSandbox(&vdb, &options.DatabaseOptions, util.MainClusterSandbox) + if err != nil { + return instructions, err + } + + // get up hosts + hosts := options.Hosts + nmaHealthOp := makeNMAHealthOp(options.Hosts) + // Trim host list + hosts = vdb.filterUpHostlist(hosts, options.Sandbox) + bootstrapHost := []string{getInitiator(hosts)} + + nmaSaveRestorePointOp := makeNMASaveRestorePointsOp(vcc.Log, bootstrapHost, + options.DBName, options.UserName, options.ArchiveName, options.Sandbox) + + instructions = append(instructions, + &nmaHealthOp, + &nmaSaveRestorePointOp) + return instructions, nil +} diff --git a/vclusterops/restore_points.go b/vclusterops/show_restore_points.go similarity index 100% rename from vclusterops/restore_points.go rename to vclusterops/show_restore_points.go diff --git a/vclusterops/start_db.go b/vclusterops/start_db.go index 2bcb911..9982ada 100644 --- a/vclusterops/start_db.go +++ b/vclusterops/start_db.go @@ -339,7 +339,8 @@ func (vcc VClusterCommands) produceStartDBInstructions(options *VStartDatabaseOp &instructions, nil, /*source hosts for transferring configuration files*/ options.Hosts, - nil /*db configurations retrieved from a running db*/) + nil, /*db configurations retrieved from a running db*/ + nil /*sandbox name*/) nmaStartNewNodesOp := makeNMAStartNodeOp(options.Hosts, options.StartUpConf) httpsPollNodeStateOp, err := makeHTTPSPollNodeStateOp(options.Hosts, diff --git a/vclusterops/start_node.go b/vclusterops/start_node.go index d30a617..8d9a178 100644 --- a/vclusterops/start_node.go +++ b/vclusterops/start_node.go @@ -326,7 +326,8 @@ func (options *VStartNodesOptions) 
checkQuorum(vdb *VCoordinationDatabase, resta if upHostCount < len(restartNodeInfo.ReIPList) { restartNodeInfo.SerialReIP = true } - if len(sandboxPrimaryUpNodes) <= lenOfPrimaryReIPLIst { + + if len(sandboxPrimaryUpNodes) <= lenOfPrimaryReIPLIst && lenOfPrimaryReIPLIst > 0 { return &ReIPNoClusterQuorumError{ Detail: fmt.Sprintf("Quorum check failed: %d up node(s) is/are not enough to re-ip %d primary node(s)", len(sandboxPrimaryUpNodes), lenOfPrimaryReIPLIst), @@ -372,6 +373,7 @@ func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartN &nmaHealthOp, &httpsGetUpNodesOp, ) + var sandboxName *string // If we identify any nodes that need re-IP, HostsToStart will contain the nodes that need re-IP. // Otherwise, HostsToStart will consist of all hosts with IPs recorded in the catalog, which are provided by user input. if len(startNodeInfo.ReIPList) != 0 { @@ -414,6 +416,8 @@ func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartN &httpsReloadSpreadOp, &httpsGetNodesInfoOp, ) + } else { + sandboxName = &startNodeInfo.Sandbox } // require to have the same vertica version nmaVerticaVersionOp := makeNMAVerticaVersionOpBeforeStartNode(vdb, startNodeInfo.unreachableHosts, @@ -423,11 +427,8 @@ func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartN // we use information from v1/nodes endpoint to get all node information to update the sourceConfHost value // after we find any UP primary nodes as source host for syncing spread.conf and vertica.conf // we will remove the nil parameters in VER-88401 by adding them in execContext - produceTransferConfigOps( - &instructions, - nil, /*source hosts for transferring configuration files*/ - startNodeInfo.HostsToStart, - vdb) + produceTransferConfigOps(&instructions, nil /*source hosts for transferring configuration files*/, startNodeInfo.HostsToStart, + vdb, sandboxName) httpsRestartUpCommandOp, err := makeHTTPSStartUpCommandWithSandboxOp(options.usePassword, 
options.UserName, options.Password, vdb, startNodeInfo.Sandbox) if err != nil { @@ -439,7 +440,6 @@ func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartN if err != nil { return instructions, err } - httpsPollNodeStateOp.cmdType = StartNodeCmd instructions = append(instructions, &httpsRestartUpCommandOp, diff --git a/vclusterops/util/util.go b/vclusterops/util/util.go index 352c37d..bedbb7c 100644 --- a/vclusterops/util/util.go +++ b/vclusterops/util/util.go @@ -65,6 +65,7 @@ const ( ShutDownEndpoint = "/shutdown" NodesEndpoint = "nodes/" DropEndpoint = "/drop" + ArchiveEndpoint = "archives" ) const ( @@ -561,6 +562,10 @@ func ValidateSandboxName(dbName string) error { return ValidateName(dbName, "sandbox", true) } +func ValidateArchiveName(archive string) error { + return ValidateName(archive, "archive", true) +} + // suppress help message for hidden options func SetParserUsage(parser *flag.FlagSet, op string) { fmt.Printf("Usage of %s:\n", op)