diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 2f6e0659025..6c38e4da09b 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -994,8 +994,11 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } +// cancelMigration attempts to revert all changes made during the migration so that we can get back to the +// state when traffic switching (or reversing) was initiated. func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { var err error +<<<<<<< HEAD if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { err = ts.changeTableSourceWrites(ctx, allowWrites) } else { @@ -1003,21 +1006,43 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat } if err != nil { ts.Logger().Errorf("Cancel migration failed: %v", err) +======= + + if ctx.Err() != nil { + // Even though we create a new context later on we still record any context error: + // for forensics in case of failures. + ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err()) +>>>>>>> e4dc8729ec (SwitchTraffic: use separate context while canceling a migration (#17340)) } - sm.CancelStreamMigrations(ctx) + // We create a new context while canceling the migration, so that we are independent of the original + // context being cancelled prior to or during the cancel operation. + cmTimeout := 60 * time.Second + cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout) + defer cmCancel() + + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { + err = ts.switchDeniedTables(cmCtx) + } else { + err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) + } + if err != nil { + ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err) + } + + sm.CancelStreamMigrations(cmCtx) err = ts.ForAllTargets(func(target *MigrationTarget) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) - _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query) + _, err := ts.TabletManagerClient().VReplicationExec(cmCtx, target.GetPrimary().Tablet, query) return err }) if err != nil { ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } - err = ts.deleteReverseVReplication(ctx) + err = ts.deleteReverseVReplication(cmCtx) if err != nil { ts.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err) }