diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 152409540c8..6be5ac7f445 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -103,7 +103,17 @@ func (mz *materializer) createMoveTablesStreams(req *vtctldatapb.MoveTablesCreat } sourceShards := mz.filterSourceShards(target) - blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards) + // streamKeyRangesEqual allows us to optimize the stream for the cases + // where while the target keyspace may be sharded, the target shard has + // a single source shard to stream data from and the target and source + // shard have equal key ranges. This can be done, for example, when doing + // shard by shard migrations -- migrating a single shard at a time between + // sharded source and sharded target keyspaces. + streamKeyRangesEqual := false + if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, target.KeyRange) { + streamKeyRangesEqual = true + } + blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards, streamKeyRangesEqual) if err != nil { return err } @@ -139,7 +149,17 @@ func (mz *materializer) createMaterializerStreams() error { insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { sourceShards := mz.filterSourceShards(targetShard) - inserts, err := mz.generateInserts(mz.ctx, sourceShards) + // streamKeyRangesEqual allows us to optimize the stream for the cases + // where while the target keyspace may be sharded, the target shard has + // a single source shard to stream data from and the target and source + // shard have equal key ranges. This can be done, for example, when doing + // shard by shard migrations -- migrating a single shard at a time between + // sharded source and sharded target keyspaces. + streamKeyRangesEqual := false + if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) { + streamKeyRangesEqual = true + } + inserts, err := mz.generateInserts(mz.ctx, sourceShards, streamKeyRangesEqual) if err != nil { return err } @@ -151,7 +171,7 @@ func (mz *materializer) createMaterializerStreams() error { return nil } -func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo, keyRangesEqual bool) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") for _, sourceShard := range sourceShards { @@ -185,7 +205,7 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) } filter := ts.SourceExpression - if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return "", err @@ -251,7 +271,7 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top return ig.String(), nil } -func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) { +func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo, keyRangesEqual bool) ([]*binlogdatapb.BinlogSource, error) { blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards)) for _, sourceShard := range sourceShards { bls := &binlogdatapb.BinlogSource{ @@ -284,7 +304,7 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard * return nil, fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) } filter := ts.SourceExpression - if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return nil, err diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index f1ddf6be645..1026628405e 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -26,19 +26,23 @@ import ( "sync" "testing" - _flag "vitess.io/vitess/go/internal/flag" + "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tmclient" + _flag "vitess.io/vitess/go/internal/flag" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type queryResult struct { @@ -154,6 +158,7 @@ func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tablet if tabletType == topodatapb.TabletType_PRIMARY { _, err := env.ws.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias + si.IsPrimaryServing = true return nil }) if err != nil { @@ -175,10 +180,11 @@ type testMaterializerTMClient struct { tmclient.TabletManagerClient schema map[string]*tabletmanagerdatapb.SchemaDefinition - mu sync.Mutex - vrQueries map[int][]*queryResult - getSchemaCounts map[string]int - muSchemaCount sync.Mutex + mu sync.Mutex + vrQueries map[int][]*queryResult + createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + getSchemaCounts map[string]int + muSchemaCount sync.Mutex // Used to confirm the number of times WorkflowDelete was called. workflowDeleteCalls int @@ -186,9 +192,10 @@ type testMaterializerTMClient struct { func newTestMaterializerTMClient() *testMaterializerTMClient { return &testMaterializerTMClient{ - schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), - vrQueries: make(map[int][]*queryResult), - getSchemaCounts: make(map[string]int), + schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), + vrQueries: make(map[int][]*queryResult), + createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + getSchemaCounts: make(map[string]int), } } @@ -205,6 +212,11 @@ func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) { } func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { + if !proto.Equal(expect, request) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) + } + } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil } @@ -288,6 +300,13 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r }) } +func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + tmc.createVReplicationWorkflowRequests[tabletID] = req +} + func (tmc *testMaterializerTMClient) verifyQueries(t *testing.T) { t.Helper() tmc.mu.Lock() diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 5121590e3c4..9b5c7c5c1cd 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -19,10 +19,13 @@ package workflow import ( "context" "fmt" + "slices" "strings" "testing" + "time" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/sqltypes" @@ -3284,3 +3287,285 @@ func TestMaterializerNoVindexInExpression(t *testing.T) { err := env.ws.Materialize(ctx, ms) require.EqualError(t, err, "could not find vindex column c1") } + +// TestKeyRangesEqualOptimization tests that we optimize the source +// filtering when there's only one source shard for the stream and +// its keyrange is equal to the target shard for the stream. This +// means that even if the target keyspace is sharded, the source +// does not need to perform the in_keyrange filtering. +func TestKeyRangesEqualOptimization(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + workflow := "testwf" + cells := []string{"cell"} + sourceKs := "sourceks" + targetKs := "targetks" + table := "t1" + tableSettings := []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: table, + SourceExpression: fmt.Sprintf("select * from %s", table), + }} + targetVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + table: { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "xxhash", + }, + }, + }, + }, + } + + testCases := []struct { + name string + sourceShards []string + targetShards []string + moveTablesReq *vtctldatapb.MoveTablesCreateRequest + // Target Shards are in the order specifed in the targetShards slice + // with the UIDs starting at 200 and increasing by 10 for each tablet + // and shard since there's only a primary tablet per shard. + wantReqs map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + }{ + { + name: "no in_keyrange filter -- partial, one equal shard", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + SourceShards: []string{"-80"}, // Partial MoveTables just for this shard + IncludeTables: []string{table}, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + WorkflowSubType: binlogdatapb.VReplicationWorkflowSubType_Partial, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-80", // Keyranges are equal between the source and target + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "in_keyrange filter -- unequal shards", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + IncludeTables: []string{table}, + }, + sourceShards: []string{"-"}, + targetShards: []string{"-80", "80-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-80')", table, targetKs), + }, + }, + }, + }, + }, + }, + 210: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '80-')", table, targetKs), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "in_keyrange filter -- unequal shards on merge", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + IncludeTables: []string{table}, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-')", table, targetKs), + }, + }, + }, + }, + { + Keyspace: sourceKs, + Shard: "80-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-')", table, targetKs), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "no in_keyrange filter -- all equal shards", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + IncludeTables: []string{table}, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + 210: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "80-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if len(tc.wantReqs) == 0 { + require.FailNow(t, "invalid test case", "no wanted requests specified") + } + workflowType := maps.Values(tc.wantReqs)[0].WorkflowType + ms := &vtctldatapb.MaterializeSettings{ + Workflow: tc.moveTablesReq.Workflow, + MaterializationIntent: vtctldatapb.MaterializationIntent_MOVETABLES, + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Cell: strings.Join(tc.moveTablesReq.Cells, ","), + SourceShards: tc.moveTablesReq.SourceShards, + TableSettings: tableSettings, + } + env := newTestMaterializerEnv(t, ctx, ms, tc.sourceShards, tc.targetShards) + defer env.close() + + // Target is always sharded. + err := env.ws.ts.SaveVSchema(ctx, targetKs, targetVSchema) + require.NoError(t, err, "SaveVSchema failed: %v", err) + + for _, tablet := range env.tablets { + // Queries will only be executed on primary tablets in the target keyspace. + if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY { + continue + } + env.tmc.expectVRQuery(int(tablet.Alias.Uid), mzSelectFrozenQuery, &sqltypes.Result{}) + // If we are doing a partial MoveTables, we will only perform the workflow + // stream creation / INSERT statment on the shard(s) we're migrating. + if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { + continue + } + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid]) + } + + mz := &materializer{ + ctx: ctx, + ts: env.ws.ts, + sourceTs: env.ws.ts, + tmc: env.tmc, + ms: ms, + workflowType: workflowType, + } + err = mz.createMoveTablesStreams(tc.moveTablesReq) + require.NoError(t, err, "createMoveTablesStreams failed: %v", err) + }) + } +} diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 0fba424eacd..990492bd191 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1029,7 +1029,17 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { sourceShards := mz.filterSourceShards(targetShard) - inserts, err := mz.generateInserts(ctx, sourceShards) + // streamKeyRangesEqual allows us to optimize the stream for the cases + // where while the target keyspace may be sharded, the target shard has + // a single source shard to stream data from and the target and source + // shard have equal key ranges. This can be done, for example, when doing + // shard by shard migrations -- migrating a single shard at a time between + // sharded source and sharded target keyspaces. + streamKeyRangesEqual := false + if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) { + streamKeyRangesEqual = true + } + inserts, err := mz.generateInserts(ctx, sourceShards, streamKeyRangesEqual) if err != nil { return nil, err } @@ -1319,7 +1329,7 @@ func stripTableConstraints(ddl string) (string, error) { return newDDL, nil } -func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo, keyRangesEqual bool) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") for _, sourceShard := range sourceShards { @@ -1353,7 +1363,8 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) } filter := ts.SourceExpression - if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + + if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return "", err diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 6c236a038bf..b98621ffa1b 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -200,12 +200,13 @@ func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tablet }, } env.tablets[id] = tablet - if err := env.wr.TopoServer().InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + if err := env.wr.ts.InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { panic(err) } if tabletType == topodatapb.TabletType_PRIMARY { _, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias + si.IsPrimaryServing = true return nil }) if err != nil { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 3984641fcf8..91dfff80d11 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -19,12 +19,15 @@ package wrangler import ( "context" "fmt" + "regexp" + "slices" "sort" "strings" "testing" "time" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/sqltypes" @@ -3510,3 +3513,230 @@ func TestAddTablesToVSchema(t *testing.T) { }) } } + +// TestKeyRangesEqualOptimization tests that we optimize the source +// filtering when there's only one source shard for the stream and +// its keyrange is equal to the target shard for the stream. This +// means that even if the target keyspace is sharded, the source +// does not need to perform the in_keyrange filtering. +func TestKeyRangesEqualOptimization(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + workflow := "testwf" + sourceKs := "sourceks" + targetKs := "targetks" + table := "t1" + mzi := vtctldatapb.MaterializationIntent_MOVETABLES + tableMaterializeSettings := []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: table, + SourceExpression: fmt.Sprintf("select * from %s", table), + }, + } + targetVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + table: { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "xxhash", + }, + }, + }, + }, + } + + testCases := []struct { + name string + ms *vtctldatapb.MaterializeSettings + sourceShards []string + targetShards []string + wantBls map[string]*binlogdatapb.BinlogSource + }{ + { + name: "no in_keyrange filter -- partial, one equal shard", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + SourceShards: []string{"-80"}, // Partial MoveTables just for this shard + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-80", // Keyranges are equal between the source and target + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + { + name: "in_keyrange filter -- unequal shards", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-80')", table, targetKs), + }, + }, + }, + }, + "80-": { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '80-')", table, targetKs), + }, + }, + }, + }, + }, + }, + { + name: "in_keyrange filter -- unequal shards on merge", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-": { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-')", table, targetKs), + }, + }, + }, + }, + }, + }, + { + name: "no in_keyrange filter -- all equal shards", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + "80-": { + Keyspace: sourceKs, + Shard: "80-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + env := newTestMaterializerEnv(t, ctx, tc.ms, tc.sourceShards, tc.targetShards) + defer env.close() + + // Target is always sharded. + err := env.wr.ts.SaveVSchema(ctx, targetKs, targetVSchema) + require.NoError(t, err, "SaveVSchema failed: %v", err) + + for _, tablet := range env.tablets { + // Queries will only be executed on primary tablets in the target keyspace. + if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY { + continue + } + env.tmc.expectVRQuery(int(tablet.Alias.Uid), mzSelectFrozenQuery, &sqltypes.Result{}) + // If we are doing a partial MoveTables, we will only perform the workflow + // stream creation / INSERT statment on the shard(s) we're migrating. + if len(tc.ms.SourceShards) > 0 && !slices.Contains(tc.ms.SourceShards, tablet.Shard) { + continue + } + bls := tc.wantBls[tablet.Shard] + require.NotNil(t, bls, "no binlog source defined for tablet %+v", tablet) + if bls.Filter != nil { + for i, rule := range bls.Filter.Rules { + // It's escaped in the SQL statement. + bls.Filter.Rules[i].Filter = strings.ReplaceAll(rule.Filter, `'`, `\'`) + } + } + blsBytes, err := prototext.Marshal(bls) + require.NoError(t, err, "failed to marshal binlog source: %v", err) + // This is also escaped in the SQL statement. + blsStr := strings.ReplaceAll(string(blsBytes), `"`, `\"`) + // Escape the string for the regexp comparison. + blsStr = regexp.QuoteMeta(blsStr) + // For some reason we end up with an extra slash added by QuoteMeta for the + // escaped single quotes in the filter. + blsStr = strings.ReplaceAll(blsStr, `\\\\`, `\\\`) + expectedQuery := fmt.Sprintf(`/insert into _vt.vreplication.* values \('%s', '%s'`, workflow, blsStr) + env.tmc.expectVRQuery(int(tablet.Alias.Uid), expectedQuery, &sqltypes.Result{}) + } + + _, err = env.wr.prepareMaterializerStreams(ctx, tc.ms) + require.NoError(t, err, "prepareMaterializerStreams failed: %v", err) + }) + } +}