From 7708be80cb7337ae94df56006d5d8259a84e5bf7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 17 Dec 2024 13:22:14 -0500 Subject: [PATCH] WiP Signed-off-by: Matt Lord --- go/cmd/vtcombo/cli/vschema_watcher.go | 10 ++-- go/vt/topo/helpers/copy.go | 4 +- go/vt/topo/srv_vschema.go | 4 +- go/vt/topo/vschema.go | 48 ++++++++++++++------ go/vt/topotools/vschema_ddl.go | 7 +-- go/vt/vtcombo/tablet_map.go | 6 ++- go/vt/vtctl/grpcvtctldserver/server.go | 44 ++++++++++-------- go/vt/vtctl/vtctl.go | 35 ++++++++------ go/vt/vtctl/workflow/materializer.go | 8 ++-- go/vt/vtctl/workflow/resharder.go | 3 +- go/vt/vtctl/workflow/server.go | 30 ++++++------ go/vt/vtctl/workflow/traffic_switcher.go | 10 ++-- go/vt/vtgate/executorcontext/vcursor_impl.go | 13 ++++-- go/vt/vtgate/vschema_manager.go | 12 ++--- go/vt/wrangler/materializer.go | 24 +++++----- go/vt/wrangler/resharder.go | 3 +- go/vt/wrangler/traffic_switcher.go | 8 ++-- 17 files changed, 152 insertions(+), 117 deletions(-) diff --git a/go/cmd/vtcombo/cli/vschema_watcher.go b/go/cmd/vtcombo/cli/vschema_watcher.go index 484c7736424..125d80891cb 100644 --- a/go/cmd/vtcombo/cli/vschema_watcher.go +++ b/go/cmd/vtcombo/cli/vschema_watcher.go @@ -56,17 +56,19 @@ func loadKeyspacesFromDir(ctx context.Context, dir string, ts *topo.Server) { log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err) } - keyspace := &vschemapb.Keyspace{} - err = json.Unmarshal(jsonData, keyspace) + keyspace := &topo.KeyspaceVSchemaInfo{ + Name: ks.Name, + } + err = json.Unmarshal(jsonData, keyspace.Keyspace) if err != nil { log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err) } - _, err = vindexes.BuildKeyspace(keyspace, env.Parser()) + _, err = vindexes.BuildKeyspace(keyspace.Keyspace, env.Parser()) if err != nil { log.Fatalf("Invalid keyspace definition: %v", err) } - ts.SaveVSchema(ctx, ks.Name, keyspace) + ts.SaveVSchema(ctx, keyspace) log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile) } } diff --git a/go/vt/topo/helpers/copy.go b/go/vt/topo/helpers/copy.go index 6dff1c6ac22..940222768b1 100644 --- a/go/vt/topo/helpers/copy.go +++ b/go/vt/topo/helpers/copy.go @@ -58,12 +58,12 @@ func CopyKeyspaces(ctx context.Context, fromTS, toTS *topo.Server, parser *sqlpa vs, err := fromTS.GetVSchema(ctx, keyspace) switch { case err == nil: - _, err = vindexes.BuildKeyspace(vs, parser) + _, err = vindexes.BuildKeyspace(vs.Keyspace, parser) if err != nil { log.Errorf("BuildKeyspace(%v): %v", keyspace, err) break } - if err := toTS.SaveVSchema(ctx, keyspace, vs); err != nil { + if err := toTS.SaveVSchema(ctx, vs); err != nil { log.Errorf("SaveVSchema(%v): %v", keyspace, err) } case topo.IsErrType(err, topo.NoNode): diff --git a/go/vt/topo/srv_vschema.go b/go/vt/topo/srv_vschema.go index f69fca83537..ef1ba5b0d5b 100644 --- a/go/vt/topo/srv_vschema.go +++ b/go/vt/topo/srv_vschema.go @@ -174,7 +174,7 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error { k, err := ts.GetVSchema(ctx, keyspace) if IsErrType(err, NoNode) { err = nil - k = &vschemapb.Keyspace{} + k = &KeyspaceVSchemaInfo{} } mu.Lock() @@ -184,7 +184,7 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error { finalErr = err return } - srvVSchema.Keyspaces[keyspace] = k + srvVSchema.Keyspaces[keyspace] = k.Keyspace }(keyspace) } wg.Wait() diff --git a/go/vt/topo/vschema.go b/go/vt/topo/vschema.go index 21192e1aacb..4f0bc96200b 100644 --- a/go/vt/topo/vschema.go +++ b/go/vt/topo/vschema.go @@ -28,26 +28,37 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +// KeyspaceVSchemaInfo is a meta struct that contains metadata to give +// the data more context and convenience. This is the main way we +// interact with a keyspace's vschema. +type KeyspaceVSchemaInfo struct { + Name string + *vschemapb.Keyspace + version Version +} + // SaveVSchema saves a Vschema. A valid Vschema should be passed in. It does not verify its correctness. // If the VSchema is empty, just remove it. -func (ts *Server) SaveVSchema(ctx context.Context, keyspace string, vschema *vschemapb.Keyspace) error { +func (ts *Server) SaveVSchema(ctx context.Context, ks *KeyspaceVSchemaInfo) error { if err := ctx.Err(); err != nil { return err } - nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile) - data, err := vschema.MarshalVT() + nodePath := path.Join(KeyspacesPath, ks.Name, VSchemaFile) + data, err := ks.MarshalVT() if err != nil { return err } - _, err = ts.globalCell.Update(ctx, nodePath, data, nil) + version, err := ts.globalCell.Update(ctx, nodePath, data, ks.version) if err != nil { - log.Errorf("failed to update vschema for keyspace %s: %v", keyspace, err) - } else { - log.Infof("successfully updated vschema for keyspace %s: %+v", keyspace, vschema) + ve := vterrors.Wrapf(err, "failed to update vschema for keyspace %s", ks.Name) + log.Error(ve) + return ve } - return err + ks.version = version + log.Infof("successfully updated vschema for keyspace %s: %+v", ks.Name, ks.Keyspace) + return nil } // DeleteVSchema delete the keyspace if it exists @@ -61,13 +72,13 @@ func (ts *Server) DeleteVSchema(ctx context.Context, keyspace string) error { } // GetVSchema fetches the vschema from the topo. -func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Keyspace, error) { +func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*KeyspaceVSchemaInfo, error) { if err := ctx.Err(); err != nil { return nil, err } nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile) - data, _, err := ts.globalCell.Get(ctx, nodePath) + data, version, err := ts.globalCell.Get(ctx, nodePath) if err != nil { return nil, err } @@ -76,7 +87,11 @@ func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.K if err != nil { return nil, vterrors.Wrapf(err, "bad vschema data: %q", data) } - return &vs, nil + return &KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vs, + version: version, + }, nil } // EnsureVSchema makes sure that a vschema is present for this keyspace or creates a blank one if it is missing @@ -86,10 +101,13 @@ func (ts *Server) EnsureVSchema(ctx context.Context, keyspace string) error { log.Infof("error in getting vschema for keyspace %s: %v", keyspace, err) } if vschema == nil || IsErrType(err, NoNode) { - err = ts.SaveVSchema(ctx, keyspace, &vschemapb.Keyspace{ - Sharded: false, - Vindexes: make(map[string]*vschemapb.Vindex), - Tables: make(map[string]*vschemapb.Table), + err = ts.SaveVSchema(ctx, &KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vschemapb.Keyspace{ + Sharded: false, + Vindexes: make(map[string]*vschemapb.Vindex), + Tables: make(map[string]*vschemapb.Table), + }, }) if err != nil { log.Errorf("could not create blank vschema: %v", err) diff --git a/go/vt/topotools/vschema_ddl.go b/go/vt/topotools/vschema_ddl.go index 3c6f5bced3c..b5acfc9ca00 100644 --- a/go/vt/topotools/vschema_ddl.go +++ b/go/vt/topotools/vschema_ddl.go @@ -20,6 +20,7 @@ import ( "reflect" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -28,11 +29,7 @@ import ( // ApplyVSchemaDDL applies the given DDL statement to the vschema // keyspace definition and returns the modified keyspace object. -func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlparser.AlterVschema) (*vschemapb.Keyspace, error) { - if ks == nil { - ks = new(vschemapb.Keyspace) - } - +func ApplyVSchemaDDL(ksName string, ks *topo.KeyspaceVSchemaInfo, alterVschema *sqlparser.AlterVschema) (*topo.KeyspaceVSchemaInfo, error) { if ks.Tables == nil { ks.Tables = map[string]*vschemapb.Table{} } diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index fc02409bd5f..e09b357ef46 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -410,7 +410,11 @@ func CreateKs( if err != nil { return 0, fmt.Errorf("BuildKeyspace(%v) failed: %v", keyspace, err) } - if err := ts.SaveVSchema(ctx, keyspace, formal); err != nil { + ks := &topo.KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: formal, + } + if err := ts.SaveVSchema(ctx, ks); err != nil { return 0, fmt.Errorf("SaveVSchema(%v) failed: %v", keyspace, err) } } else { diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index c3dc22d21b4..c0b7ecbd5d4 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -338,7 +338,9 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV return nil, err } - var vs *vschemapb.Keyspace + ks := &topo.KeyspaceVSchemaInfo{ + Name: req.Keyspace, + } if req.Sql != "" { span.Annotate("sql_mode", true) @@ -355,29 +357,29 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV return nil, err } - vs, err = s.ts.GetVSchema(ctx, req.Keyspace) + ks, err = s.ts.GetVSchema(ctx, req.Keyspace) if err != nil && !topo.IsErrType(err, topo.NoNode) { err = vterrors.Wrapf(err, "GetVSchema(%s)", req.Keyspace) return nil, err } // otherwise, we keep the empty vschema object from above - vs, err = topotools.ApplyVSchemaDDL(req.Keyspace, vs, ddl) + ks, err = topotools.ApplyVSchemaDDL(req.Keyspace, ks, ddl) if err != nil { - err = vterrors.Wrapf(err, "ApplyVSchemaDDL(%s,%v,%v)", req.Keyspace, vs, ddl) + err = vterrors.Wrapf(err, "ApplyVSchemaDDL(%s,%v,%v)", req.Keyspace, ks, ddl) return nil, err } } else { // "jsonMode" span.Annotate("sql_mode", false) - vs = req.VSchema + ks.Keyspace = req.VSchema } - ksVs, err := vindexes.BuildKeyspace(vs, s.ws.SQLParser()) + ksVs, err := vindexes.BuildKeyspace(ks.Keyspace, s.ws.SQLParser()) if err != nil { err = vterrors.Wrapf(err, "BuildKeyspace(%s)", req.Keyspace) return nil, err } response := &vtctldatapb.ApplyVSchemaResponse{ - VSchema: vs, + VSchema: ks.Keyspace, UnknownVindexParams: make(map[string]*vtctldatapb.ApplyVSchemaResponse_ParamList), } @@ -409,7 +411,7 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV return response, err } - if err = s.ts.SaveVSchema(ctx, req.Keyspace, vs); err != nil { + if err = s.ts.SaveVSchema(ctx, ks); err != nil { err = vterrors.Wrapf(err, "SaveVSchema(%s, %v)", req.Keyspace, req.VSchema) return nil, err } @@ -425,7 +427,7 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV err = vterrors.Wrapf(err, "GetVSchema(%s)", req.Keyspace) return nil, err } - response.VSchema = updatedVS + response.VSchema = updatedVS.Keyspace return response, nil } @@ -934,16 +936,18 @@ func (s *VtctldServer) CreateKeyspace(ctx context.Context, req *vtctldatapb.Crea } if req.Type == topodatapb.KeyspaceType_SNAPSHOT { - var vs *vschemapb.Keyspace - vs, err = s.ts.GetVSchema(ctx, req.BaseKeyspace) + ks, err := s.ts.GetVSchema(ctx, req.BaseKeyspace) if err != nil { log.Infof("error from GetVSchema(%v) = %v", req.BaseKeyspace, err) if topo.IsErrType(err, topo.NoNode) { log.Infof("base keyspace %v does not exist; continuing with bare, unsharded vschema", req.BaseKeyspace) - vs = &vschemapb.Keyspace{ - Sharded: false, - Tables: map[string]*vschemapb.Table{}, - Vindexes: map[string]*vschemapb.Vindex{}, + ks = &topo.KeyspaceVSchemaInfo{ + Name: req.BaseKeyspace, + Keyspace: &vschemapb.Keyspace{ + Sharded: false, + Tables: map[string]*vschemapb.Table{}, + Vindexes: map[string]*vschemapb.Vindex{}, + }, } } else { return nil, err @@ -951,10 +955,10 @@ func (s *VtctldServer) CreateKeyspace(ctx context.Context, req *vtctldatapb.Crea } // SNAPSHOT keyspaces are excluded from global routing. - vs.RequireExplicitRouting = true + ks.RequireExplicitRouting = true - if err = s.ts.SaveVSchema(ctx, req.Name, vs); err != nil { - err = fmt.Errorf("SaveVSchema(%v) = %w", vs, err) + if err = s.ts.SaveVSchema(ctx, ks); err != nil { + err = fmt.Errorf("SaveVSchema(%v) = %w", ks, err) return nil, err } } @@ -2656,13 +2660,13 @@ func (s *VtctldServer) GetVSchema(ctx context.Context, req *vtctldatapb.GetVSche span.Annotate("keyspace", req.Keyspace) - vschema, err := s.ts.GetVSchema(ctx, req.Keyspace) + ks, err := s.ts.GetVSchema(ctx, req.Keyspace) if err != nil { return nil, err } return &vtctldatapb.GetVSchemaResponse{ - VSchema: vschema, + VSchema: ks.Keyspace, }, nil } diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 1a4735b1c82..a35c841fc8c 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1890,11 +1890,14 @@ func commandCreateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags if err != nil { wr.Logger().Infof("error from GetVSchema for base_keyspace: %v, %v", *baseKeyspace, err) if topo.IsErrType(err, topo.NoNode) { - vs = &vschemapb.Keyspace{ - Sharded: false, - Tables: make(map[string]*vschemapb.Table), - Vindexes: make(map[string]*vschemapb.Vindex), - RequireExplicitRouting: true, + vs = &topo.KeyspaceVSchemaInfo{ + Name: *baseKeyspace, + Keyspace: &vschemapb.Keyspace{ + Sharded: false, + Tables: make(map[string]*vschemapb.Table), + Vindexes: make(map[string]*vschemapb.Vindex), + RequireExplicitRouting: true, + }, } } else { return err @@ -1903,7 +1906,7 @@ func commandCreateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags // SNAPSHOT keyspaces are excluded from global routing. vs.RequireExplicitRouting = true } - if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil { + if err := wr.TopoServer().SaveVSchema(ctx, vs); err != nil { wr.Logger().Infof("error from SaveVSchema %v:%v", vs, err) return err } @@ -3343,7 +3346,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p } keyspace := subFlags.Arg(0) - var vs *vschemapb.Keyspace + var vs *topo.KeyspaceVSchemaInfo var err error sqlMode := (*sql != "") != (*sqlFile != "") @@ -3378,7 +3381,10 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p vs, err = wr.TopoServer().GetVSchema(ctx, keyspace) if err != nil { if topo.IsErrType(err, topo.NoNode) { - vs = &vschemapb.Keyspace{} + vs = &topo.KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vschemapb.Keyspace{}, + } } else { return err } @@ -3402,8 +3408,11 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p schema = []byte(*vschema) } - vs = &vschemapb.Keyspace{} - err := json2.UnmarshalPB(schema, vs) + vs = &topo.KeyspaceVSchemaInfo{ + Name: keyspace, + Keyspace: &vschemapb.Keyspace{}, + } + err := json2.UnmarshalPB(schema, vs.Keyspace) if err != nil { return err } @@ -3417,7 +3426,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p } // Validate the VSchema. - ksVs, err := vindexes.BuildKeyspace(vs, wr.SQLParser()) + ksVs, err := vindexes.BuildKeyspace(vs.Keyspace, wr.SQLParser()) if err != nil { return err } @@ -3449,11 +3458,11 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p return err } - if _, err := vindexes.BuildKeyspace(vs, wr.SQLParser()); err != nil { + if _, err := vindexes.BuildKeyspace(vs.Keyspace, wr.SQLParser()); err != nil { return err } - if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil { + if err := wr.TopoServer().SaveVSchema(ctx, vs); err != nil { return err } diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index c65a00bf614..8cd99d384ee 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -283,7 +283,7 @@ func (mz *materializer) deploySchema() error { // We do, however, allow the user to override this behavior and retain them. removeAutoInc := false updatedVSchema := false - var targetVSchema *vschemapb.Keyspace + var targetVSchema *topo.KeyspaceVSchemaInfo if mz.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && (mz.targetVSchema != nil && mz.targetVSchema.Keyspace != nil && mz.targetVSchema.Keyspace.Sharded) && (mz.ms.GetWorkflowOptions() != nil && mz.ms.GetWorkflowOptions().ShardedAutoIncrementHandling != vtctldatapb.ShardedAutoIncrementHandling_LEAVE) { @@ -472,7 +472,7 @@ func (mz *materializer) deploySchema() error { } if updatedVSchema { - return mz.ts.SaveVSchema(mz.ctx, mz.ms.TargetKeyspace, targetVSchema) + return mz.ts.SaveVSchema(mz.ctx, targetVSchema) } return nil @@ -485,7 +485,7 @@ func (mz *materializer) buildMaterializer() error { if err != nil { return err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace, mz.env.Parser()) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema.Keyspace, ms.TargetKeyspace, mz.env.Parser()) if err != nil { return err } @@ -563,7 +563,7 @@ func (mz *materializer) buildMaterializer() error { if err != nil { return fmt.Errorf("failed to get source keyspace vschema: %v", err) } - differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + differentPVs = primaryVindexesDiffer(ms, sourceVSchema.Keyspace, vschema.Keyspace) mz.targetVSchema = targetVSchema mz.sourceShards = sourceShards diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index e3f7380af69..c270a9a6f0b 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -38,7 +38,6 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -50,7 +49,7 @@ type resharder struct { sourcePrimaries map[string]*topo.TabletInfo targetShards []*topo.ShardInfo targetPrimaries map[string]*topo.TabletInfo - vschema *vschemapb.Keyspace + vschema *topo.KeyspaceVSchemaInfo refStreams map[string]*refStream // This can be single cell name or cell alias but it can // also be a comma-separated list of cells. diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index baea602b7a4..acba3ccd8a5 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -576,7 +576,7 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup return nil, err } - if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { + if err := s.ts.SaveVSchema(ctx, targetVSchema); err != nil { return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", targetVSchema, ms.TargetKeyspace) } @@ -592,7 +592,7 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup return nil, err } if ms.SourceKeyspace != ms.TargetKeyspace { - if err := s.ts.SaveVSchema(ctx, ms.SourceKeyspace, sourceVSchema); err != nil { + if err := s.ts.SaveVSchema(ctx, sourceVSchema); err != nil { return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", sourceVSchema, ms.SourceKeyspace) } @@ -687,7 +687,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L // Remove the write_only param and save the source vschema. delete(vindex.Params, "write_only") - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVschema); err != nil { + if err := s.ts.SaveVSchema(ctx, sourceVschema); err != nil { return nil, err } return resp, s.ts.RebuildSrvVSchema(ctx, nil) @@ -800,9 +800,8 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl s.Logger().Infof("Successfully opened external topo: %+v", externalTopo) } - var vschema *vschemapb.Keyspace - var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create - vschema, err = s.ts.GetVSchema(ctx, targetKeyspace) + var origVSchema *topo.KeyspaceVSchemaInfo // If we need to rollback a failed create + vschema, err := s.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return nil, err } @@ -859,11 +858,11 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if !vschema.Sharded { // Save the original in case we need to restore it for a late failure in // the defer(). - origVSchema = vschema.CloneVT() - if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { + origVSchema.Keyspace = vschema.CloneVT() + if err := s.addTablesToVSchema(ctx, sourceKeyspace, vschema.Keyspace, tables, externalTopo == nil); err != nil { return nil, err } - if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + if err := s.ts.SaveVSchema(ctx, vschema); err != nil { return nil, err } } @@ -964,7 +963,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if origVSchema == nil { // There's no previous version to restore return } - if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { + if cerr := s.ts.SaveVSchema(ctx, origVSchema); cerr != nil { err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) } } @@ -2310,7 +2309,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf if err != nil { return nil, err } - ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs, ts.sourceKeyspace, s.env.Parser()) + ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs.Keyspace, ts.sourceKeyspace, s.env.Parser()) if err != nil { return nil, err } @@ -3411,7 +3410,7 @@ func fillStringTemplate(tmpl string, vars any) (string, error) { // prepareCreateLookup performs the preparatory steps for creating a // Lookup Vindex. func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( - ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { + ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *topo.KeyspaceVSchemaInfo, cancelFunc func() error, err error) { // Important variables are pulled out here. var ( vindexName string @@ -3702,7 +3701,10 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // Save a copy of the original vschema if we modify it and need to provide // a cancelFunc. - ogTargetVSchema := targetVSchema.CloneVT() + ogTargetVSchema := &topo.KeyspaceVSchemaInfo{ + Name: targetKeyspace, + } + ogTargetVSchema.Keyspace = targetVSchema.CloneVT() targetChanged := false // Update targetVSchema. @@ -3769,7 +3771,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str if targetChanged { cancelFunc = func() error { // Restore the original target vschema. - return s.ts.SaveVSchema(ctx, targetKeyspace, ogTargetVSchema) + return s.ts.SaveVSchema(ctx, ogTargetVSchema) } } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 4fc34992b0f..3b151103d7f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -409,7 +409,7 @@ func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, vschema.Tables[table] = &vschemapb.Table{} } } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { @@ -538,7 +538,7 @@ func (ts *trafficSwitcher) dropParticipatingTablesFromKeyspace(ctx context.Conte for _, tableName := range ts.Tables() { delete(vschema.Tables, tableName) } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) error { @@ -1013,7 +1013,7 @@ func (ts *trafficSwitcher) buildTenantPredicate(ctx context.Context) (*sqlparser if err != nil { return nil, err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ts.targetKeyspace, parser) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema.Keyspace, ts.targetKeyspace, parser) if err != nil { return nil, err } @@ -1441,7 +1441,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s return nil, nil } - sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema) + sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema.Keyspace) if err != nil { return nil, err } @@ -1609,7 +1609,7 @@ func (ts trafficSwitcher) createMissingSequenceTables(ctx context.Context, seque } } if updatedGlobalVSchema { - err = ts.ws.ts.SaveVSchema(ctx, globalKeyspace, globalVSchema) + err = ts.ws.ts.SaveVSchema(ctx, globalVSchema) if err != nil { return vterrors.Wrapf(err, "failed to update vschema in the global-keyspace %s", globalKeyspace) } diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index c1f341b38cf..a90688dabf9 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -122,7 +122,7 @@ type ( // VSchemaOperator is an interface to Vschema Operations VSchemaOperator interface { GetCurrentSrvVschema() *vschemapb.SrvVSchema - UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error + UpdateVSchema(ctx context.Context, ks *topo.KeyspaceVSchemaInfo, vschema *vschemapb.SrvVSchema) error } // VCursorImpl implements the VCursor functionality used by dependent @@ -1370,15 +1370,18 @@ func (vc *VCursorImpl) ExecuteVSchema(ctx context.Context, keyspace string, vsch return ErrNoKeyspace } - ks := srvVschema.Keyspaces[ksName] - ks, err := topotools.ApplyVSchemaDDL(ksName, ks, vschemaDDL) + ks, err := vc.topoServer.GetVSchema(ctx, ksName) + if err != nil { + return err + } + ks, err = topotools.ApplyVSchemaDDL(ksName, ks, vschemaDDL) if err != nil { return err } - srvVschema.Keyspaces[ksName] = ks + srvVschema.Keyspaces[ksName] = ks.Keyspace - return vc.vm.UpdateVSchema(ctx, ksName, srvVschema) + return vc.vm.UpdateVSchema(ctx, ks, srvVschema) } func (vc *VCursorImpl) MessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, tableName string, callback func(*sqltypes.Result) error) error { diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 62ea2cd3455..15ca140094f 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -64,20 +64,18 @@ func (vm *VSchemaManager) GetCurrentSrvVschema() *vschemapb.SrvVSchema { // UpdateVSchema propagates the updated vschema to the topo. The entry for // the given keyspace is updated in the global topo, and the full SrvVSchema // is updated in all known cells. -func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error { +func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ks *topo.KeyspaceVSchemaInfo, srv *vschemapb.SrvVSchema) error { topoServer, err := vm.serv.GetTopoServer() if err != nil { return err } - ks := vschema.Keyspaces[ksName] - - _, err = vindexes.BuildKeyspace(ks, vm.parser) + _, err = vindexes.BuildKeyspace(ks.Keyspace, vm.parser) if err != nil { return err } - err = topoServer.SaveVSchema(ctx, ksName, ks) + err = topoServer.SaveVSchema(ctx, ks) if err != nil { return err } @@ -89,7 +87,7 @@ func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vsch // even if one cell fails, continue to try the others for _, cell := range cells { - cellErr := topoServer.UpdateSrvVSchema(ctx, cell, vschema) + cellErr := topoServer.UpdateSrvVSchema(ctx, cell, srv) if cellErr != nil { err = cellErr log.Errorf("error updating vschema in cell %s: %v", cell, cellErr) @@ -100,7 +98,7 @@ func (vm *VSchemaManager) UpdateVSchema(ctx context.Context, ksName string, vsch } // Update all the local copy of VSchema if the topo update is successful. - vm.VSchemaUpdate(vschema, err) + vm.VSchemaUpdate(srv, err) return nil } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index bd7ae553130..88b97f91ed0 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -148,8 +148,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta log.Infof("Successfully opened external topo: %+v", externalTopo) } - var vschema *vschemapb.Keyspace - var origVSchema *vschemapb.Keyspace // If we need to rollback a failed create + var vschema *topo.KeyspaceVSchemaInfo + var origVSchema *topo.KeyspaceVSchemaInfo // If we need to rollback a failed create vschema, err = wr.ts.GetVSchema(ctx, targetKeyspace) if err != nil { return err @@ -214,8 +214,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if !vschema.Sharded { // Save the original in case we need to restore it for a late failure // in the defer(). - origVSchema = vschema.CloneVT() - if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { + origVSchema.Keyspace = vschema.CloneVT() + if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema.Keyspace, tables, externalTopo == nil); err != nil { return err } } @@ -280,7 +280,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if origVSchema == nil { // There's no previous version to restore return } - if cerr := wr.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { + if cerr := wr.ts.SaveVSchema(ctx, origVSchema); cerr != nil { err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) } } @@ -321,7 +321,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta } // We added to the vschema. - if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + if err := wr.ts.SaveVSchema(ctx, vschema); err != nil { return err } } @@ -477,7 +477,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe if err != nil { return err } - if err := wr.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { + if err := wr.ts.SaveVSchema(ctx, targetVSchema); err != nil { return err } ms.Cell = cell @@ -495,7 +495,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe if err := wr.Materialize(ctx, ms); err != nil { return err } - if err := wr.ts.SaveVSchema(ctx, keyspace, sourceVSchema); err != nil { + if err := wr.ts.SaveVSchema(ctx, sourceVSchema); err != nil { return err } @@ -503,7 +503,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe } // prepareCreateLookup performs the preparatory steps for creating a lookup vindex. -func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { +func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *topo.KeyspaceVSchemaInfo, err error) { // Important variables are pulled out here. var ( // lookup vindex info @@ -928,7 +928,7 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s // Remove the write_only param and save the source vschema. delete(sourceVindex.Params, "write_only") - if err := wr.ts.SaveVSchema(ctx, sourceKeyspace, sourceVSchema); err != nil { + if err := wr.ts.SaveVSchema(ctx, sourceVSchema); err != nil { return err } return wr.ts.RebuildSrvVSchema(ctx, nil) @@ -1062,7 +1062,7 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if err != nil { return nil, err } - targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema, ms.TargetKeyspace, wr.env.Parser()) + targetVSchema, err := vindexes.BuildKeyspaceSchema(vschema.Keyspace, ms.TargetKeyspace, wr.env.Parser()) if err != nil { return nil, err } @@ -1129,7 +1129,7 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater if err != nil { return nil, fmt.Errorf("failed to get source keyspace vschema: %v", err) } - differentPVs = primaryVindexesDiffer(ms, sourceVSchema, vschema) + differentPVs = primaryVindexesDiffer(ms, sourceVSchema.Keyspace, vschema.Keyspace) return &materializer{ wr: wr, diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index b041ce32041..09004032dd3 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -39,7 +39,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) type resharder struct { @@ -50,7 +49,7 @@ type resharder struct { sourcePrimaries map[string]*topo.TabletInfo targetShards []*topo.ShardInfo targetPrimaries map[string]*topo.TabletInfo - vschema *vschemapb.Keyspace + vschema *topo.KeyspaceVSchemaInfo refStreams map[string]*refStream cell string //single cell or cellsAlias or comma-separated list of cells/cellsAliases tabletTypes string diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index d337c1ee515..9006e8b1555 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -995,7 +995,7 @@ func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, wo if err != nil { return nil, err } - ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs, ts.sourceKeyspace, wr.env.Parser()) + ts.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs.Keyspace, ts.sourceKeyspace, wr.env.Parser()) if err != nil { return nil, err } @@ -1833,7 +1833,7 @@ func (ts *trafficSwitcher) dropParticipatingTablesFromKeyspace(ctx context.Conte for _, tableName := range ts.Tables() { delete(vschema.Tables, tableName) } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } // FIXME: even after dropSourceShards there are still entries in the topo, need to research and fix @@ -1995,7 +1995,7 @@ func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, vschema.Tables[table] = &vschemapb.Table{} } } - return ts.TopoServer().SaveVSchema(ctx, keyspace, vschema) + return ts.TopoServer().SaveVSchema(ctx, vschema) } func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) { @@ -2034,7 +2034,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s return nil, nil } - sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema) + sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema.Keyspace) if err != nil { return nil, err }