diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index c28118a97cc..1e0ee2e24a3 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -113,6 +113,73 @@ func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlera return respBody, err } +// Note: this is a manual test. +func TestVReplicationStatusMatch(t *testing.T) { + var err error + workflow := "vreplication" + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + table := "orders" + cell := "zone1" + shard := "0" + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + defaultCell := vc.Cells[cell] + + if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil); err != nil { + t.Fatal(err) + } + if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil); err != nil { + t.Fatal(err) + } + vtgate := defaultCell.Vtgates[0] + require.NotNil(t, vtgate) + + verifyClusterHealth(t, vc) + + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + sourceTab = vc.getPrimaryTablet(t, sourceKs, shard) + targetTab := vc.getPrimaryTablet(t, targetKs, shard) + + insertInitialData(t) + + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", sourceKs), 1, false) + require.NoError(t, err) + + // Generate enough data to observe Copying status after resuming workflow. + for id := 3; id < 55555; id++ { + vtgateConn.ExecuteFetch(fmt.Sprintf("insert into customer (cid, name) values (%d, 'mock data')", id), -1, false) + } + + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, table) + + testVReplicationStatusEquality(t, targetTab.Port, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Copying, "Start workflow") + + moveTablesAction(t, "Stop", defaultCellName, workflow, sourceKs, targetKs, table) + + testVReplicationStatusEquality(t, targetTab.Port, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped, "Stop workflow") + + moveTablesAction(t, "Start", defaultCellName, workflow, sourceKs, targetKs, table) + + testVReplicationStatusEquality(t, targetTab.Port, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Copying, "Resume workflow") + + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + + testVReplicationStatusEquality(t, targetTab.Port, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running, "After copy complete") +} + +func testVReplicationStatusEquality(t *testing.T, tabletPort int, vc *VitessCluster, ksWorkflow string, expectedState binlogdatapb.VReplicationWorkflowState, currentStep string) { + getDebugVarOutput, _ := getDebugVar(t, tabletPort, []string{"VReplicationStreamState"}) + workflowShowOutput, _ := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") + + statState := gjson.Get(getDebugVarOutput, "vreplication*").String() + workflowState := gjson.Get(workflowShowOutput, "ShardStatuses.0/zone1-0000000200.PrimaryReplicationStatuses.0.State").String() + + if statState != expectedState.String() || statState != workflowState { + t.Fatalf("Expected statState (%s) and workflowState (%s) to be %s on step: %s", statState, workflowState, expectedState, currentStep) + } +} + // TestVReplicationDDLHandling tests the DDL handling in // VReplication for the values of IGNORE, STOP, and EXEC. // NOTE: this is a manual test. It is not executed in the diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index dfe51f71dbd..03780315a30 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -388,6 +388,9 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma defer vc.vr.stats.PhaseTimings.Record("copy", time.Now()) defer vc.vr.stats.CopyLoopCount.Add(1) + if err := vc.vr.setState(binlogdatapb.VReplicationWorkflowState_Copying, ""); err != nil { + return err + } log.Infof("Copying table %s, lastpk: %v", tableName, copyState[tableName]) plan, err := buildReplicatorPlan(vc.vr.source, vc.vr.colInfoMap, nil, vc.vr.stats, vc.vr.vre.env.CollationEnv(), vc.vr.vre.env.Parser())