Skip to content

Commit

Permalink
Use std protobuf package name (vtrpcb)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 19, 2023
1 parent 9f45ef2 commit dd37e1a
Showing 1 changed file with 37 additions and 38 deletions.
75 changes: 37 additions & 38 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vtrpc"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
vttimepb "vitess.io/vitess/go/vt/proto/vttime"
)
Expand Down Expand Up @@ -282,7 +281,7 @@ func (s *Server) GetCellsWithTableReadsSwitched(
getKeyspace := func(ruleTarget string) (string, error) {
arr := strings.Split(ruleTarget, ".")
if len(arr) != 2 {
return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "rule target is not correctly formatted: %s", ruleTarget)
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "rule target is not correctly formatted: %s", ruleTarget)
}

return arr[0], nil
Expand Down Expand Up @@ -852,7 +851,7 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN

// We assume a consistent state, so only choose routing rule for one table.
if len(ts.Tables()) == 0 {
return nil, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", targetKeyspace, workflowName)
return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", targetKeyspace, workflowName)

}
table := ts.Tables()[0]
Expand Down Expand Up @@ -998,7 +997,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}
if vschema == nil {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no vschema found for target keyspace %s", targetKeyspace)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no vschema found for target keyspace %s", targetKeyspace)
}
ksTables, err := getTablesInKeyspace(ctx, sourceTopo, s.tmc, sourceKeyspace)
if err != nil {
Expand All @@ -1013,7 +1012,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if req.AllTables {
tables = ksTables
} else {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no tables to move")
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables to move")
}
}
if len(req.ExcludeTables) > 0 {
Expand All @@ -1030,7 +1029,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}
tables = tables2
if len(tables) == 0 {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no tables to move")
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables to move")
}
log.Infof("Found tables to move: %s", strings.Join(tables, ","))

Expand Down Expand Up @@ -1181,7 +1180,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
migrationID, strings.Join(tablets, ","))
msg += fmt.Sprintf("please review and delete it before proceeding and then start the workflow using: MoveTables --workflow %s --target-keyspace %s start",
req.Workflow, req.TargetKeyspace)
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, msg)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, msg)
}
}

Expand Down Expand Up @@ -1356,7 +1355,7 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe
return nil, err
}
if ts.frozen {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s",
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s",
req.TargetKeyspace, req.Workflow)
}

Expand Down Expand Up @@ -1553,7 +1552,7 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
}

if len(res) == 0 {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.Workflow, req.Keyspace)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.Workflow, req.Keyspace)
}

response := &vtctldatapb.WorkflowDeleteResponse{}
Expand Down Expand Up @@ -1752,15 +1751,15 @@ func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state
break
}
if sourceDbName == "" {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no sources found for workflow %s.%s", state.TargetKeyspace, state.Workflow)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no sources found for workflow %s.%s", state.TargetKeyspace, state.Workflow)
}
targetDbName := ""
for _, tsTarget := range ts.targets {
targetDbName = tsTarget.GetPrimary().DbName()
break
}
if sourceDbName == "" || targetDbName == "" {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "workflow %s.%s is incorrectly configured", state.TargetKeyspace, state.Workflow)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "workflow %s.%s is incorrectly configured", state.TargetKeyspace, state.Workflow)
}
sort.Strings(tableList) // sort list for repeatability for mocking in tests
tablesStr := strings.Join(tableList, ",")
Expand Down Expand Up @@ -1826,7 +1825,7 @@ func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUp
}

if len(res) == 0 {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.TabletRequest.Workflow, req.Keyspace)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.TabletRequest.Workflow, req.Keyspace)
}

response := &vtctldatapb.WorkflowUpdateResponse{}
Expand Down Expand Up @@ -1864,7 +1863,7 @@ func (s *Server) validateSourceTablesExist(ctx context.Context, sourceKeyspace s
}
}
if len(missingTables) > 0 {
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "table(s) not found in source keyspace %s: %s", sourceKeyspace, strings.Join(missingTables, ","))
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table(s) not found in source keyspace %s: %s", sourceKeyspace, strings.Join(missingTables, ","))
}
return nil
}
Expand Down Expand Up @@ -2164,7 +2163,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
ts.externalTopo = externalTopo
}
} else if ts.sourceKeyspace != bls.Keyspace {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "source keyspaces are mismatched across streams: %v vs %v", ts.sourceKeyspace, bls.Keyspace)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "source keyspaces are mismatched across streams: %v vs %v", ts.sourceKeyspace, bls.Keyspace)
}

if ts.tables == nil {
Expand All @@ -2179,7 +2178,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
}
sort.Strings(tables)
if !reflect.DeepEqual(ts.tables, tables) {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "table lists are mismatched across streams: %v vs %v", ts.tables, tables)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table lists are mismatched across streams: %v vs %v", ts.tables, tables)
}
}

Expand Down Expand Up @@ -2355,7 +2354,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
// Check the Serving map for the shard, we don't want to
// remove a serving shard if not absolutely sure.
if !evenIfServing && len(servingCells) > 0 {
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "shard %v/%v is still serving, cannot delete it, use the even-if-serving flag if needed", keyspace, shard)
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard %v/%v is still serving, cannot delete it, use the even-if-serving flag if needed", keyspace, shard)
}

cells, err := s.ts.GetCellInfoNames(ctx)
Expand All @@ -2379,7 +2378,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
// try to delete them.
aliases, err = s.ts.GetTabletAliasesByCell(ctx, cell)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "GetTabletsByCell(%v) failed: %v", cell, err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "GetTabletsByCell(%v) failed: %v", cell, err)
}
case err == nil:
// We found a ShardReplication object. We
Expand All @@ -2389,7 +2388,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
aliases[i] = n.TabletAlias
}
default:
return vterrors.Errorf(vtrpc.Code_INTERNAL, "GetShardReplication(%v, %v, %v) failed: %v", cell, keyspace, shard, err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "GetShardReplication(%v, %v, %v) failed: %v", cell, keyspace, shard, err)
}

// Get the corresponding Tablet records. Note
Expand All @@ -2398,7 +2397,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
// still referenced.
tabletMap, err := s.ts.GetTabletMap(ctx, aliases)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "GetTabletMap() failed: %v", err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "GetTabletMap() failed: %v", err)
}

// Remove the tablets that don't belong to our
Expand All @@ -2412,7 +2411,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
// Now see if we need to DeleteTablet, and if we can, do it.
if len(tabletMap) > 0 {
if !recursive {
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "shard %v/%v still has %v tablets in cell %v; use --recursive or remove them manually", keyspace, shard, len(tabletMap), cell)
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard %v/%v still has %v tablets in cell %v; use --recursive or remove them manually", keyspace, shard, len(tabletMap), cell)
}

log.Infof("Deleting all tablets in shard %v/%v cell %v", keyspace, shard, cell)
Expand All @@ -2430,7 +2429,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs
//
// If the problem is temporary, or resolved externally, re-running
// DeleteShard will skip over tablets that were already deleted.
return vterrors.Errorf(vtrpc.Code_INTERNAL, "can't delete tablet %v: %v", tabletAlias, err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "can't delete tablet %v: %v", tabletAlias, err)
}
}
}
Expand Down Expand Up @@ -2541,7 +2540,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
}

if startState.WorkflowType == TypeMigrate {
return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic")
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic")
}

maxReplicationLagAllowed, set, err := protoutil.DurationFromProto(req.MaxReplicationLagAllowed)
Expand All @@ -2564,7 +2563,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
return nil, err
}
if reason != "" {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", startState.Workflow, reason)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", startState.Workflow, reason)
}
hasReplica, hasRdonly, hasPrimary, err = parseTabletTypes(req.TabletTypes)
if err != nil {
Expand Down Expand Up @@ -2646,14 +2645,14 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc

log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, ts.optCells, state.String())
if !switchReplica && !switchRdonly {
return handleError("invalid tablet types", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
}
if !ts.isPartialMigration { // shard level traffic switching is all or nothing
if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 {
return handleError("invalid request", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
}
if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
return handleError("invalid request", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
}
}
var cells = req.Cells
Expand Down Expand Up @@ -2863,7 +2862,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
} else {
if cancel {
return handleError("invalid cancel", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel"))
return handleError("invalid cancel", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel"))
}
ts.Logger().Infof("Journals were found. Completing the left over steps.")
// Need to gather positions in case all journals were not created.
Expand Down Expand Up @@ -2983,16 +2982,16 @@ func (s *Server) VReplicationExec(ctx context.Context, tabletAlias *topodatapb.T
func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool, destKeyspace, destShard string, waitReplicasTimeout time.Duration, skipVerify bool) error {
destShardInfo, err := s.ts.GetShard(ctx, destKeyspace, destShard)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "GetShard(%v, %v) failed: %v", destKeyspace, destShard, err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "GetShard(%v, %v) failed: %v", destKeyspace, destShard, err)
}

if destShardInfo.PrimaryAlias == nil {
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no primary in shard record %v/%v. Consider running 'vtctl InitShardPrimary' in case of a new shard or reparenting the shard to fix the topology data", destKeyspace, destShard)
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no primary in shard record %v/%v. Consider running 'vtctl InitShardPrimary' in case of a new shard or reparenting the shard to fix the topology data", destKeyspace, destShard)
}

diffs, err := schematools.CompareSchemas(ctx, s.ts, s.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "CopySchemaShard failed because schemas could not be compared initially: %v", err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard failed because schemas could not be compared initially: %v", err)
}
if diffs == nil {
// Return early because dest has already the same schema as source.
Expand All @@ -3002,19 +3001,19 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat
req := &tabletmanagerdatapb.GetSchemaRequest{Tables: tables, ExcludeTables: excludeTables, IncludeViews: includeViews}
sourceSd, err := schematools.GetSchema(ctx, s.ts, s.tmc, sourceTabletAlias, req)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "GetSchema(%v, %v, %v, %v) failed: %v", sourceTabletAlias, tables, excludeTables, includeViews, err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "GetSchema(%v, %v, %v, %v) failed: %v", sourceTabletAlias, tables, excludeTables, includeViews, err)
}

createSQLstmts := tmutils.SchemaDefinitionToSQLStrings(sourceSd)

destTabletInfo, err := s.ts.GetTablet(ctx, destShardInfo.PrimaryAlias)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "GetTablet(%v) failed: %v", destShardInfo.PrimaryAlias, err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "GetTablet(%v) failed: %v", destShardInfo.PrimaryAlias, err)
}
for _, createSQL := range createSQLstmts {
err = s.applySQLShard(ctx, destTabletInfo, createSQL)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "creating a table failed."+
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "creating a table failed."+
" Most likely some tables already exist on the destination and differ from the source."+
" Please remove all to be copied tables from the destination manually and run this command again."+
" Full error: %v", err)
Expand All @@ -3024,7 +3023,7 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat
// Remember the replication position after all the above were applied.
destPrimaryPos, err := s.tmc.PrimaryPosition(ctx, destTabletInfo.Tablet)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "CopySchemaShard: can't get replication position after schema applied: %v", err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard: can't get replication position after schema applied: %v", err)
}

// Although the copy was successful, we have to verify it to catch the case
Expand All @@ -3035,10 +3034,10 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat
if !skipVerify {
diffs, err = schematools.CompareSchemas(ctx, s.ts, s.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "CopySchemaShard failed because schemas could not be compared finally: %v", err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard failed because schemas could not be compared finally: %v", err)
}
if diffs != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "CopySchemaShard was not successful because the schemas between the two tablets %v and %v differ: %v", sourceTabletAlias, destShardInfo.PrimaryAlias, diffs)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard was not successful because the schemas between the two tablets %v and %v differ: %v", sourceTabletAlias, destShardInfo.PrimaryAlias, diffs)
}
}

Expand All @@ -3047,7 +3046,7 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat
defer cancel()
_, ok := schematools.ReloadShard(reloadCtx, s.ts, s.tmc, logutil.NewMemoryLogger(), destKeyspace, destShard, destPrimaryPos, nil, true)
if !ok {
log.Error(vterrors.Errorf(vtrpc.Code_INTERNAL, "CopySchemaShard: failed to reload schema on all replicas"))
log.Error(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard: failed to reload schema on all replicas"))
}

return err
Expand All @@ -3063,7 +3062,7 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat
func (s *Server) applySQLShard(ctx context.Context, tabletInfo *topo.TabletInfo, change string) error {
filledChange, err := fillStringTemplate(change, map[string]string{"DatabaseName": tabletInfo.DbName()})
if err != nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "fillStringTemplate failed: %v", err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "fillStringTemplate failed: %v", err)
}
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand Down

0 comments on commit dd37e1a

Please sign in to comment.