diff --git a/jenkins-integration-build.sh b/jenkins-integration-build.sh
index d26807132d..e134038190 100755
--- a/jenkins-integration-build.sh
+++ b/jenkins-integration-build.sh
@@ -123,6 +123,7 @@ if [ "${MULTI_NODE:-}" == "true" ]; then
 else
     # single node
     ./integration-test/start_server.sh "${COUCHBASE_SERVER_VERSION}"
+    export SG_TEST_COUCHBASE_SERVER_DOCKER_NAME="couchbase"
 fi
 
 # Set up test environment variables for CBS runs
diff --git a/xdcr/cbs_xdcr.go b/xdcr/cbs_xdcr.go
index e09c85cd9a..2d1668bfc0 100644
--- a/xdcr/cbs_xdcr.go
+++ b/xdcr/cbs_xdcr.go
@@ -14,6 +14,9 @@ import (
 	"fmt"
 	"net/http"
 	"net/url"
+	"os/exec"
+	"runtime"
+	"slices"
 	"strings"
 
 	"github.com/couchbase/sync_gateway/base"
@@ -94,6 +97,13 @@ func createCluster(ctx context.Context, bucket *base.GocbV2Bucket) error {
 
 // newCouchbaseServerManager creates an instance of XDCR backed by Couchbase Server. This is not started until Start is called.
 func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucket, toBucket *base.GocbV2Bucket, opts XDCROptions) (*couchbaseServerManager, error) {
+	nodes, err := kvNodes(ctx, fromBucket)
+	if err != nil {
+		return nil, err
+	}
+	if len(nodes) != 1 {
+		return nil, fmt.Errorf("To run XDCR tests, exactly one kv node is needed so the goxdcr.log file can be grepped, but found %d (%s). Extending this to multiple nodes would require grepping the log on every node", len(nodes), nodes)
+	}
 	// there needs to be a global cluster present, this is a hostname + username + password. There can be only one per hostname, so create it lazily.
 	isPresent, err := isClusterPresent(ctx, fromBucket)
 	if err != nil {
@@ -114,6 +124,37 @@ func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucke
 	}, nil
 }
 
+// kvNodes returns the hostnames of KV nodes in the cluster.
+func kvNodes(ctx context.Context, bucket *base.GocbV2Bucket) ([]string, error) {
+	url := "/pools/default/"
+	method := http.MethodGet
+	output, statusCode, err := bucket.MgmtRequest(ctx, method, url, "application/x-www-form-urlencoded", nil)
+	if err != nil {
+		return nil, err
+	}
+	if statusCode != http.StatusOK {
+		return nil, fmt.Errorf("Could not get cluster node metadata: %s %s -> (%d) %s", method, url, statusCode, output)
+	}
+	type nodesOutput struct {
+		Nodes []struct {
+			Hostname string   `json:"hostname"`
+			Services []string `json:"services"`
+		} `json:"nodes"`
+	}
+	nodes := nodesOutput{}
+	err = base.JSONUnmarshal(output, &nodes)
+	if err != nil {
+		return nil, err
+	}
+	var hostnames []string
+	for _, node := range nodes.Nodes {
+		if slices.Contains(node.Services, "kv") {
+			hostnames = append(hostnames, node.Hostname)
+		}
+	}
+	return hostnames, nil
+}
+
 // Start starts the XDCR replication.
 func (x *couchbaseServerManager) Start(ctx context.Context) error {
 	if x.replicationID != "" {
@@ -170,8 +211,12 @@ func (x *couchbaseServerManager) Stop(ctx context.Context) error {
 	if statusCode != http.StatusOK {
 		return fmt.Errorf("Could not cancel XDCR replication: %s. %s %s -> (%d) %s", x.replicationID, method, url, statusCode, output)
 	}
+	err = x.waitForStoppedInLogFile(ctx)
+	if err != nil {
+		return err
+	}
 	x.replicationID = ""
-	return nil
+	return err
 }
 
 // Stats returns the stats of the XDCR replication.
@@ -230,4 +275,37 @@ outer:
 	return 0, errNoXDCRMetrics
 }
 
+// waitForStoppedInLogFile waits for the replication to stop by checking the log file.
+func (x *couchbaseServerManager) waitForStoppedInLogFile(ctx context.Context) error {
+	// magic string to indicate that the replication has stopped
+	grepStr := fmt.Sprintf("%s status is finished shutting down", x.replicationID)
+	usingDocker, dockerName := base.TestUseCouchbaseServerDockerName()
+	cmdLine := ""
+	logFile := "/opt/couchbase/var/lib/couchbase/logs/goxdcr.log"
+	if usingDocker {
+		cmdLine = fmt.Sprintf(`docker exec -t %s cat "%s" | grep "%s"`, dockerName, logFile, grepStr)
+	} else {
+		if runtime.GOOS == "darwin" {
+			logFile = "$HOME/Library/Application Support/Couchbase/var/lib/couchbase/logs/goxdcr.log"
+		}
+		cmdLine = fmt.Sprintf(`cat "%s" | grep "%s"`, logFile, grepStr)
+	}
+	err, _ := base.RetryLoop(ctx, "ReadLogFileUntilStopped", func() (shouldRetry bool, err error, value any) {
+		cmd := exec.Command("bash", "-c", cmdLine)
+		output, err := cmd.CombinedOutput()
+		if err != nil {
+			return true, fmt.Errorf("Failed to run %s (%w). Output: %s", cmd, err, output), nil
+		}
+		return false, nil, nil
+	}, base.CreateMaxDoublingSleeperFunc(10, 10, 1000))
+	if err != nil {
+		suffix := ""
+		if !usingDocker {
+			suffix = fmt.Sprintf(". If Couchbase Server is running in docker, you may need to set the environment variable %s=<container name>", base.TestEnvCouchbaseServerDockerName)
+		}
+		return fmt.Errorf("Could not find %s in %s. %w%s", grepStr, logFile, err, suffix)
+	}
+	return nil
+}
+
 var _ Manager = &couchbaseServerManager{}
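
Note: the hunks above call base.TestUseCouchbaseServerDockerName() and reference base.TestEnvCouchbaseServerDockerName, which are not shown in this diff. A minimal sketch of what such a helper could look like, assuming it simply reads the SG_TEST_COUCHBASE_SERVER_DOCKER_NAME variable exported in jenkins-integration-build.sh (the actual helper in the base package may differ):

package base

import "os"

// TestEnvCouchbaseServerDockerName is the environment variable naming the
// Couchbase Server docker container used by integration tests. The constant
// value is assumed to match the export added to jenkins-integration-build.sh.
const TestEnvCouchbaseServerDockerName = "SG_TEST_COUCHBASE_SERVER_DOCKER_NAME"

// TestUseCouchbaseServerDockerName reports whether Couchbase Server is running
// in a docker container and, if so, returns the container name to use with
// docker exec.
func TestUseCouchbaseServerDockerName() (bool, string) {
	name := os.Getenv(TestEnvCouchbaseServerDockerName)
	return name != "", name
}

Under that assumption, the single-node Jenkins path exports the variable as "couchbase", so waitForStoppedInLogFile greps goxdcr.log via docker exec rather than the local filesystem.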