Skip to content

Commit

Permalink
VReplication: Gather source positions once we know all writes are don…
Browse files Browse the repository at this point in the history
…e during traffic switch (#16572)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Aug 19, 2024
1 parent 06b6f29 commit fae7540
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 15 deletions.
16 changes: 16 additions & 0 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ type testTMClient struct {
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
primaryPositions map[uint32]string

env *testEnv // For access to the env config from tmc methods.
reverse atomic.Bool // Are we reversing traffic?
Expand All @@ -266,6 +267,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
primaryPositions: make(map[uint32]string),
env: env,
}
}
Expand Down Expand Up @@ -513,7 +515,21 @@ func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet
}, nil
}

func (tmc *testTMClient) setPrimaryPosition(tablet *topodatapb.Tablet, position string) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if tmc.primaryPositions == nil {
tmc.primaryPositions = make(map[uint32]string)
}
tmc.primaryPositions[tablet.Alias.Uid] = position
}

func (tmc *testTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if tmc.primaryPositions != nil && tmc.primaryPositions[tablet.Alias.Uid] != "" {
return tmc.primaryPositions[tablet.Alias.Uid], nil
}
return position, nil
}

Expand Down
9 changes: 8 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3497,6 +3497,13 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
}

// Get the source positions now that writes are stopped, the streams were stopped (e.g.
// intra-keyspace materializations that write on the source), and we know for certain
// that any in progress writes are done.
if err := ts.gatherSourcePositions(ctx); err != nil {
return handleError("failed to gather replication positions on migration sources", err)
}

if err := confirmKeyspaceLocksHeld(); err != nil {
return handleError("locks were lost", err)
}
Expand Down Expand Up @@ -3725,7 +3732,7 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat
// Notify Replicas to reload schema. This is best-effort.
reloadCtx, cancel := context.WithTimeout(ctx, waitReplicasTimeout)
defer cancel()
_, ok := schematools.ReloadShard(reloadCtx, s.ts, s.tmc, logutil.NewMemoryLogger(), destKeyspace, destShard, destPrimaryPos, nil, true)
_, ok := schematools.ReloadShard(reloadCtx, s.ts, s.tmc, s.Logger(), destKeyspace, destShard, destPrimaryPos, nil, true)
if !ok {
s.Logger().Error(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard: failed to reload schema on all replicas"))
}
Expand Down
31 changes: 20 additions & 11 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,19 +1016,10 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites)
}
if err != nil {
ts.Logger().Warningf("Error: %s", err)
ts.Logger().Warningf("Error stopping writes on migration sources: %v", err)
return err
}
return ts.ForAllSources(func(source *MigrationSource) error {
var err error
source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet)
ts.Logger().Infof("Stopped Source Writes. Position for source %v:%v: %v",
ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position)
if err != nil {
ts.Logger().Warningf("Error: %s", err)
}
return err
})
return nil
}

// switchDeniedTables switches the denied tables rules for the traffic switch.
Expand Down Expand Up @@ -1318,6 +1309,24 @@ func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error {
})
}

// gatherSourcePositions will get the current replication position for all
// migration sources.
func (ts *trafficSwitcher) gatherSourcePositions(ctx context.Context) error {
return ts.ForAllSources(func(source *MigrationSource) error {
var err error
tablet := source.GetPrimary().Tablet
tabletAlias := topoproto.TabletAliasString(tablet.Alias)
source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, tablet)
if err != nil {
ts.Logger().Errorf("Error getting migration source position on %s: %s", tabletAlias, err)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get position on migration source %s: %v",
tabletAlias, err)
}
ts.Logger().Infof("Position on migration source %s after having stopped writes: %s", tabletAlias, source.Position)
return nil
})
}

func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) {
vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -361,3 +364,72 @@ func TestGetTargetSequenceMetadata(t *testing.T) {
})
}
}

// TestSwitchTrafficPositionHandling confirms that if any writes are somehow
// executed against the source between the stop source writes and wait for
// catchup steps, that we have the correct position and do not lose the write(s).
func TestTrafficSwitchPositionHandling(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"

schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
tableName: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}

sourceKeyspace := &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
}
targetKeyspace := &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"0"},
}

env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer env.close()
env.tmc.schema = schema

ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName)
require.NoError(t, err)
sw := &switcher{ts: ts, s: env.ws}

lockCtx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "test")
require.NoError(t, lockErr)
ctx = lockCtx
defer sourceUnlock(&err)
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "test")
require.NoError(t, lockErr)
ctx = lockCtx
defer targetUnlock(&err)

err = ts.stopSourceWrites(ctx)
require.NoError(t, err)

// Now we simulate a write on the source.
newPosition := position[:strings.LastIndex(position, "-")+1]
oldSeqNo, err := strconv.Atoi(position[strings.LastIndex(position, "-")+1:])
require.NoError(t, err)
newPosition = fmt.Sprintf("%s%d", newPosition, oldSeqNo+1)
env.tmc.setPrimaryPosition(env.tablets[sourceKeyspaceName][startingSourceTabletUID], newPosition)

// And confirm that we picked up the new position.
err = ts.gatherSourcePositions(ctx)
require.NoError(t, err)
err = ts.ForAllSources(func(ms *MigrationSource) error {
require.Equal(t, newPosition, ms.Position)
return nil
})
require.NoError(t, err)
}
5 changes: 2 additions & 3 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag
optionsJSON := wf.GetOptions()
if optionsJSON != "" {
if err := json.Unmarshal([]byte(optionsJSON), &options); err != nil {
log.Errorf("failed to unmarshal options: %v %s", err, optionsJSON)
return nil, err
return nil, vterrors.Wrapf(err, "failed to unmarshal options: %s", optionsJSON)
}
}

Expand Down Expand Up @@ -671,7 +670,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf

wg.Wait()
if allErrors.HasErrors() {
log.Errorf("%s", allErrors.Error())
ts.Logger().Errorf("%s", allErrors.Error())
return allErrors.Error()
}
return nil
Expand Down

0 comments on commit fae7540

Please sign in to comment.