Skip to content

Commit

Permalink
Refactor tstWorkflowExec to use options for atomicCopy/deferForeignKe…
Browse files Browse the repository at this point in the history
…ys, since deferForeignKeys cannot be used for schemas with foreign keys

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Nov 3, 2023
1 parent 4886266 commit 15f768b
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestMoveTablesBuffering(t *testing.T) {
setupMinimalCustomerKeyspace(t)
tables := "loadtest"
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
tables, workflowActionCreate, "", "", "", false)
tables, workflowActionCreate, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

Expand Down
12 changes: 6 additions & 6 deletions go/test/endtoend/vreplication/partial_movetables_seq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (wf *workflow) create() {
currentWorkflowType = wrangler.MoveTablesWorkflow
sourceShards := strings.Join(wf.options.sourceShards, ",")
err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace,
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", false)
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions)
case "reshard":
currentWorkflowType = wrangler.ReshardWorkflow
sourceShards := strings.Join(wf.options.sourceShards, ",")
Expand All @@ -248,7 +248,7 @@ func (wf *workflow) create() {
targetShards = sourceShards
}
err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace,
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, targetShards, false)
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, targetShards, defaultWorkflowExecOptions)
default:
panic(fmt.Sprintf("unknown workflow type: %s", wf.typ))
}
Expand All @@ -266,15 +266,15 @@ func (wf *workflow) create() {
}

func (wf *workflow) switchTraffic() {
require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionSwitchTraffic, "", "", "", false))
require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions))
}

func (wf *workflow) reverseTraffic() {
require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionReverseTraffic, "", "", "", false))
require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions))
}

func (wf *workflow) complete() {
require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionComplete, "", "", "", false))
require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions))
}

// TestPartialMoveTablesWithSequences enhances TestPartialMoveTables by adding an unsharded keyspace which has a
Expand Down Expand Up @@ -505,7 +505,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
// We switched traffic, so it's the reverse workflow we want to cancel.
reverseWf := wf + "_reverse"
reverseKs := sourceKs // customer
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false)
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)

output, err := tc.vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show")
Expand Down
12 changes: 6 additions & 6 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// start the partial movetables for 80-
err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs,
"customer,loadtest", workflowActionCreate, "", shard, "", false)
"customer,loadtest", workflowActionCreate, "", shard, "", defaultWorkflowExecOptions)
require.NoError(t, err)
var lg *loadGenerator
if runWithLoad { // start load after routing rules are set, otherwise we end up with ambiguous tables
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {
require.Contains(t, err.Error(), "target: customer.-80.primary", "Query was routed to the target before any SwitchTraffic")

// Switch all traffic for the shard
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false))
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions))
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// We cannot Complete a partial move tables at the moment because
// it will find that all traffic has (obviously) not been switched.
err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false)
err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions)
require.Error(t, err)

// Confirm global routing rules: -80 should still be be routed to customer
Expand All @@ -288,14 +288,14 @@ func TestPartialMoveTablesBasic(t *testing.T) {
ksWf = fmt.Sprintf("%s.%s", targetKs, wfName)
// Start the partial movetables for -80, 80- has already been switched
err = tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs,
"customer,loadtest", workflowActionCreate, "", shard, "", false)
"customer,loadtest", workflowActionCreate, "", shard, "", defaultWorkflowExecOptions)
require.NoError(t, err)
targetTab2 := vc.getPrimaryTablet(t, targetKs, shard)
catchup(t, targetTab2, wfName, "Partial MoveTables Customer to Customer2: -80")
vdiffSideBySide(t, ksWf, "")

// Switch all traffic for the shard
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false))
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions))
expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)
Expand All @@ -316,7 +316,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// We switched traffic, so it's the reverse workflow we want to cancel.
reverseWf := wf + "_reverse"
reverseKs := sourceKs // customer
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false)
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show")
Expand Down
43 changes: 28 additions & 15 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,18 @@ var (
currentWorkflowType wrangler.VReplicationWorkflowType
)

type workflowExecOptions struct {
deferSecondaryKeys bool
atomicCopy bool
}

var defaultWorkflowExecOptions = &workflowExecOptions{
deferSecondaryKeys: true,
}

func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) error {
err := tstWorkflowExec(t, defaultCellName, workflowName, targetKs, targetKs,
"", workflowActionCreate, "", sourceShards, targetShards, false)
"", workflowActionCreate, "", sourceShards, targetShards, defaultWorkflowExecOptions)
require.NoError(t, err)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab1}, targetKs, "")
Expand All @@ -78,7 +87,7 @@ func createMoveTablesWorkflow(t *testing.T, tables string) {
tables = tablesToMove
}
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
tables, workflowActionCreate, "", "", "", false)
tables, workflowActionCreate, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab1}, targetKs, tables)
Expand All @@ -88,10 +97,12 @@ func createMoveTablesWorkflow(t *testing.T, tables string) {
}

func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error {
return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", false)
return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", defaultWorkflowExecOptions)
}

func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error {
func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes,
sourceShards, targetShards string, options *workflowExecOptions) error {

var args []string
if currentWorkflowType == wrangler.MoveTablesWorkflow {
args = append(args, "MoveTables")
Expand All @@ -104,7 +115,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
if BypassLagCheck {
args = append(args, "--max_replication_lag_allowed=2542087h")
}
if atomicCopy {
if options.atomicCopy {
args = append(args, "--atomic-copy")
}
switch action {
Expand All @@ -125,10 +136,12 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow:
// fixme: add a parameter to pass flags, so we can conditionally add --defer-secondary-keys
//if !atomicCopy {
//args = append(args, "--defer-secondary-keys")
//}

if !options.atomicCopy && options.deferSecondaryKeys {
log.Infof("Testing --defer-secondary-keys flag, %t, %t, %s, %s, %s",
options.atomicCopy, options.deferSecondaryKeys, sourceKs, targetKs, tables)
args = append(args, "--defer-secondary-keys")
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
}
Expand Down Expand Up @@ -318,17 +331,17 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
// use MoveTables to move customer2 from product to customer using
currentWorkflowType = wrangler.MoveTablesWorkflow
err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"customer2", workflowActionCreate, "", "", "", false)
"customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)

waitForWorkflowState(t, vc, "customer.wf2", binlogdatapb.VReplicationWorkflowState_Running.String())
waitForLowLag(t, "customer", "wf2")

err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"", workflowActionSwitchTraffic, "", "", "", false)
"", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)
err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"", workflowActionComplete, "", "", "", false)
"", workflowActionComplete, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)

// sanity check
Expand All @@ -353,16 +366,16 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {

// use MoveTables to move customer2 back to product. Note that now the table has an associated sequence
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"customer2", workflowActionCreate, "", "", "", false)
"customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)
waitForWorkflowState(t, vc, "product.wf3", binlogdatapb.VReplicationWorkflowState_Running.String())

waitForLowLag(t, "product", "wf3")
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"", workflowActionSwitchTraffic, "", "", "", false)
"", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"", workflowActionComplete, "", "", "", false)
"", workflowActionComplete, "", "", "", defaultWorkflowExecOptions)
require.NoError(t, err)

// sanity check
Expand Down
13 changes: 9 additions & 4 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ func (vmt *VtctlMoveTables) Create() {

func (vmt *VtctlMoveTables) SwitchReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionSwitchTraffic, "", "", "", vmt.atomicCopy)
vmt.tables, workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy)
vmt.tables, workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions)
require.NoError(vmt.vc.t, err)
}

Expand All @@ -126,8 +126,12 @@ func (vmt *VtctlMoveTables) Show() {
}

func (vmt *VtctlMoveTables) exec(action string) {
options := &workflowExecOptions{
deferSecondaryKeys: false,
atomicCopy: vmt.atomicCopy,
}
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", vmt.atomicCopy)
vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", options)
require.NoError(vmt.vc.t, err)
}
func (vmt *VtctlMoveTables) SwitchReads() {
Expand Down Expand Up @@ -279,8 +283,9 @@ func (vrs *VtctlReshard) Show() {
}

func (vrs *VtctlReshard) exec(action string) {
options := &workflowExecOptions{}
err := tstWorkflowExec(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace,
"", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, false)
"", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, options)
require.NoError(vrs.vc.t, err)
}

Expand Down
3 changes: 0 additions & 3 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,6 @@ func (df *vdiff) startQueryStreams(ctx context.Context, keyspace string, partici
log.Errorf("WaitForPosition error: %s", err)
return vterrors.Wrapf(err, "WaitForPosition for tablet %v", topoproto.TabletAliasString(participant.tablet.Alias))
}
log.Infof("WaitForPosition: tablet %s did reach position %s", participant.tablet.Alias.String(), replication.EncodePosition(participant.position))
participant.result = make(chan *sqltypes.Result, 1)
gtidch := make(chan string, 1)

Expand Down Expand Up @@ -973,9 +972,7 @@ func (df *vdiff) streamOne(ctx context.Context, keyspace, shard string, particip
TabletType: participant.tablet.Type,
}
var fields []*querypb.Field
log.Infof("VStreamResults: tablet %s.%s.%s will execute %s", keyspace, shard, participant.tablet.Alias.String(), query)
return conn.VStreamResults(ctx, target, query, func(vrs *binlogdatapb.VStreamResultsResponse) error {
log.Infof("VStreamResults: tablet %s.%s.%s received %d rows, gtid %s", keyspace, shard, participant.tablet.Alias.String(), len(vrs.Rows), vrs.Gtid)
if vrs.Fields != nil {
fields = vrs.Fields
gtidch <- vrs.Gtid
Expand Down

0 comments on commit 15f768b

Please sign in to comment.