From bcd398a4c89011f62ef3b3fe14b9227095abf038 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 19 Sep 2024 12:29:00 -0400 Subject: [PATCH 1/9] Undo vschema changes on workflow creation fail Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_test.go | 16 +-- go/vt/vtctl/workflow/server.go | 132 +++++++++++++++------- 2 files changed, 98 insertions(+), 50 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 763dd7c04d3..84119735dee 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1174,7 +1174,7 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { setStartingVschema() }() } - outms, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) + outms, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) if tcase.err != "" { require.Error(t, err) require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err) @@ -1419,7 +1419,7 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, tcase.out) { t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out) @@ -1654,7 +1654,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { t.Fatal(err) } - _, _, got, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -1772,7 +1772,7 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("same keyspace: got:\n%v, want\n%v", got, want) @@ -1898,7 +1898,7 @@ func TestCreateCustomizedVindex(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("customize create lookup error same: got:\n%v, want\n%v", got, want) @@ -2016,7 +2016,7 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { t.Fatal(err) } - ms, ks, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms, ks, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(wantKs, ks) { t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) @@ -2096,11 +2096,11 @@ func TestStopAfterCopyFlag(t *testing.T) { t.Fatal(err) } - ms1, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms1, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) require.Equal(t, ms1.StopAfterCopy, true) - ms2, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) + ms2, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) require.NoError(t, err) require.Equal(t, ms2.StopAfterCopy, false) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 88365bd54ef..f53ceb5c113 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1127,22 +1127,32 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup span.Annotate("cells", req.Cells) span.Annotate("tablet_types", req.TabletTypes) - ms, sourceVSchema, targetVSchema, err := s.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) + ms, sourceVSchema, targetVSchema, cancelFunc, err := s.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) if err != nil { return nil, err } + if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { return nil, err } - ms.TabletTypes = topoproto.MakeStringTypeCSV(req.TabletTypes) ms.TabletSelectionPreference = req.TabletSelectionPreference if err := s.Materialize(ctx, ms); err != nil { + if cancelFunc != nil { + if cerr := cancelFunc(); cerr != nil { + err = vterrors.Wrapf(err, "failed to restore original vschema '%v' in the %s keyspace: %v", + targetVSchema, ms.TargetKeyspace, cerr) + } + } return nil, err } - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil { - return nil, err + if ms.SourceKeyspace != ms.TargetKeyspace { + if err := s.ts.SaveVSchema(ctx, ms.SourceKeyspace, sourceVSchema); err != nil { + return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", + sourceVSchema, ms.SourceKeyspace) + } } + if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -3836,7 +3846,8 @@ func fillStringTemplate(tmpl string, vars any) (string, error) { // prepareCreateLookup performs the preparatory steps for creating a // Lookup Vindex. -func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { +func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( + ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { // Important variables are pulled out here. var ( vindexName string @@ -3862,19 +3873,19 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // Validate input vindex. if specs == nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") } if len(specs.Vindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") } vindexName = maps.Keys(specs.Vindexes)[0] vindex = maps.Values(specs.Vindexes)[0] if !strings.Contains(vindex.Type, "lookup") { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) } targetKeyspace, targetTableName, err = s.env.Parser().ParseTable(vindex.Params["table"]) if err != nil || targetKeyspace == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .", vindex.Params["table"]) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .
", vindex.Params["table"]) } vindexFromCols = strings.Split(vindex.Params["from"], ",") for i, col := range vindexFromCols { @@ -3882,11 +3893,11 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if strings.Contains(vindex.Type, "unique") { if len(vindexFromCols) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") } } else { if len(vindexFromCols) < 2 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "non-unique vindex 'from' should have more than one column") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "non-unique vindex 'from' should have more than one column") } } vindexToCol = vindex.Params["to"] @@ -3895,7 +3906,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str vindex.Params["write_only"] = "true" // See if we can create the vindex without errors. if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { // This mirrors the behavior of vindexes.boolFromMap(). @@ -3905,19 +3916,22 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str case "false": vindexIgnoreNulls = false default: - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", - ignoreNullsStr) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", + ignoreNullsStr) } } // Validate input table. if len(specs.Tables) < 1 || len(specs.Tables) > 2 { - return nil, nil, nil, fmt.Errorf("one or two tables must be specified") + return nil, nil, nil, nil, fmt.Errorf("one or two tables must be specified") } // Loop executes once or twice. for tableName, table := range specs.Tables { if len(table.ColumnVindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", tableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", + tableName) } if tableName != targetTableName { // This is the source table. sourceTableName = tableName @@ -3931,42 +3945,55 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str vindexCols = table.ColumnVindexes[0].Columns } else { if table.ColumnVindexes[0].Column == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", tableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + tableName) } vindexCols = []string{table.ColumnVindexes[0].Column} } if !slices.Equal(vindexCols, vindexFromCols) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", - tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", + tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) } } // Validate input table and vindex consistency. if sourceTable == nil || len(sourceTable.ColumnVindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table in the %s keyspace", keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table in the %s keyspace", + keyspace) } if sourceTable.ColumnVindexes[0].Name != vindexName { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", sourceTable.ColumnVindexes[0].Name, vindexName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", + sourceTable.ColumnVindexes[0].Name, vindexName) } if vindex.Owner != "" && vindex.Owner != sourceTableName { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", vindex.Owner, sourceTableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", + vindex.Owner, sourceTableName) } if len(sourceTable.ColumnVindexes[0].Columns) != 0 { sourceVindexColumns = sourceTable.ColumnVindexes[0].Columns } else { if sourceTable.ColumnVindexes[0].Column == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", sourceTableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + sourceTableName) } sourceVindexColumns = []string{sourceTable.ColumnVindexes[0].Column} } if len(sourceVindexColumns) != len(vindexFromCols) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", len(sourceVindexColumns), len(vindexFromCols)) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", + len(sourceVindexColumns), len(vindexFromCols)) } // Validate against source vschema. sourceVSchema, err = s.ts.GetVSchema(ctx, keyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if sourceVSchema.Vindexes == nil { sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex) @@ -3978,7 +4005,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } else { targetVSchema, err = s.ts.GetVSchema(ctx, targetKeyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } if targetVSchema.Vindexes == nil { @@ -3989,12 +4016,15 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if existing, ok := sourceVSchema.Vindexes[vindexName]; ok { if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", vindexName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", + vindexName, keyspace) } } sourceVSchemaTable = sourceVSchema.Tables[sourceTableName] if sourceVSchemaTable == nil && !schema.IsInternalOperationTableName(sourceTableName) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) } for _, colVindex := range sourceVSchemaTable.ColumnVindexes { // For a conflict, the vindex name and column should match. @@ -4011,41 +4041,47 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // are not the same then they are two distinct conflicting vindexes and we should // not proceed. if !slices.Equal(colNames, sourceVindexColumns) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", - strings.Join(colNames, ","), sourceTableName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", + strings.Join(colNames, ","), sourceTableName, keyspace) } } // Validate against source schema. sourceShards, err := s.ts.GetServingShards(ctx, keyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } onesource := sourceShards[0] if onesource.PrimaryAlias == nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) } req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{sourceTableName}} tableSchema, err := schematools.GetSchema(ctx, s.ts, s.tmc, onesource.PrimaryAlias, req) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if len(tableSchema.TableDefinitions) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", len(tableSchema.TableDefinitions), keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", + len(tableSchema.TableDefinitions), keyspace) } // Generate "create table" statement. lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") if len(lines) < 3 { // Should never happen. - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", tableSchema.TableDefinitions[0].Schema) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", + tableSchema.TableDefinitions[0].Schema) } var modified []string modified = append(modified, strings.Replace(lines[0], sourceTableName, targetTableName, 1)) for i := range sourceVindexColumns { line, err := generateColDef(lines, sourceVindexColumns[i], vindexFromCols[i]) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } modified = append(modified, line) } @@ -4068,7 +4104,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str createDDL = strings.Join(modified, "\n") // Confirm that our DDL is valid before we create anything. if _, err = s.env.Parser().ParseStrictDDL(createDDL); err != nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", err, createDDL) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", + err, createDDL) } // Generate vreplication query. @@ -4103,6 +4140,11 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() + cancelFunc = func() error { + // Restore the original target vschema. + return s.ts.SaveVSchema(ctx, targetKeyspace, targetVSchema) + } + // Update targetVSchema. targetTable := specs.Tables[targetTableName] if targetVSchema.Sharded { @@ -4118,7 +4160,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str if targetVindexType == "" { targetVindexType, err = vindexes.ChooseVindexForType(field.Type) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } targetVindex = &vschemapb.Vindex{ @@ -4129,11 +4171,15 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if targetVindex == nil { // Unreachable. We validated column names when generating the DDL. - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", + sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) } if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok { if !proto.Equal(existing, targetVindex) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", targetVindexType, targetKeyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", + targetVindexType, targetKeyspace) } } else { targetVSchema.Vindexes[targetVindexType] = targetVindex @@ -4150,7 +4196,9 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if existing, ok := targetVSchema.Tables[targetTableName]; ok { if !proto.Equal(existing, targetTable) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", targetTableName, targetKeyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", + targetTableName, targetKeyspace) } } else { targetVSchema.Tables[targetTableName] = targetTable @@ -4173,7 +4221,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str sourceVSchema.Vindexes[vindexName] = vindex sourceVSchemaTable.ColumnVindexes = append(sourceVSchemaTable.ColumnVindexes, sourceTable.ColumnVindexes[0]) - return ms, sourceVSchema, targetVSchema, nil + return ms, sourceVSchema, targetVSchema, cancelFunc, nil } func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) { From c62d0d27ff5d27cf1b26df57d6caaf1113681125 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 20 Sep 2024 17:43:45 -0400 Subject: [PATCH 2/9] Add unit test Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/framework_test.go | 26 ++++- go/vt/vtctl/workflow/materializer_env_test.go | 39 +++++-- go/vt/vtctl/workflow/materializer_test.go | 103 +++++++++++++++--- go/vt/vtctl/workflow/server.go | 7 +- 4 files changed, 142 insertions(+), 33 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index f3bf5869ab1..bb18ba0cff1 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -267,7 +267,7 @@ type testTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult - createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest primaryPositions map[uint32]string vdiffRequests map[uint32]*vdiffRequestResponse @@ -289,7 +289,7 @@ func newTestTMClient(env *testEnv) *testTMClient { return &testTMClient{ schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), vrQueries: make(map[int][]*queryResult), - createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), @@ -304,9 +304,12 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect, req) { + if !proto.Equal(expect.req, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect) } + if expect.res != nil { + return expect.res, expect.err + } } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil @@ -418,13 +421,22 @@ func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, q } } -func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() tmc.createVReplicationWorkflowRequests[tabletID] = req } +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequestOnTargetTablets(req *createVReplicationWorkflowRequestResponse) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + for _, tablet := range tmc.env.tablets[tmc.env.targetKeyspace.KeyspaceName] { + tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid] = req + } +} + func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { tmc.mu.Lock() defer tmc.mu.Unlock() @@ -479,6 +491,12 @@ type vdiffRequestResponse struct { err error } +type createVReplicationWorkflowRequestResponse struct { + req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest + res *tabletmanagerdatapb.CreateVReplicationWorkflowResponse + err error +} + func (tmc *testTMClient) expectVDiffRequest(tablet *topodatapb.Tablet, vrr *vdiffRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index aada59c244d..349450a6a04 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" @@ -120,16 +121,33 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M if err == nil { tableName = table.Name.String() } + stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl) + require.NoError(t, err) + ddl, ok := stmt.(*sqlparser.CreateTable) + require.True(t, ok) + cols := make([]string, len(ddl.TableSpec.Columns)) + fields := make([]*querypb.Field, len(ddl.TableSpec.Columns)) + for i, col := range ddl.TableSpec.Columns { + cols[i] = col.Name.String() + fields[i] = &querypb.Field{ + Name: col.Name.String(), + Type: col.Type.SQLType(), + } + } env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Name: tableName, - Schema: fmt.Sprintf("%s_schema", tableName), + Name: tableName, + Schema: ts.CreateDdl, + Columns: cols, + Fields: fields, }}, } env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Name: ts.TargetTable, - Schema: fmt.Sprintf("%s_schema", ts.TargetTable), + Name: ts.TargetTable, + Schema: ts.CreateDdl, + Columns: cols, + Fields: fields, }}, } } @@ -199,7 +217,7 @@ type testMaterializerTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult - createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse // Used to confirm the number of times WorkflowDelete was called. workflowDeleteCalls int @@ -215,15 +233,18 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe sourceShards: sourceShards, tableSettings: tableSettings, vrQueries: make(map[int][]*queryResult), - createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), } } func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect, request) { + if expect.req != nil && !proto.Equal(expect.req, request) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) } + if expect.res != nil { + return expect.res, expect.err + } } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil @@ -315,7 +336,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r }) } -func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { +func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() @@ -344,7 +365,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table qrs := tmc.vrQueries[int(tablet.Alias.Uid)] if len(qrs) == 0 { - return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query) } matched := false if qrs[0].query[0] == '/' { diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 84119735dee..5d64be43cb0 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -18,6 +18,7 @@ package workflow import ( "context" + "errors" "fmt" "slices" "strings" @@ -2111,6 +2112,16 @@ func TestCreateLookupVindexFailures(t *testing.T) { SourceKeyspace: "sourceks", // Keyspace where the lookup table and VReplication workflow is created. TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "t1", + CreateDdl: "CREATE TABLE `t1` (\n`c1` INT,\n PRIMARY KEY(`c1`)\n)", + }, + { + TargetTable: "t2", + CreateDdl: "CREATE TABLE `t2` (\n`c2` INT,\n PRIMARY KEY(`c2`)\n)", + }, + }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2122,7 +2133,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2135,10 +2146,10 @@ func TestCreateLookupVindexFailures(t *testing.T) { "xxhash": { Type: "xxhash", }, - "v": { + "v1": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", "write_only": "true", @@ -2148,19 +2159,28 @@ func TestCreateLookupVindexFailures(t *testing.T) { Tables: map[string]*vschemapb.Table{ "t1": { ColumnVindexes: []*vschemapb.ColumnVindex{{ - Name: "v", + Name: "v1", Column: "c1", }}, }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v2", + Column: "c2", + }}, + }, }, } - err := env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs) + err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs) + require.NoError(t, err) + err = env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs) require.NoError(t, err) testcases := []struct { - description string - input *vschemapb.Keyspace - err string + description string + input *vschemapb.Keyspace + createRequest *createVReplicationWorkflowRequestResponse + err string }{ { description: "dup vindex", @@ -2208,7 +2228,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1,c2", "to": "c3", }, @@ -2224,7 +2244,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2240,7 +2260,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_noexist", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1,c2", "to": "c2", }, @@ -2264,7 +2284,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2324,7 +2344,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2377,7 +2397,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "xxhash": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2393,7 +2413,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - err: "a conflicting vindex named xxhash already exists in the targetks keyspace", + err: fmt.Sprintf("a conflicting vindex named xxhash already exists in the %s keyspace", ms.TargetKeyspace), }, { description: "source table not in vschema", @@ -2408,7 +2428,37 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - err: "table other not found in the targetks keyspace", + err: fmt.Sprintf("table other not found in the %s keyspace", ms.TargetKeyspace), + }, + { + description: "workflow name already exists", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v2": { + Type: "consistent_lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.t1_lkp", ms.TargetKeyspace), + "from": "c1", + "to": "keyspace_id", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v2", + Column: "c2", + }}, + }, + }, + }, + //vrQuery: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", + createRequest: &createVReplicationWorkflowRequestResponse{ + req: nil, + res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, + err: errors.New("we gots us an error"), + }, + err: "we gots us an error", }, } for _, tcase := range testcases { @@ -2418,10 +2468,26 @@ func TestCreateLookupVindexFailures(t *testing.T) { Keyspace: ms.TargetKeyspace, Vindex: tcase.input, } + if tcase.createRequest != nil { + for _, tablet := range env.tablets { + if tablet.Keyspace == ms.TargetKeyspace { + env.tmc.expectVRQuery(int(tablet.Alias.Uid), "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", &sqltypes.Result{}) + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest) + } + } + } _, err := env.ws.LookupVindexCreate(ctx, req) if !strings.Contains(err.Error(), tcase.err) { t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err) } + // Confirm that the original vschema where the vindex would + // be created is still in place -- since the workflow + // creation failed in each test case. That vindex is created + // in the source keyspace based on the MaterializeSettings + // definition. + cvs, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) + require.NoError(t, err) + require.True(t, proto.Equal(vs, cvs), "expected: %+v, got: %+v", vs, cvs) }) } } @@ -2696,7 +2762,10 @@ func TestKeyRangesEqualOptimization(t *testing.T) { if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { continue } - env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid]) + reqRes := &createVReplicationWorkflowRequestResponse{ + req: tc.wantReqs[tablet.Alias.Uid], + } + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, reqRes) } mz := &materializer{ diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index d35b281dba6..ee345be8137 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3967,8 +3967,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // Validate input table and vindex consistency. if sourceTable == nil || len(sourceTable.ColumnVindexes) != 1 { return nil, nil, nil, nil, - vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table in the %s keyspace", - keyspace) + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table (%s) in the %s keyspace", + sourceTable, keyspace) } if sourceTable.ColumnVindexes[0].Name != vindexName { return nil, nil, nil, nil, @@ -4146,9 +4146,10 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() + tc := targetVSchema.CloneVT() cancelFunc = func() error { // Restore the original target vschema. - return s.ts.SaveVSchema(ctx, targetKeyspace, targetVSchema) + return s.ts.SaveVSchema(ctx, targetKeyspace, tc) } // Update targetVSchema. From c99eb2c5adc6537ff395ef080478f0f089fbf036 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 21 Sep 2024 19:57:06 -0400 Subject: [PATCH 3/9] Improvements Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/framework_test.go | 4 ++-- go/vt/vtctl/workflow/materializer.go | 23 +++++++++++++------- go/vt/vtctl/workflow/materializer_test.go | 26 +++++++++++++---------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index bb18ba0cff1..ecceaa4b41c 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -304,7 +304,7 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect.req, req) { + if expect.req != nil && !proto.Equal(expect.req, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect) } if expect.res != nil { @@ -443,7 +443,7 @@ func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatap qrs := tmc.vrQueries[int(tablet.Alias.Uid)] if len(qrs) == 0 { - return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query) } matched := false if qrs[0].query[0] == '/' { diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index ea8b75c41c8..3378f9e9975 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -45,6 +45,7 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -113,13 +114,11 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil { return err } + err := mz.buildMaterializer() if err != nil { return err } - if err := mz.deploySchema(); err != nil { - return err - } var workflowSubType binlogdatapb.VReplicationWorkflowSubType workflowSubType, err = mz.getWorkflowSubType() @@ -133,6 +132,10 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe } req.Options = optionsJSON + if err := mz.deploySchema(); err != nil { + return err + } + return mz.forAllTargets(func(target *topo.ShardInfo) error { targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias) if err != nil { @@ -268,15 +271,15 @@ func (mz *materializer) deploySchema() error { return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error { allTables := []string{"/.*/"} - hasTargetTable := map[string]bool{} req := &tabletmanagerdatapb.GetSchemaRequest{Tables: allTables} targetSchema, err := schematools.GetSchema(mz.ctx, mz.ts, mz.tmc, target.PrimaryAlias, req) if err != nil { return err } + hasTargetTable := make(map[string]*tabletmanagerdatapb.TableDefinition, len(targetSchema.TableDefinitions)) for _, td := range targetSchema.TableDefinitions { - hasTargetTable[td.Name] = true + hasTargetTable[td.Name] = td } targetTablet, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias) @@ -286,12 +289,16 @@ func (mz *materializer) deploySchema() error { var applyDDLs []string for _, ts := range mz.ms.TableSettings { - if hasTargetTable[ts.TargetTable] { - // Table already exists. + if td := hasTargetTable[ts.TargetTable]; td != nil { + // Table already exists. Let's be sure that it doesn't already have data. + if td.RowCount > 0 { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "target table %s exists in the %s keyspace and is not empty", td.Name, target.Keyspace()) + } continue } if ts.CreateDdl == "" { - return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) + return fmt.Errorf("target table %s does not exist and there is no create ddl defined", ts.TargetTable) } var err error diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 5d64be43cb0..a7fc71072d7 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1655,7 +1655,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { t.Fatal(err) } - _, _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, _, got, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -1663,6 +1663,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { continue } require.NoError(t, err) + require.NotNil(t, cancelFunc) utils.MustMatch(t, tcase.out, got, tcase.description) } } @@ -2177,10 +2178,11 @@ func TestCreateLookupVindexFailures(t *testing.T) { require.NoError(t, err) testcases := []struct { - description string - input *vschemapb.Keyspace - createRequest *createVReplicationWorkflowRequestResponse - err string + description string + input *vschemapb.Keyspace + createRequest *createVReplicationWorkflowRequestResponse + vrepExecQueries []string + err string }{ { description: "dup vindex", @@ -2431,7 +2433,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { err: fmt.Sprintf("table other not found in the %s keyspace", ms.TargetKeyspace), }, { - description: "workflow name already exists", + description: "workflow creation error", input: &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ "v2": { @@ -2452,7 +2454,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - //vrQuery: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", + vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"}, createRequest: &createVReplicationWorkflowRequestResponse{ req: nil, res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, @@ -2468,10 +2470,12 @@ func TestCreateLookupVindexFailures(t *testing.T) { Keyspace: ms.TargetKeyspace, Vindex: tcase.input, } - if tcase.createRequest != nil { - for _, tablet := range env.tablets { - if tablet.Keyspace == ms.TargetKeyspace { - env.tmc.expectVRQuery(int(tablet.Alias.Uid), "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", &sqltypes.Result{}) + for _, tablet := range env.tablets { + if tablet.Keyspace == ms.TargetKeyspace { + for _, vrq := range tcase.vrepExecQueries { + env.tmc.expectVRQuery(int(tablet.Alias.Uid), vrq, &sqltypes.Result{}) + } + if tcase.createRequest != nil { env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest) } } From 7dd67a4e89f76c1cef4f296750abff328b06dafb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 21 Sep 2024 20:53:34 -0400 Subject: [PATCH 4/9] Add unit test for non-empty existing table Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_test.go | 68 ++++++++++++++++++++--- 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index a7fc71072d7..7b23c1e1359 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -2109,9 +2109,8 @@ func TestStopAfterCopyFlag(t *testing.T) { func TestCreateLookupVindexFailures(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ - // Keyspace where the vindex is created. - SourceKeyspace: "sourceks", - // Keyspace where the lookup table and VReplication workflow is created. + SourceKeyspace: "sourceks", // Not used + // Keyspace where the lookup table, vindex, and VReplication workflow is created. TargetKeyspace: "targetks", TableSettings: []*vtctldatapb.TableMaterializeSettings{ { @@ -2122,6 +2121,10 @@ func TestCreateLookupVindexFailures(t *testing.T) { TargetTable: "t2", CreateDdl: "CREATE TABLE `t2` (\n`c2` INT,\n PRIMARY KEY(`c2`)\n)", }, + { + TargetTable: "t3", + CreateDdl: "CREATE TABLE `t3` (\n`c3` INT,\n PRIMARY KEY(`c3`)\n)", + }, }, } ctx, cancel := context.WithCancel(context.Background()) @@ -2172,9 +2175,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, } - err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs) - require.NoError(t, err) - err = env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs) + err := env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs) require.NoError(t, err) testcases := []struct { @@ -2182,6 +2183,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { input *vschemapb.Keyspace createRequest *createVReplicationWorkflowRequestResponse vrepExecQueries []string + schemaAdditions []*tabletmanagerdatapb.TableDefinition err string }{ { @@ -2462,6 +2464,46 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, err: "we gots us an error", }, + { + description: "table exists and has data", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v2": { + Type: "consistent_lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.t1_lkp", ms.TargetKeyspace), + "from": "c1", + "to": "keyspace_id", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v2", + Column: "c2", + }}, + }, + }, + }, + schemaAdditions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "t1_lkp", + Schema: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", + Columns: []string{"c1", "keyspace_id"}, + Fields: []*querypb.Field{ + { + Name: "c1", + Type: sqltypes.Int32, + }, + { + Name: "keyspace_id", + Type: sqltypes.VarBinary, + }, + }, + RowCount: 1, + }}, + err: fmt.Sprintf("target table t1_lkp exists in the %s keyspace and is not empty", ms.TargetKeyspace), + }, } for _, tcase := range testcases { t.Run(tcase.description, func(t *testing.T) { @@ -2470,6 +2512,18 @@ func TestCreateLookupVindexFailures(t *testing.T) { Keyspace: ms.TargetKeyspace, Vindex: tcase.input, } + if len(tcase.schemaAdditions) > 0 { + ogs := env.tmc.schema + defer func() { + env.tmc.schema = ogs + }() + // The tables are created in the target keyspace. + for _, tbl := range tcase.schemaAdditions { + env.tmc.schema[ms.TargetKeyspace+"."+tbl.Name] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{tbl}, + } + } + } for _, tablet := range env.tablets { if tablet.Keyspace == ms.TargetKeyspace { for _, vrq := range tcase.vrepExecQueries { @@ -2487,7 +2541,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { // Confirm that the original vschema where the vindex would // be created is still in place -- since the workflow // creation failed in each test case. That vindex is created - // in the source keyspace based on the MaterializeSettings + // in the target keyspace based on the MaterializeSettings // definition. cvs, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) require.NoError(t, err) From 8b880ae5f4079355a51d891429b773561f3d0a29 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 22 Sep 2024 00:29:15 -0400 Subject: [PATCH 5/9] Unit test fixes Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_env_test.go | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 349450a6a04..f6c578d45d8 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -121,17 +121,23 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M if err == nil { tableName = table.Name.String() } - stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl) - require.NoError(t, err) - ddl, ok := stmt.(*sqlparser.CreateTable) - require.True(t, ok) - cols := make([]string, len(ddl.TableSpec.Columns)) - fields := make([]*querypb.Field, len(ddl.TableSpec.Columns)) - for i, col := range ddl.TableSpec.Columns { - cols[i] = col.Name.String() - fields[i] = &querypb.Field{ - Name: col.Name.String(), - Type: col.Type.SQLType(), + var ( + cols []string + fields []*querypb.Field + ) + if ts.CreateDdl != "" { + stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl) + require.NoError(t, err) + ddl, ok := stmt.(*sqlparser.CreateTable) + require.True(t, ok) + cols = make([]string, len(ddl.TableSpec.Columns)) + fields = make([]*querypb.Field, len(ddl.TableSpec.Columns)) + for i, col := range ddl.TableSpec.Columns { + cols[i] = col.Name.String() + fields[i] = &querypb.Field{ + Name: col.Name.String(), + Type: col.Type.SQLType(), + } } } env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{ From 0367d052dbc3caf9aacd7a0a9737c916cbd9a2e7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 22 Sep 2024 02:06:09 -0400 Subject: [PATCH 6/9] Exempt multi-tenant migrations from empty table check Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 4 +++- go/vt/vtctl/workflow/server.go | 16 +++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 3378f9e9975..a00d0894c28 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -291,7 +291,9 @@ func (mz *materializer) deploySchema() error { for _, ts := range mz.ms.TableSettings { if td := hasTargetTable[ts.TargetTable]; td != nil { // Table already exists. Let's be sure that it doesn't already have data. - if td.RowCount > 0 { + // We exclude multi-tenant migrations as the target tables are expected + // to frequently have data from previously migrated tenants. + if !mz.IsMultiTenantMigration() && td.RowCount > 0 { return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "target table %s exists in the %s keyspace and is not empty", td.Name, target.Keyspace()) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index ee345be8137..ff96e881e99 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -4146,11 +4146,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() - tc := targetVSchema.CloneVT() - cancelFunc = func() error { - // Restore the original target vschema. - return s.ts.SaveVSchema(ctx, targetKeyspace, tc) - } + ogTargetVSchema := targetVSchema.CloneVT() + targetChanged := false // Update targetVSchema. targetTable := specs.Tables[targetTableName] @@ -4190,6 +4187,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } } else { targetVSchema.Vindexes[targetVindexType] = targetVindex + targetChanged = true } targetTable = &vschemapb.Table{ @@ -4209,6 +4207,14 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } } else { targetVSchema.Tables[targetTableName] = targetTable + targetChanged = true + } + + if targetChanged { + cancelFunc = func() error { + // Restore the original target vschema. + return s.ts.SaveVSchema(ctx, targetKeyspace, ogTargetVSchema) + } } ms = &vtctldatapb.MaterializeSettings{ From 32c29e1adf619dc4a6a23d0941ea835c75f665b3 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 22 Sep 2024 02:17:09 -0400 Subject: [PATCH 7/9] Update unit tests Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 4 ++-- go/vt/vtctl/workflow/materializer_test.go | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index a00d0894c28..b4a135670e3 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -291,8 +291,8 @@ func (mz *materializer) deploySchema() error { for _, ts := range mz.ms.TableSettings { if td := hasTargetTable[ts.TargetTable]; td != nil { // Table already exists. Let's be sure that it doesn't already have data. - // We exclude multi-tenant migrations as the target tables are expected - // to frequently have data from previously migrated tenants. + // We exclude multi-tenant migrations from this check as the target tables + // are expected to frequently have data from previously migrated tenants. if !mz.IsMultiTenantMigration() && td.RowCount > 0 { return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "target table %s exists in the %s keyspace and is not empty", td.Name, target.Keyspace()) diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 7b23c1e1359..419e702c454 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1175,13 +1175,16 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { setStartingVschema() }() } - outms, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) + outms, _, _, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) if tcase.err != "" { require.Error(t, err) require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err) return } require.NoError(t, err) + // All of these test cases create a table and thus change the target + // vschema. + require.NotNil(t, cancelFunc) want := strings.Split(tcase.out, "\n") got := strings.Split(outms.TableSettings[0].CreateDdl, "\n") require.Equal(t, want, got, tcase.description) @@ -1663,7 +1666,9 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { continue } require.NoError(t, err) - require.NotNil(t, cancelFunc) + // withTable is a vschema that already contains the table and thus + // we make not vschema changes and there's nothing to cancel. + require.True(t, (cancelFunc != nil) == (tcase.targetVSchema != withTable)) utils.MustMatch(t, tcase.out, got, tcase.description) } } From 5bd4b7eae991e8d9fe6a1a8d6e2ae5b4646207a7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 22 Sep 2024 12:38:03 -0400 Subject: [PATCH 8/9] Minor changes from self review Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_env_test.go | 11 +++++++++++ go/vt/vtctl/workflow/materializer_test.go | 4 ++-- go/vt/vtctl/workflow/server.go | 5 ++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index f6c578d45d8..8b407dbcb0c 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -244,6 +244,8 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe } func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { if expect.req != nil && !proto.Equal(expect.req, request) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) @@ -257,6 +259,9 @@ func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Cont } func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if tmc.readVReplicationWorkflow != nil { return tmc.readVReplicationWorkflow(ctx, tablet, request) } @@ -310,6 +315,9 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont } func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range request.Tables { if table == "/.*/" { @@ -430,6 +438,9 @@ func (tmc *testMaterializerTMClient) HasVReplicationWorkflows(ctx context.Contex } func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables if len(req.IncludeWorkflows) > 0 { for _, wf := range req.IncludeWorkflows { diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 419e702c454..30c8c24c987 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1667,7 +1667,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { } require.NoError(t, err) // withTable is a vschema that already contains the table and thus - // we make not vschema changes and there's nothing to cancel. + // we don't make any vschema changes and there's nothing to cancel. require.True(t, (cancelFunc != nil) == (tcase.targetVSchema != withTable)) utils.MustMatch(t, tcase.out, got, tcase.description) } @@ -2463,7 +2463,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"}, createRequest: &createVReplicationWorkflowRequestResponse{ - req: nil, + req: nil, // We don't care about defining it in this case res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, err: errors.New("we gots us an error"), }, diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index ff96e881e99..1674055ca4f 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1139,7 +1139,8 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup } if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", + targetVSchema, ms.TargetKeyspace) } ms.TabletTypes = topoproto.MakeStringTypeCSV(req.TabletTypes) ms.TabletSelectionPreference = req.TabletSelectionPreference @@ -4146,6 +4147,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() + // Save a copy of the original vschema if we modify it and need to provide + // a cancelFunc. ogTargetVSchema := targetVSchema.CloneVT() targetChanged := false From 345115e9597558dbd4e4079992a0e6e7e2a5214e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 25 Sep 2024 19:41:50 -0400 Subject: [PATCH 9/9] Remove existing data check related code And defer that work to https://github.com/vitessio/vitess/pull/16826 Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 15 +++------ go/vt/vtctl/workflow/materializer_test.go | 40 ----------------------- 2 files changed, 4 insertions(+), 51 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 4c51f9f74a2..9b8251257a8 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -45,7 +45,6 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -284,15 +283,15 @@ func (mz *materializer) deploySchema() error { return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error { allTables := []string{"/.*/"} + hasTargetTable := map[string]bool{} req := &tabletmanagerdatapb.GetSchemaRequest{Tables: allTables} targetSchema, err := schematools.GetSchema(mz.ctx, mz.ts, mz.tmc, target.PrimaryAlias, req) if err != nil { return err } - hasTargetTable := make(map[string]*tabletmanagerdatapb.TableDefinition, len(targetSchema.TableDefinitions)) for _, td := range targetSchema.TableDefinitions { - hasTargetTable[td.Name] = td + hasTargetTable[td.Name] = true } targetTablet, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias) @@ -302,14 +301,8 @@ func (mz *materializer) deploySchema() error { var applyDDLs []string for _, ts := range mz.ms.TableSettings { - if td := hasTargetTable[ts.TargetTable]; td != nil { - // Table already exists. Let's be sure that it doesn't already have data. - // We exclude multi-tenant migrations from this check as the target tables - // are expected to frequently have data from previously migrated tenants. - if !mz.IsMultiTenantMigration() && td.RowCount > 0 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, - "target table %s exists in the %s keyspace and is not empty", td.Name, target.Keyspace()) - } + if hasTargetTable[ts.TargetTable] { + // Table already exists. continue } if ts.CreateDdl == "" { diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 30c8c24c987..8d3335041a3 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -2469,46 +2469,6 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, err: "we gots us an error", }, - { - description: "table exists and has data", - input: &vschemapb.Keyspace{ - Vindexes: map[string]*vschemapb.Vindex{ - "v2": { - Type: "consistent_lookup_unique", - Params: map[string]string{ - "table": fmt.Sprintf("%s.t1_lkp", ms.TargetKeyspace), - "from": "c1", - "to": "keyspace_id", - }, - }, - }, - Tables: map[string]*vschemapb.Table{ - "t2": { - ColumnVindexes: []*vschemapb.ColumnVindex{{ - Name: "v2", - Column: "c2", - }}, - }, - }, - }, - schemaAdditions: []*tabletmanagerdatapb.TableDefinition{{ - Name: "t1_lkp", - Schema: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", - Columns: []string{"c1", "keyspace_id"}, - Fields: []*querypb.Field{ - { - Name: "c1", - Type: sqltypes.Int32, - }, - { - Name: "keyspace_id", - Type: sqltypes.VarBinary, - }, - }, - RowCount: 1, - }}, - err: fmt.Sprintf("target table t1_lkp exists in the %s keyspace and is not empty", ms.TargetKeyspace), - }, } for _, tcase := range testcases { t.Run(tcase.description, func(t *testing.T) {