From e87457ee2401a8bb121a0befda756708fe826414 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 6 Nov 2024 14:11:05 -0500 Subject: [PATCH] VReplication: Relax restrictions on Cancel and ReverseTraffic when writes not involved (#17128) Signed-off-by: Matt Lord --- .../vreplication/multi_tenant_test.go | 12 + .../vreplication_vtctldclient_cli_test.go | 24 +- go/vt/vtctl/workflow/server.go | 76 +++-- go/vt/vtctl/workflow/server_test.go | 285 ++++++++++++++++-- go/vt/vtctl/workflow/utils.go | 2 +- 5 files changed, 344 insertions(+), 55 deletions(-) diff --git a/go/test/endtoend/vreplication/multi_tenant_test.go b/go/test/endtoend/vreplication/multi_tenant_test.go index c82ee8a620f..6bceaeefc6e 100644 --- a/go/test/endtoend/vreplication/multi_tenant_test.go +++ b/go/test/endtoend/vreplication/multi_tenant_test.go @@ -214,6 +214,18 @@ func TestMultiTenantSimple(t *testing.T) { require.Zero(t, rowCount) }) + t.Run("cancel after switching reads", func(t *testing.T) { + // First let's test canceling the workflow after only switching reads + // to ensure that it properly cleans up all of the state. + createFunc() + mt.SwitchReads() + confirmOnlyReadsSwitched(t) + mt.Cancel() + confirmNoRoutingRules(t) + rowCount := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1")) + require.Zero(t, rowCount) + }) + // Create again and run it to completion. createFunc() diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 4b556955815..4ee977c4d74 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -304,6 +304,22 @@ func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, t // Confirm that the source tables were renamed. 
require.True(t, checkTablesExist(t, "zone1-100", []string{"_customer2_old"})) require.False(t, checkTablesExist(t, "zone1-100", []string{"customer2"})) + + // Confirm that we can cancel a workflow after ONLY switching read traffic. + mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, "customer", createFlags, nil, nil) + mt.Start() // Need to start because we set stop-after-copy to true. + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + for _, tab := range targetTabs { + catchup(t, tab, workflowName, "MoveTables") + } + mt.SwitchReads() + wf := mt.(iWorkflow) + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) + mt.Cancel() + confirmNoRoutingRules(t) } // Create two workflows in order to confirm that listing all workflows works. 
@@ -450,6 +466,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards "--all-cells", "--format=json", "--config-overrides", mapToCSV(overrides), } + rs := newReshard(vc, &reshardWorkflow{ workflowInfo: &workflowInfo{ vc: vc, @@ -460,7 +477,6 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards targetShards: targetShards, createFlags: createFlags, }, workflowFlavorVtctld) - ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) wf := rs.(iWorkflow) rs.Create() @@ -769,8 +785,10 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules { } func confirmNoRoutingRules(t *testing.T) { - routingRulesResponse := getRoutingRules(t) - require.Zero(t, len(routingRulesResponse.Rules)) + rrRes := getRoutingRules(t) + require.Zero(t, len(rrRes.Rules)) + krrRes := getKeyspaceRoutingRules(t, vc) + require.Zero(t, len(krrRes.Rules)) } func confirmRoutingRulesExist(t *testing.T) { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index b47d99118ce..7c49de58c9b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -148,9 +148,9 @@ var ( // ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple // target keyspaces across different shard primaries. This should be // impossible. 
- ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow") - ErrWorkflowNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic") - ErrWorkflowPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic") + ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow") + ErrWorkflowCompleteNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic") + ErrWorkflowDeleteWritesSwitched = errors.New("cannot delete workflow because you have already switched write traffic") ) // Server provides an API to work with Vitess workflows, like vreplication @@ -1736,7 +1736,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa } if !state.WritesSwitched || len(state.ReplicaCellsNotSwitched) > 0 || len(state.RdonlyCellsNotSwitched) > 0 { - return nil, ErrWorkflowNotFullySwitched + return nil, ErrWorkflowCompleteNotFullySwitched } var renameTable TableRemovalType if req.RenameTables { @@ -2111,10 +2111,12 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe } if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - // Return an error if the workflow traffic is partially switched. - if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { - return nil, ErrWorkflowPartiallySwitched + // Return an error if the write workflow traffic is switched. + if state.WritesSwitched { + return nil, ErrWorkflowDeleteWritesSwitched } + // If only reads have been switched, then we can delete the + // workflow and its related artifacts. } // Lock the workflow for deletion. 
@@ -2158,22 +2160,21 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe ts.workflowType) } // We need to delete the rows that the target tables would have for the tenant. - // We don't cleanup other related artifacts since they are not tied to the tenant. if !req.GetKeepData() { if err := s.deleteTenantData(ctx, ts, req.DeleteBatchSize); err != nil { return nil, vterrors.Wrapf(err, "failed to fully delete all migrated data for tenant %s, please retry the operation", ts.options.TenantId) } } - } else { - // Cleanup related data and artifacts. There are none for a LookupVindex workflow. - if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil { - if topo.IsErrType(err, topo.NoNode) { - return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace()) - } - return nil, err + } + + // Cleanup related data and artifacts. There are none for a LookupVindex workflow. 
+ if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { + if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil { + if topo.IsErrType(err, topo.NoNode) { + return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace()) } + return nil, err } } @@ -2697,8 +2698,10 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData, if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: - if err := sw.removeTargetTables(ctx); err != nil { - return nil, err + if !ts.IsMultiTenantMigration() { + if err := sw.removeTargetTables(ctx); err != nil { + return nil, err + } } if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err @@ -3239,12 +3242,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } - if direction == DirectionBackward && ts.IsMultiTenantMigration() { - // In a multi-tenant migration, multiple migrations would be writing to the same - // table, so we can't stop writes like we do with MoveTables, using denied tables, - // since it would block all other migrations as well as traffic for tenants which - // have already been migrated. - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") + if ts.IsMultiTenantMigration() { + // Multi-tenant migrations use keyspace routing rules, so we need to update the state + // using them. + err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), ts.sourceKeyspace, ts.targetKeyspace, startState) + if err != nil { + return nil, vterrors.Wrap(err, "failed to update multi-tenant workflow state using keyspace routing rules") + } } // We need this to know when there isn't a (non-FROZEN) reverse workflow to use. 
@@ -3255,6 +3259,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor (direction == DirectionBackward && !startState.WritesSwitched) if direction == DirectionBackward && !onlySwitchingReads { + if ts.IsMultiTenantMigration() { + // In a multi-tenant migration, multiple migrations would be writing to the same + // table, so we can't stop writes like we do with MoveTables, using denied tables, + // since it would block all other migrations as well as traffic for tenants which + // have already been migrated. + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse write traffic for multi-tenant migrations") + } // This means that the main workflow is FROZEN and the reverse workflow // exists. So we update the starting state so that we're using the reverse // workflow and we can move forward with a normal traffic switch forward @@ -3336,6 +3347,15 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor resp.StartState = startState.String() s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) _, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow) + if ts.IsMultiTenantMigration() { + // Multi-tenant migrations use keyspace routing rules, so we need to update the state + // using them. 
+ sourceKs, targetKs := ts.sourceKeyspace, ts.targetKeyspace + if TrafficSwitchDirection(req.Direction) == DirectionBackward { + sourceKs, targetKs = targetKs, sourceKs + } + err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), sourceKs, targetKs, currentState) + } if err != nil { resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) } else { @@ -3387,11 +3407,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // shard level traffic switching is all or nothing trafficSwitchingIsAllOrNothing = true case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration(): - if direction == DirectionBackward { - return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, - "requesting reversal of read traffic for multi-tenant migrations is not supported")) - } - // For multi-tenant migrations, we only support switching traffic to all cells at once + // For multi-tenant migrations, we only support switching traffic to all cells at once. 
allCells, err := ts.TopoServer().GetCellInfoNames(ctx) if err != nil { return nil, err @@ -3415,7 +3431,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 { return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, - "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) + "requesting reversal of read traffic for RDONLYs but RDONLY reads have not been switched")) } } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index b7783fc2945..dbe06ab1a47 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -753,6 +753,139 @@ func TestWorkflowDelete(t *testing.T) { postFunc func(t *testing.T, env *testEnv) expectedLogs []string }{ + { + name: "delete workflow", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name), + result: &querypb.QueryResult{}, + }, + }, + want: &vtctldatapb.WorkflowDeleteResponse{ + Summary: 
fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + Details: []*vtctldatapb.WorkflowDeleteResponse_TabletInfo{ + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + Deleted: true, + }, + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + Deleted: true, + }, + }, + }, + }, + { + name: "delete workflow with only reads switched", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name), + result: &querypb.QueryResult{}, + }, + }, + preFunc: func(t *testing.T, env *testEnv) { + // Setup the routing rules as they would be after having previously done SwitchTraffic + // for replica and rdonly tablets. 
+ env.updateTableRoutingRules(t, ctx, roTabletTypes, []string{table1Name, table2Name, table3Name}, + sourceKeyspaceName, targetKeyspaceName, targetKeyspaceName) + }, + want: &vtctldatapb.WorkflowDeleteResponse{ + Summary: fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + Details: []*vtctldatapb.WorkflowDeleteResponse_TabletInfo{ + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + Deleted: true, + }, + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + Deleted: true, + }, + }, + }, + }, + { + name: "delete workflow with writes switched", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + preFunc: func(t *testing.T, env *testEnv) { + // Setup the routing rules as they would be after having previously + // done SwitchTraffic for all tablet types. + env.updateTableRoutingRules(t, ctx, allTabletTypes, []string{table1Name, table2Name, table3Name}, + sourceKeyspaceName, targetKeyspaceName, targetKeyspaceName) + }, + wantErr: ErrWorkflowDeleteWritesSwitched.Error(), + postFunc: func(t *testing.T, env *testEnv) { + // Clear out the routing rules we put in place. 
+ err := env.ts.SaveRoutingRules(ctx, &vschemapb.RoutingRules{}) + require.NoError(t, err) + }, + }, { name: "missing table", sourceKeyspace: &testKeyspace{ @@ -983,7 +1116,7 @@ func TestWorkflowDelete(t *testing.T) { wantErr: "unsupported workflow type \"Reshard\" for multi-tenant migration", }, { - name: "multi-tenant workflow without predicate ", + name: "multi-tenant workflow without predicate", sourceKeyspace: &testKeyspace{ KeyspaceName: sourceKeyspaceName, ShardNames: []string{"0"}, @@ -1339,6 +1472,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { name string sourceKeyspace, targetKeyspace *testKeyspace req *vtctldatapb.WorkflowSwitchTrafficRequest + multiTenant bool preFunc func(env *testEnv) want *vtctldatapb.WorkflowSwitchTrafficResponse wantErr bool @@ -1409,6 +1543,48 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { CurrentState: "Reads Not Switched. Writes Not Switched", }, }, + { + name: "backward for multi-tenant workflow and read-only tablets", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: roTabletTypes, + }, + multiTenant: true, + want: &vtctldatapb.WorkflowSwitchTrafficResponse{ + Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), + StartState: "All Reads Switched. Writes Not Switched", + CurrentState: "Reads Not Switched. 
Writes Not Switched", + }, + }, + { + name: "backward for multi-tenant workflow for all tablet types", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + multiTenant: true, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: allTabletTypes, + }, + wantErr: true, + }, { name: "forward with tablet refresh error", sourceKeyspace: &testKeyspace{ @@ -1459,6 +1635,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { }, }, } + for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { require.NotNil(t, tc.sourceKeyspace) @@ -1467,6 +1644,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) defer env.close() env.tmc.schema = schema + if tc.req.Direction == int32(DirectionForward) { env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, cutoverQR) @@ -1508,9 +1686,47 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } } + if tc.preFunc != nil { tc.preFunc(env) } + + if tc.multiTenant { + rwr := &readVReplicationWorkflowRequestResponse{ + req: &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{ + Workflow: workflowName, + }, + res: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + Options: `{"tenant_id": "1"}`, // This is all we need for it to be considered a multi-tenant workflow + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspaceName, + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: 
[]*binlogdatapb.Rule{ + { + Match: "t1", + Filter: "select * from t1", + }, + }, + }, + }, + }, + }, + }, + } + env.tmc.expectReadVReplicationWorkflowRequestOnTargetTablets(rwr) + // Multi-tenant workflows also use keyspace routing rules. So we set those + // up as if we've already switched the traffic. + if tc.req.Direction == int32(DirectionBackward) { + err := changeKeyspaceRouting(ctx, env.ts, tc.req.TabletTypes, tc.sourceKeyspace.KeyspaceName, + tc.targetKeyspace.KeyspaceName, "SwitchTraffic") + require.NoError(t, err) + } + } + got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) if tc.wantErr { require.Error(t, err) @@ -1519,34 +1735,61 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { require.NoError(t, err) require.Equal(t, tc.want.String(), got.String(), "Server.WorkflowSwitchTraffic() = %v, want %v", got, tc.want) - // Confirm that we have the expected routing rules. - rr, err := env.ts.GetRoutingRules(ctx) - require.NoError(t, err) - for _, rr := range rr.Rules { - _, rrTabletType, found := strings.Cut(rr.FromTable, "@") - if !found { // No @ is primary - rrTabletType = topodatapb.TabletType_PRIMARY.String() + if tc.multiTenant { // Confirm the keyspace routing rules + gotKrrs, err := env.ts.GetKeyspaceRoutingRules(ctx) + require.NoError(t, err) + sort.Slice(gotKrrs.Rules, func(i, j int) bool { + return gotKrrs.Rules[i].FromKeyspace < gotKrrs.Rules[j].FromKeyspace + }) + expectedKrrs := &vschemapb.KeyspaceRoutingRules{} + for _, tabletType := range tc.req.TabletTypes { + suffix := "" + if tabletType != topodatapb.TabletType_PRIMARY { + suffix = fmt.Sprintf("@%s", strings.ToLower(tabletType.String())) + } + toKs, fromKs := tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName + if tc.req.Direction == int32(DirectionBackward) { + fromKs, toKs = toKs, fromKs + } + expectedKrrs.Rules = append(expectedKrrs.Rules, &vschemapb.KeyspaceRoutingRule{ + FromKeyspace: fromKs + suffix, + ToKeyspace: toKs, + }) } - tabletType, err := 
topoproto.ParseTabletType(rrTabletType) + sort.Slice(expectedKrrs.Rules, func(i, j int) bool { + return expectedKrrs.Rules[i].FromKeyspace < expectedKrrs.Rules[j].FromKeyspace + }) + require.Equal(t, expectedKrrs.String(), gotKrrs.String()) + } else { // Confirm the [table] routing rules + rr, err := env.ts.GetRoutingRules(ctx) require.NoError(t, err) + for _, rr := range rr.Rules { + _, rrTabletType, found := strings.Cut(rr.FromTable, "@") + if !found { // No @ is primary + rrTabletType = topodatapb.TabletType_PRIMARY.String() + } + tabletType, err := topoproto.ParseTabletType(rrTabletType) + require.NoError(t, err) - var to string - if slices.Contains(tc.req.TabletTypes, tabletType) { - to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) - if tc.req.Direction == int32(DirectionBackward) { + var to string + if slices.Contains(tc.req.TabletTypes, tabletType) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + } + } else { to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + } } - } else { - to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) - if tc.req.Direction == int32(DirectionBackward) { - to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + for _, tt := range rr.ToTables { + require.Equal(t, to, tt, "Additional info: tablet type: %s, rr.FromTable: %s, rr.ToTables: %v, to string: %s", + tabletType.String(), rr.FromTable, rr.ToTables, to) } } - for _, tt := range rr.ToTables { - require.Equal(t, to, tt, "Additional info: tablet type: %s, rr.FromTable: %s, rr.ToTables: %v, to string: %s", - tabletType.String(), rr.FromTable, rr.ToTables, to) - } } + // Confirm that we have the expected denied tables entries. 
if slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 2850dd1678e..50f35667eaf 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -910,7 +910,7 @@ func validateTenantId(dataType querypb.Type, value string) error { } func updateKeyspaceRoutingState(ctx context.Context, ts *topo.Server, sourceKeyspace, targetKeyspace string, state *State) error { - // For multi-tenant migrations, we only support switching traffic to all cells at once + // For multi-tenant migrations, we only support switching traffic to all cells at once. cells, err := ts.GetCellInfoNames(ctx) if err != nil { return err