diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 6383438f1fa..e1226b0a7b6 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -345,55 +345,53 @@ func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet * } func waitForWorkflowToBeCreated(t *testing.T, vc *VitessCluster, ksWorkflow string) { + keyspace, workflow, ok := strings.Cut(ksWorkflow, ".") + require.True(t, ok, "invalid '.' value: %s", ksWorkflow) require.NoError(t, waitForCondition("workflow to be created", func() bool { - _, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") - return err == nil + output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow, "--compact", "--include-logs=false") + return err == nil && output != emptyWorkflowShowResponse }, defaultTimeout)) } // waitForWorkflowState waits for all of the given workflow's // streams to reach the provided state. You can pass optional // key value pairs of the form "key==value" to also wait for -// additional stream sub-state such as "Message==for vdiff". +// additional stream sub-state such as "message==for vdiff". // Invalid checks are ignored. func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wantState string, fieldEqualityChecks ...string) { + keyspace, workflow, ok := strings.Cut(ksWorkflow, ".") + require.True(t, ok, "invalid '.' value: %s", ksWorkflow) done := false timer := time.NewTimer(workflowStateTimeout) log.Infof("Waiting for workflow %q to fully reach %q state", ksWorkflow, wantState) for { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow, "--compact", "--include-logs=false") require.NoError(t, err, output) done = true state := "" - result := gjson.Get(output, "ShardStatuses") - result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each participating tablet - tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream - if streamId.String() == "PrimaryReplicationStatuses" { - streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream - // we need to wait for all streams to have the desired state - state = attributeValue.Get("State").String() - if state == wantState { - for i := 0; i < len(fieldEqualityChecks); i++ { - if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { - key := kvparts[0] - val := kvparts[1] - res := attributeValue.Get(key).String() - if !strings.EqualFold(res, val) { - done = false - } - } - } - if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" { - done = false - } - } else { + streams := gjson.Get(output, "workflows.0.shard_streams.*.streams") + streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream + info := stream.Map() + // We need to wait for all streams to have the desired state. + state = info["state"].String() + if state == wantState { + for i := 0; i < len(fieldEqualityChecks); i++ { + if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { + key := kvparts[0] + val := kvparts[1] + res := info[key].String() + if !strings.EqualFold(res, val) { done = false } - return true - }) + } + } + if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && + (info["position"].Exists() && info["position"].String() == "") { + done = false } - return true - }) + } else { + done = false + } return true }) if done { @@ -548,7 +546,7 @@ func checkIfTableExists(t *testing.T, vc *VitessCluster, tabletAlias string, tab var err error found := false - if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetSchema", "--", "--tables", table, tabletAlias); err != nil { + if output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", "--tables", table, tabletAlias); err != nil { return false, err } jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) { @@ -571,19 +569,10 @@ func validateTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, ta } func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) { - var output string - var err error - found := false - if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil { - require.Fail(t, "GetShard error", "%v %v", err, output) - return false, err - } - jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) { - if string(value) == table { - found = true - } - }, "tablet_controls", "[0]", "denied_tables") - return found, nil + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard) + require.NoError(t, err, "GetShard error", "%v %v", err, output) + deniedTable := gjson.Get(output, fmt.Sprintf("shard.tablet_controls.0.denied_tables.#(==\"%s\"", table)) + return deniedTable.Exists(), nil } // expectNumberOfStreams waits for the given number of streams to be present and @@ -609,7 +598,7 @@ func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database str func printShardPositions(vc *VitessCluster, ksShards []string) { for _, ksShard := range ksShards { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ShardReplicationPositions", ksShard) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ShardReplicationPositions", ksShard) if err != nil { fmt.Printf("Error in ShardReplicationPositions: %v, output %v", err, output) } else { @@ -621,7 +610,7 @@ func printShardPositions(vc *VitessCluster, ksShards []string) { func printRoutingRules(t *testing.T, vc *VitessCluster, msg string) error { var output string var err error - if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules"); err != nil { + if output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules", "--compact"); err != nil { return err } fmt.Printf("Routing Rules::%s:\n%s\n", msg, output) @@ -648,29 +637,22 @@ func getDebugVar(t *testing.T, port int, varPath []string) (string, error) { func confirmWorkflowHasCopiedNoData(t *testing.T, targetKS, workflow string) { timer := time.NewTimer(defaultTimeout) defer timer.Stop() - ksWorkflow := fmt.Sprintf("%s.%s", targetKS, workflow) for { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") - require.NoError(t, err) - result := gjson.Get(output, "ShardStatuses") - result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each source tablet - tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream - if streamId.String() == "PrimaryReplicationStatuses" { - streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream - state := attributeValue.Get("State").String() - pos := attributeValue.Get("Pos").String() - // If we've actually copied anything then we'll have a position in the stream - if (state == binlogdatapb.VReplicationWorkflowState_Running.String() || state == binlogdatapb.VReplicationWorkflowState_Copying.String()) && pos != "" { - require.FailNowf(t, "Unexpected data copied in workflow", - "The MoveTables workflow %q copied data in less than %s when it should have been waiting. Show output: %s", - ksWorkflow, defaultTimeout, output) - } - return true // end attribute loop - }) - } - return true // end stream loop - }) - return true // end tablet loop + output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow, "--compact", "--include-logs=false") + require.NoError(t, err, output) + streams := gjson.Get(output, "workflows.0.shard_streams.*.streams") + streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream + info := stream.Map() + state := info["state"] + pos := info["position"] + // If we've actually copied anything then we'll have a position in the stream + if (state.Exists() && (state.String() == binlogdatapb.VReplicationWorkflowState_Running.String() || state.String() == binlogdatapb.VReplicationWorkflowState_Copying.String())) && + (pos.Exists() && pos.String() != "") { + require.FailNowf(t, "Unexpected data copied in workflow", + "The MoveTables workflow %q copied data in less than %s when it should have been waiting. Show output: %s", + ksWorkflow, defaultTimeout, output) + } + return true }) select { case <-timer.C: diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 0a5664b8486..0b6e8e16fff 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -286,7 +286,7 @@ func TestVreplicationCopyThrottling(t *testing.T) { moveTablesActionWithTabletTypes(t, "Create", defaultCell.Name, workflow, sourceKs, targetKs, table, "primary", true) // Wait for the copy phase to start waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKs, workflow), binlogdatapb.VReplicationWorkflowState_Copying.String()) - // The initial copy phase should be blocking on the history list + // The initial copy phase should be blocking on the history list. confirmWorkflowHasCopiedNoData(t, targetKs, workflow) releaseInnoDBRowHistory(t, trxConn) trxConn.Close() @@ -623,7 +623,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { testVStreamFrom(t, vtgate, keyspace, 2) }) shardCustomer(t, true, []*Cell{cell1, cell2}, "alias", false) - isTableInDenyList(t, vc, "product:0", "customer") + isTableInDenyList(t, vc, "product/0", "customer") // we tag along this test so as not to create the overhead of creating another cluster testVStreamCellFlag(t) } @@ -878,13 +878,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl switchWrites(t, workflowType, ksWorkflow, false) var exists bool - exists, err = isTableInDenyList(t, vc, "product:0", "customer") + exists, err = isTableInDenyList(t, vc, "product/0", "customer") require.NoError(t, err, "Error getting denylist for customer:0") require.True(t, exists) moveTablesAction(t, "Complete", cellNames, workflow, sourceKs, targetKs, tables) - exists, err = isTableInDenyList(t, vc, "product:0", "customer") + exists, err = isTableInDenyList(t, vc, "product/0", "customer") require.NoError(t, err, "Error getting denylist for customer:0") require.False(t, exists) @@ -1739,12 +1739,12 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e require.Equal(t, 1, len(res.Rows)) historyLen, err = res.Rows[0][0].ToInt64() require.NoError(t, err) - if historyLen > expectedLength { + if historyLen >= expectedLength { return } select { case <-timer.C: - t.Fatalf("Did not reach the expected InnoDB history length of %d before the timeout of %s; last seen value: %d", expectedLength, defaultTimeout, historyLen) + require.FailNow(t, "Did not reach the minimum expected InnoDB history length of %d before the timeout of %s; last seen value: %d", expectedLength, defaultTimeout, historyLen) default: time.Sleep(defaultTick) }