
VStream: Allow for automatic resume after Reshard across VStreams #15395

Merged 11 commits on Mar 19, 2024
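
To make the behavior concrete from the client side: a VStream consumer keeps the most recent VGTID event it received and, on a later run, passes that VGTID back into conn.VStream. With this change the resumed stream can start from old shards that are no longer serving, follow the reshard journal event, and continue on the new shards (as long as StopOnReshard is not set). The sketch below illustrates that pattern under those assumptions; it is not code from this PR, and the vtgate address and checkpoint storage are left to the application.

package main

import (
	"context"
	"log"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// streamOnce runs one streaming session starting from startVGTID and returns the
// last VGTID it observed so the caller can persist it and resume later, even if
// the keyspace is resharded between sessions.
func streamOnce(ctx context.Context, vtgateGrpcAddr string, startVGTID *binlogdatapb.VGtid) (*binlogdatapb.VGtid, error) {
	conn, err := vtgateconn.Dial(ctx, vtgateGrpcAddr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{Match: "/customer.*"}},
	}
	// Leaving StopOnReshard unset lets the stream transition to the new shards
	// automatically when it reaches the reshard journal event.
	flags := &vtgatepb.VStreamFlags{}

	reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, startVGTID, filter, flags)
	if err != nil {
		return nil, err
	}

	lastVGTID := startVGTID
	for {
		events, err := reader.Recv()
		if err != nil {
			// Context cancellation or EOF ends this session; lastVGTID is the resume point.
			return lastVGTID, err
		}
		for _, ev := range events {
			switch ev.Type {
			case binlogdatapb.VEventType_VGTID:
				// Persist this checkpoint somewhere durable in a real application.
				lastVGTID = ev.GetVgtid()
			case binlogdatapb.VEventType_ROW:
				log.Printf("row event from shard %s", ev.GetRowEvent().GetShard())
			}
		}
	}
}

On the first run the caller can pass a VGTID whose ShardGtids name only a keyspace (or "/.*", as the test below does) to start from the current position.
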
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vreplication_test.go

@@ -1456,7 +1456,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
action, workflow, output)
}
if err != nil {
t.Fatalf("Reshard %s command failed with %+v\n", action, err)
t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output)
}
}

198 changes: 195 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go

@@ -223,7 +223,7 @@ func insertRow(keyspace, table string, id int) {
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
log.Infof("error inserting row %d: %v", id, err)
log.Errorf("error inserting row %d: %v", id, err)
}
vtgateConn.ExecuteFetch("commit", 1000, false)
}
@@ -387,13 +387,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
defer vc.TearDown()

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
require.NoError(t, err)

ctx := context.Background()
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
@@ -512,6 +514,196 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
return ne
}

// Validate that we can resume a VStream when the keyspace has been resharded
// while not streaming. Ensure that we successfully transition from the
// old shards -- which are in the VGTID from the previous stream -- and that
// we miss no row events during the process.
func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
ctx := context.Background()
ks := "testks"
wf := "multiVStreamsKeyspaceReshard"
baseTabletID := 100
tabletType := topodatapb.TabletType_PRIMARY.String()
oldShards := "-80,80-"
newShards := "-40,40-80,80-c0,c0-"
oldShardRowEvents, newShardRowEvents := 0, 0
vc = NewVitessCluster(t, nil)
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
ogdr := defaultReplicas
defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

// For our sequences etc.
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
require.NoError(t, err)

// Setup the keyspace with our old/original shards.
keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
require.NoError(t, err)

// Add the new shards.
err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// Ensure that we're starting with a clean slate.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
insertRow(ks, "customer", id)
time.Sleep(250 * time.Millisecond)
id++
}
}
}()

// Create the Reshard workflow and wait for it to finish the copy phase.
reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*", // Match all keyspaces just to be more realistic.
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// Only stream the customer table and its sequence backing table.
Match: "/customer.*",
}},
}
flags := &vtgatepb.VStreamFlags{}

// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.GetRowEvent().GetShard()
switch shard {
case "-80", "80-":
oldShardRowEvents++
case "0":
// We expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
if len(newVGTID.GetShardGtids()) == 3 {
// We want a VGTID with a position for the global shard and the old shards.
canStop := true
for _, sg := range newVGTID.GetShardGtids() {
if sg.GetGtid() == "" {
canStop = false
}
}
if canStop {
return
}
}
}
}
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-streamCtx.Done():
return
default:
}
}
}()

// Confirm that we have shard GTIDs for the global shard and the old/original shards.
require.Len(t, newVGTID.GetShardGtids(), 3)

// Switch the traffic to the new shards.
reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)

// Now start a new VStream from our previous VGTID which only has the old/original shards.
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "-80", "80-":
oldShardRowEvents++
case "-40", "40-80", "80-c0", "c0-":
newShardRowEvents++
case "0":
// Again, we expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
}
}
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-done:
return
default:
}
}
}()

// We should have a mix of events across the old and new shards.
require.NotZero(t, oldShardRowEvents)
require.NotZero(t, newShardRowEvents)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
customerCount, err := customerResult.Rows[0][0].ToInt64()
require.NoError(t, err)
require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
5 changes: 2 additions & 3 deletions go/vt/topo/faketopo/faketopo.go

@@ -21,12 +21,11 @@ import (
"strings"
"sync"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/topo"
)

// FakeFactory implements the Factory interface. This is supposed to be used only for testing
90 changes: 81 additions & 9 deletions go/vt/vtgate/vstream_manager.go

@@ -25,8 +25,11 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
@@ -497,24 +500,40 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
cells := vs.getCells()

tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
tpo := vs.tabletPickerOptions
resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace)
if err != nil {
log.Errorf(err.Error())
return err
return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace)
}
if resharded {
// The non-serving tablet in the old / non-serving shard will contain all of
// the GTIDs that we need before transitioning to the new shards along with
// the journal event that will then allow us to automatically transition to
// the new shards (provided the stop_on_reshard option is not set).
tpo.IncludeNonServingTablets = true
}

tabletPickerErr := func(err error) error {
tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
log.Errorf("%v", tperr)
return tperr
}
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...)
if err != nil {
return tabletPickerErr(err)
}
// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found.
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
defer tpCancel()

tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
return err
return tabletPickerErr(err)
}
log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), strings.Join(cells, ","),
sgtid.Keyspace, sgtid.Shard, vs.tabletType.String())
log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))

target := &querypb.Target{
Keyspace: sgtid.Keyspace,
Shard: sgtid.Shard,
@@ -737,7 +756,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
if err := vs.getError(); err != nil {
return err
}
// convert all gtids to vgtids. This should be done here while holding the lock.
// Convert all gtids to vgtids. This should be done here while holding the lock.
for j, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
// Update the VGtid and send that instead.
@@ -921,3 +940,56 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
close(je.done)
return je, nil
}

// keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed
// since the last VStream as indicated by the shard definitions provided in the VGTID.
func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) {
shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil || len(shards) == 0 {
return false, err
}

// First check the typical case, where the VGTID shards match the serving shards.
// In that case it's NOT possible that an applicable reshard has happened because
// the VGTID contains shards that are all serving.
reshardPossible := false
ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids))
for _, s := range vs.vgtid.ShardGtids {
if s.GetKeyspace() == keyspace {
ksShardGTIDs = append(ksShardGTIDs, s)
}
}
for _, s := range ksShardGTIDs {
shard := shards[s.GetShard()]
if shard == nil {
return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", s.GetShard(), keyspace)
}
if !shard.GetIsPrimaryServing() {
reshardPossible = true
break
}
}
if !reshardPossible {
return false, nil
}

// Now that we know there MAY have been an applicable reshard, let's make a
// definitive determination by looking at the shard keyranges.
// All we care about are the shard info records now.
sis := maps.Values(shards)
for i := range sis {
for j := range sis {
if sis[i].ShardName() == sis[j].ShardName() && key.KeyRangeEqual(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
// It's the same shard so skip it.
continue
}
if key.KeyRangeIntersect(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
// We have different shards with overlapping keyranges so we know
// that a reshard has happened.
return true, nil
}
}
}

return false, nil
}
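
For intuition on the keyrange check above: an old shard's range always overlaps the ranges of the shards it was split into, while shards in a single consistent serving set never overlap each other. A small sketch of that property using the same key helpers the PR relies on (a standalone toy program, not part of this PR):

package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/key"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func main() {
	// Keyranges for the shards used in the test: old "-80" and new "-40", "40-80", "80-c0".
	old80 := &topodatapb.KeyRange{End: []byte{0x80}}
	new40 := &topodatapb.KeyRange{End: []byte{0x40}}
	new4080 := &topodatapb.KeyRange{Start: []byte{0x40}, End: []byte{0x80}}
	new80c0 := &topodatapb.KeyRange{Start: []byte{0x80}, End: []byte{0xc0}}

	fmt.Println(key.KeyRangeIntersect(old80, new40))   // true: "-80" contains "-40", so a reshard is detected
	fmt.Println(key.KeyRangeIntersect(old80, new4080)) // true: "-80" contains "40-80"
	fmt.Println(key.KeyRangeIntersect(old80, new80c0)) // false: adjacent half-open ranges do not overlap
	fmt.Println(key.KeyRangeEqual(new40, new40))       // true: identical shards are skipped by the loop above
}
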