Skip to content

Commit

Permalink
Sync from server repo (028af8ae617)
Browse files Browse the repository at this point in the history
  • Loading branch information
releng committed Sep 18, 2024
1 parent 81becaa commit 6fb73ec
Show file tree
Hide file tree
Showing 7 changed files with 611 additions and 39 deletions.
15 changes: 11 additions & 4 deletions commands/cmd_start_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) {
"[Required] The absolute path to the connection file created with the create_connection command, "+
"containing the database name, hosts, and password (if any) for the target database. "+
"Alternatively, you can provide this information manually with --target-db-name, "+
"--target-hosts, and --target-password-file")
"--target-hosts, and --target-password-file",
)
cmd.Flags().BoolVar(
&c.startRepOptions.Async,
asyncFlag,
Expand All @@ -145,7 +146,7 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) {
"Default value is false.",
)
cmd.Flags().StringVar(
&c.startRepOptions.ObjectName,
&c.startRepOptions.TableOrSchemaName,
tableOrSchemaNameFlag,
"",
"(only async replication)The object name we want to copy from the source side. The available"+
Expand Down Expand Up @@ -274,12 +275,18 @@ func (c *CmdStartReplication) Run(vcc vclusterops.ClusterCommands) error {

options := c.startRepOptions

err := vcc.VReplicateDatabase(options)
transactionID, err := vcc.VReplicateDatabase(options)
if err != nil {
vcc.LogError(err, "failed to replicate to database", "targetDB", options.TargetDB)
return err
}
vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB)

if options.Async {
vcc.DisplayInfo("Successfully started replication to database %s. Transaction ID: %d", options.TargetDB, transactionID)
} else {
vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion vclusterops/cluster_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ type ClusterCommands interface {
VRemoveNode(options *VRemoveNodeOptions) (VCoordinationDatabase, error)
VRemoveSubcluster(removeScOpt *VRemoveScOptions) (VCoordinationDatabase, error)
VRenameSubcluster(options *VRenameSubclusterOptions) error
VReplicateDatabase(options *VReplicationDatabaseOptions) error
VReplicateDatabase(options *VReplicationDatabaseOptions) (int64, error)
VReviveDatabase(options *VReviveDatabaseOptions) (dbInfo string, vdbPtr *VCoordinationDatabase, err error)
VSandbox(options *VSandboxOptions) error
VScrutinize(options *VScrutinizeOptions) error
Expand Down
8 changes: 8 additions & 0 deletions vclusterops/https_check_db_running_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/vertica/vcluster/rfc7807"
"github.com/vertica/vcluster/vclusterops/util"
"golang.org/x/exp/slices"
)

type opType int
Expand Down Expand Up @@ -62,6 +63,8 @@ func (op opType) String() string {
return "unknown operation"
}

var maskEOFOp = []opType{DropDB}

// DBIsRunningError is an error to indicate we found the database still running.
// This is emitted from this op. Callers can do type checking to perform an
// action based on the error.
Expand Down Expand Up @@ -291,6 +294,11 @@ func (op *httpsCheckRunningDBOp) processResult(_ *opEngineExecContext) error {
for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

// EOF is expected during node shutdown: we expect the node's HTTPS service to go down quickly,
// and the server's HTTPS service does not guarantee that the response is sent back to the client before it closes
if result.isEOF() && slices.Contains(maskEOFOp, op.opType) {
continue
}
if !result.isPassing() {
allErrs = errors.Join(allErrs, result.err)
}
Expand Down
182 changes: 182 additions & 0 deletions vclusterops/nma_poll_replication_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
(c) Copyright [2023-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"encoding/json"
"errors"
"fmt"
"slices"

"github.com/vertica/vcluster/vclusterops/util"
)

// nmaPollReplicationStatusOp polls the NMA "replicate/status" endpoint on the
// target database hosts until a single new replication transaction ID shows up.
type nmaPollReplicationStatusOp struct {
opBase
TargetDatabaseOptions
// hostRequestBodyMap maps each target host to the JSON request body sent to it
hostRequestBodyMap map[string]string
// sandbox is the sandbox name handed to the constructor
sandbox string
// vdb is the coordination database handed to the constructor
vdb *VCoordinationDatabase
// existingTransactionIDs points to the transaction IDs already known before polling;
// they are sent to NMA as the excluded IDs and used to reject a non-new result
existingTransactionIDs *[]int64
// newTransactionID is the output location that receives the newly found transaction ID
newTransactionID *int64
}

// makeNMAPollReplicationStatusOp builds an op that polls the target database for
// the transaction ID of a newly started asynchronous replication.
//
// existingTransactionIDs points to the IDs known before the replication started;
// newTransactionID is where the discovered ID is written. When targetUsePassword
// is true, the target user name is validated and the target credentials are copied
// into the op; a validation failure is returned together with the partially
// initialized op.
func makeNMAPollReplicationStatusOp(targetDBOpt *TargetDatabaseOptions, targetUsePassword bool,
	sandbox string, vdb *VCoordinationDatabase, existingTransactionIDs *[]int64, newTransactionID *int64) (nmaPollReplicationStatusOp, error) {
	op := nmaPollReplicationStatusOp{
		sandbox:                sandbox,
		vdb:                    vdb,
		existingTransactionIDs: existingTransactionIDs,
		newTransactionID:       newTransactionID,
	}
	op.name = "NMAPollReplicationStatusOp"
	op.description = "Retrieve asynchronous replication transaction ID"
	op.TargetDB = targetDBOpt.TargetDB
	op.TargetHosts = targetDBOpt.TargetHosts

	// Without password authentication there are no credentials to validate or copy.
	if !targetUsePassword {
		return op, nil
	}

	if err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, targetDBOpt.TargetUserName); err != nil {
		return op, err
	}
	op.TargetUserName = targetDBOpt.TargetUserName
	op.TargetPassword = targetDBOpt.TargetPassword

	return op, nil
}

// updateRequestBody (re)builds hostRequestBodyMap so that every host in hosts
// maps to the JSON request body for the replication status endpoint.
//
// The request asks for transaction IDs only and excludes the already-known IDs.
// The payload does not depend on the host, so it is marshaled once and the
// resulting string is shared by all hosts.
// It returns an error if the request data cannot be marshaled to JSON.
func (op *nmaPollReplicationStatusOp) updateRequestBody(hosts []string) error {
	op.hostRequestBodyMap = make(map[string]string, len(hosts))

	// Nothing to build for an empty host list; this also keeps the original
	// behavior of not touching the request fields when there are no hosts.
	if len(hosts) == 0 {
		return nil
	}

	requestData := nmaReplicationStatusRequestData{}
	requestData.DBName = op.TargetDB
	requestData.ExcludedTransactionIDs = *op.existingTransactionIDs
	requestData.GetTransactionIDsOnly = true
	requestData.TransactionID = 0
	requestData.UserName = op.TargetUserName
	requestData.Password = op.TargetPassword

	dataBytes, err := json.Marshal(requestData)
	if err != nil {
		return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err)
	}

	requestBody := string(dataBytes)
	for _, host := range hosts {
		op.hostRequestBodyMap[host] = requestBody
	}

	return nil
}

// setupClusterHTTPRequest registers one POST request to the NMA
// "replicate/status" endpoint per host, using the request body previously
// stored for that host in hostRequestBodyMap. It always returns nil.
func (op *nmaPollReplicationStatusOp) setupClusterHTTPRequest(hosts []string) error {
	for i := range hosts {
		target := hosts[i]

		var request hostHTTPRequest
		request.Method = PostMethod
		request.buildNMAEndpoint("replicate/status")
		request.RequestData = op.hostRequestBodyMap[target]

		op.clusterHTTPRequest.RequestCollection[target] = request
	}

	return nil
}

// prepare builds the per-host request bodies for the target hosts, sets up the
// dispatcher for those hosts, and then registers the HTTP requests.
func (op *nmaPollReplicationStatusOp) prepare(execContext *opEngineExecContext) error {
	targetHosts := op.TargetHosts

	if err := op.updateRequestBody(targetHosts); err != nil {
		return err
	}

	execContext.dispatcher.setup(targetHosts)

	return op.setupClusterHTTPRequest(targetHosts)
}

// execute runs the prepared requests and, if that succeeds, processes the
// results; an error from running the requests is returned as-is.
func (op *nmaPollReplicationStatusOp) execute(execContext *opEngineExecContext) error {
	err := op.runExecute(execContext)
	if err != nil {
		return err
	}

	return op.processResult(execContext)
}

// finalize is a no-op for this op: it ignores the execution context and always returns nil.
func (op *nmaPollReplicationStatusOp) finalize(_ *opEngineExecContext) error {
return nil
}

// processResult hands this op to pollState and wraps any polling failure with
// context; it returns nil when polling finishes without error.
func (op *nmaPollReplicationStatusOp) processResult(execContext *opEngineExecContext) error {
	if pollErr := pollState(op, execContext); pollErr != nil {
		return fmt.Errorf("error polling replication status, %w", pollErr)
	}

	return nil
}

// getPollingTimeout returns the polling timeout for this op, which is the OneMinute constant.
// NOTE(review): presumably consumed by pollState as the overall polling deadline — confirm.
func (op *nmaPollReplicationStatusOp) getPollingTimeout() int {
return OneMinute
}

// shouldStopPolling inspects the latest responses and decides whether polling is done.
//
// It returns:
//   - (true, nil) once a host reports exactly one transaction ID that is not in
//     the list of existing IDs; that ID is written to newTransactionID.
//   - (false, nil) when a host answers successfully but reports no new transaction ID.
//   - (true, err) on an unauthorized response, an unparsable response, more than
//     one new transaction ID, an ID that already exists, or when no host returned
//     a passing result (err then joins the per-host errors).
func (op *nmaPollReplicationStatusOp) shouldStopPolling() (bool, error) {
	var hostErrs error

	for host, result := range op.clusterHTTPRequest.ResultCollection {
		op.logResponse(host, result)

		if result.isUnauthorizedRequest() {
			return true, fmt.Errorf("[%s] wrong certificate for NMA service on host %s",
				op.name, host)
		}

		// Remember the failure and try the next host.
		if !result.isPassing() {
			hostErrs = errors.Join(hostErrs, result.err)
			continue
		}

		statuses := []replicationStatusResponse{}
		if err := op.parseAndCheckResponse(host, result.content, &statuses); err != nil {
			return true, errors.Join(hostErrs, err)
		}

		switch len(statuses) {
		case 0:
			// A host answered successfully but there is no new transaction ID yet,
			// so keep polling.
			return false, nil
		case 1:
			// NMA responded with a single new transaction ID: polling is done.
			candidateID := statuses[0].TransactionID

			// The transaction ID should be new, i.e. not in the list of existing transaction IDs
			if slices.Contains(*op.existingTransactionIDs, candidateID) {
				return true, errors.Join(hostErrs, fmt.Errorf("[%s] transaction ID already exists %d",
					op.name, candidateID))
			}

			*op.newTransactionID = candidateID
			return true, nil
		default:
			// We should only receive 1 new transaction ID.
			// More than 1 means multiple replication jobs were started at the same time.
			// If this happens, we can't determine which transaction ID belongs to which job.
			return true, errors.Join(hostErrs, fmt.Errorf("[%s] expects one transaction ID but retrieved %d: %+v",
				op.name, len(statuses), statuses))
		}
	}

	// No host produced a passing result: stop and report everything collected.
	return true, hostErrs
}
Loading

0 comments on commit 6fb73ec

Please sign in to comment.