From bcd398a4c89011f62ef3b3fe14b9227095abf038 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 19 Sep 2024 12:29:00 -0400 Subject: [PATCH] Undo vschema changes on workflow creation fail Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_test.go | 16 +-- go/vt/vtctl/workflow/server.go | 132 +++++++++++++++------- 2 files changed, 98 insertions(+), 50 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 763dd7c04d3..84119735dee 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -1174,7 +1174,7 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { setStartingVschema() }() } - outms, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) + outms, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) if tcase.err != "" { require.Error(t, err) require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err) @@ -1419,7 +1419,7 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, tcase.out) { t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out) @@ -1654,7 +1654,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { t.Fatal(err) } - _, _, got, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -1772,7 +1772,7 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("same keyspace: got:\n%v, want\n%v", got, want) @@ -1898,7 +1898,7 @@ func TestCreateCustomizedVindex(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("customize create lookup error same: got:\n%v, want\n%v", got, want) @@ -2016,7 +2016,7 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { t.Fatal(err) } - ms, ks, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms, ks, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(wantKs, ks) { t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) @@ -2096,11 +2096,11 @@ func TestStopAfterCopyFlag(t *testing.T) { t.Fatal(err) } - ms1, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms1, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) require.Equal(t, ms1.StopAfterCopy, true) - ms2, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) + ms2, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) require.NoError(t, err) require.Equal(t, ms2.StopAfterCopy, false) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 88365bd54ef..f53ceb5c113 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1127,22 +1127,32 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup span.Annotate("cells", req.Cells) span.Annotate("tablet_types", req.TabletTypes) - ms, sourceVSchema, targetVSchema, err := s.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) + ms, sourceVSchema, targetVSchema, cancelFunc, err := s.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) if err != nil { return nil, err } + if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { return nil, err } - ms.TabletTypes = topoproto.MakeStringTypeCSV(req.TabletTypes) ms.TabletSelectionPreference = req.TabletSelectionPreference if err := s.Materialize(ctx, ms); err != nil { + if cancelFunc != nil { + if cerr := cancelFunc(); cerr != nil { + err = vterrors.Wrapf(err, "failed to restore original vschema '%v' in the %s keyspace: %v", + targetVSchema, ms.TargetKeyspace, cerr) + } + } return nil, err } - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil { - return nil, err + if ms.SourceKeyspace != ms.TargetKeyspace { + if err := s.ts.SaveVSchema(ctx, ms.SourceKeyspace, sourceVSchema); err != nil { + return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", + sourceVSchema, ms.SourceKeyspace) + } } + if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -3836,7 +3846,8 @@ func fillStringTemplate(tmpl string, vars any) (string, error) { // prepareCreateLookup performs the preparatory steps for creating a // Lookup Vindex. -func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { +func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( + ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { // Important variables are pulled out here. var ( vindexName string @@ -3862,19 +3873,19 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // Validate input vindex. if specs == nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") } if len(specs.Vindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") } vindexName = maps.Keys(specs.Vindexes)[0] vindex = maps.Values(specs.Vindexes)[0] if !strings.Contains(vindex.Type, "lookup") { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) } targetKeyspace, targetTableName, err = s.env.Parser().ParseTable(vindex.Params["table"]) if err != nil || targetKeyspace == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .", vindex.Params["table"]) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .
", vindex.Params["table"]) } vindexFromCols = strings.Split(vindex.Params["from"], ",") for i, col := range vindexFromCols { @@ -3882,11 +3893,11 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if strings.Contains(vindex.Type, "unique") { if len(vindexFromCols) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") } } else { if len(vindexFromCols) < 2 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "non-unique vindex 'from' should have more than one column") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "non-unique vindex 'from' should have more than one column") } } vindexToCol = vindex.Params["to"] @@ -3895,7 +3906,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str vindex.Params["write_only"] = "true" // See if we can create the vindex without errors. if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { // This mirrors the behavior of vindexes.boolFromMap(). @@ -3905,19 +3916,22 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str case "false": vindexIgnoreNulls = false default: - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", - ignoreNullsStr) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", + ignoreNullsStr) } } // Validate input table. if len(specs.Tables) < 1 || len(specs.Tables) > 2 { - return nil, nil, nil, fmt.Errorf("one or two tables must be specified") + return nil, nil, nil, nil, fmt.Errorf("one or two tables must be specified") } // Loop executes once or twice. for tableName, table := range specs.Tables { if len(table.ColumnVindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", tableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", + tableName) } if tableName != targetTableName { // This is the source table. sourceTableName = tableName @@ -3931,42 +3945,55 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str vindexCols = table.ColumnVindexes[0].Columns } else { if table.ColumnVindexes[0].Column == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", tableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + tableName) } vindexCols = []string{table.ColumnVindexes[0].Column} } if !slices.Equal(vindexCols, vindexFromCols) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", - tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", + tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) } } // Validate input table and vindex consistency. if sourceTable == nil || len(sourceTable.ColumnVindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table in the %s keyspace", keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table in the %s keyspace", + keyspace) } if sourceTable.ColumnVindexes[0].Name != vindexName { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", sourceTable.ColumnVindexes[0].Name, vindexName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", + sourceTable.ColumnVindexes[0].Name, vindexName) } if vindex.Owner != "" && vindex.Owner != sourceTableName { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", vindex.Owner, sourceTableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", + vindex.Owner, sourceTableName) } if len(sourceTable.ColumnVindexes[0].Columns) != 0 { sourceVindexColumns = sourceTable.ColumnVindexes[0].Columns } else { if sourceTable.ColumnVindexes[0].Column == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", sourceTableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + sourceTableName) } sourceVindexColumns = []string{sourceTable.ColumnVindexes[0].Column} } if len(sourceVindexColumns) != len(vindexFromCols) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", len(sourceVindexColumns), len(vindexFromCols)) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", + len(sourceVindexColumns), len(vindexFromCols)) } // Validate against source vschema. sourceVSchema, err = s.ts.GetVSchema(ctx, keyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if sourceVSchema.Vindexes == nil { sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex) @@ -3978,7 +4005,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } else { targetVSchema, err = s.ts.GetVSchema(ctx, targetKeyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } if targetVSchema.Vindexes == nil { @@ -3989,12 +4016,15 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if existing, ok := sourceVSchema.Vindexes[vindexName]; ok { if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", vindexName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", + vindexName, keyspace) } } sourceVSchemaTable = sourceVSchema.Tables[sourceTableName] if sourceVSchemaTable == nil && !schema.IsInternalOperationTableName(sourceTableName) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) } for _, colVindex := range sourceVSchemaTable.ColumnVindexes { // For a conflict, the vindex name and column should match. @@ -4011,41 +4041,47 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // are not the same then they are two distinct conflicting vindexes and we should // not proceed. if !slices.Equal(colNames, sourceVindexColumns) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", - strings.Join(colNames, ","), sourceTableName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", + strings.Join(colNames, ","), sourceTableName, keyspace) } } // Validate against source schema. sourceShards, err := s.ts.GetServingShards(ctx, keyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } onesource := sourceShards[0] if onesource.PrimaryAlias == nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) } req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{sourceTableName}} tableSchema, err := schematools.GetSchema(ctx, s.ts, s.tmc, onesource.PrimaryAlias, req) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if len(tableSchema.TableDefinitions) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", len(tableSchema.TableDefinitions), keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", + len(tableSchema.TableDefinitions), keyspace) } // Generate "create table" statement. lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") if len(lines) < 3 { // Should never happen. - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", tableSchema.TableDefinitions[0].Schema) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", + tableSchema.TableDefinitions[0].Schema) } var modified []string modified = append(modified, strings.Replace(lines[0], sourceTableName, targetTableName, 1)) for i := range sourceVindexColumns { line, err := generateColDef(lines, sourceVindexColumns[i], vindexFromCols[i]) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } modified = append(modified, line) } @@ -4068,7 +4104,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str createDDL = strings.Join(modified, "\n") // Confirm that our DDL is valid before we create anything. if _, err = s.env.Parser().ParseStrictDDL(createDDL); err != nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", err, createDDL) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", + err, createDDL) } // Generate vreplication query. @@ -4103,6 +4140,11 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() + cancelFunc = func() error { + // Restore the original target vschema. + return s.ts.SaveVSchema(ctx, targetKeyspace, targetVSchema) + } + // Update targetVSchema. targetTable := specs.Tables[targetTableName] if targetVSchema.Sharded { @@ -4118,7 +4160,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str if targetVindexType == "" { targetVindexType, err = vindexes.ChooseVindexForType(field.Type) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } targetVindex = &vschemapb.Vindex{ @@ -4129,11 +4171,15 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if targetVindex == nil { // Unreachable. We validated column names when generating the DDL. - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", + sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) } if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok { if !proto.Equal(existing, targetVindex) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", targetVindexType, targetKeyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", + targetVindexType, targetKeyspace) } } else { targetVSchema.Vindexes[targetVindexType] = targetVindex @@ -4150,7 +4196,9 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if existing, ok := targetVSchema.Tables[targetTableName]; ok { if !proto.Equal(existing, targetTable) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", targetTableName, targetKeyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", + targetTableName, targetKeyspace) } } else { targetVSchema.Tables[targetTableName] = targetTable @@ -4173,7 +4221,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str sourceVSchema.Vindexes[vindexName] = vindex sourceVSchemaTable.ColumnVindexes = append(sourceVSchemaTable.ColumnVindexes, sourceTable.ColumnVindexes[0]) - return ms, sourceVSchema, targetVSchema, nil + return ms, sourceVSchema, targetVSchema, cancelFunc, nil } func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) {