Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: properly split cell values in record when using TabletPicker #14099

Merged
merged 4 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ func registerVDiffCommands(root *cobra.Command) {
create.Flags().StringSliceVar(&createOptions.SourceCells, "source-cells", nil, "The source cell(s) to compare from; default is any available cell.")
create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.")
create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.")
create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was missed during vtctldclient migration.

Copy link
Contributor Author

@mattlord mattlord Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is unrelated to this PR, but in working on it I realized that I had missed adding it here: #13976

create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.")
create.Flags().Uint32Var(&createOptions.Limit, "limit", math.MaxUint32, "Max rows to stop comparing after.")
create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.")
Expand Down
29 changes: 18 additions & 11 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,27 @@ var testCases = []*testCase{
}

func TestVDiff2(t *testing.T) {
allCellNames = "zone1"
defaultCellName := "zone1"
allCellNames = "zone5,zone1,zone2,zone3,zone4"
sourceKs := "product"
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables
extraVTTabletArgs = []string{"--vstream_packet_size=1"}

vc = NewVitessCluster(t, "TestVDiff2", []string{allCellNames}, mainClusterConfig)
vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
require.NotNil(t, vc)
defaultCell = vc.Cells[defaultCellName]
cells := []*Cell{defaultCell}
zone1 := vc.Cells["zone1"]
zone2 := vc.Cells["zone2"]
zone3 := vc.Cells["zone3"]
defaultCell = zone1

defer vc.TearDown(t)

vc.AddKeyspace(t, cells, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
_, err := vc.AddKeyspace(t, []*Cell{zone2, zone1, zone3}, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
require.NoError(t, err)

vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
Expand All @@ -140,23 +144,25 @@ func TestVDiff2(t *testing.T) {

generateMoreCustomers(t, sourceKs, 100)

_, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
tks, err := vc.AddKeyspace(t, []*Cell{zone3, zone1, zone2}, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
require.NoError(t, err)
for _, shard := range targetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard))
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testWorkflow(t, vc, tc, cells)
// Primary tablets for any new shards are added in the first cell.
testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1})
})
}
}

func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) {
func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) {
arrTargetShards := strings.Split(tc.targetShards, ",")
if tc.typ == "Reshard" {
tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs]
require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts))
for _, shard := range arrTargetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard))
Expand All @@ -169,6 +175,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
if tc.typ == "Reshard" {
args = append(args, "--source_shards", tc.sourceShards, "--target_shards", tc.targetShards)
}
args = append(args, "--cells", allCellNames)
args = append(args, "--tables", tc.tables)
args = append(args, "Create")
args = append(args, ksWorkflow)
Expand All @@ -180,7 +187,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
catchup(t, tab, tc.workflow, tc.typ)
}

vdiff(t, tc.targetKs, tc.workflow, cells[0].Name, true, true, nil)
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, cells[0].Name)
Expand Down
12 changes: 10 additions & 2 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a
require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow)

if useVtctlclient {
args := []string{"VDiff", "--", "--tablet_types=primary", "--source_cell=" + cells, "--format=json"}
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
// where when you ONLY specify the PRIMARY type it then picks the
// shard's primary and ignores any cell settings.
args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"}
if len(extraFlags) > 0 {
args = append(args, extraFlags...)
}
Expand All @@ -195,7 +199,11 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a
} else {
args := []string{"VDiff", "--target-keyspace", targetKeyspace, "--workflow", workflowName, "--format=json", action}
if strings.ToLower(action) == string(vdiff2.CreateAction) {
args = append(args, "--tablet-types=primary", "--source-cells="+cells)
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
// where when you ONLY specify the PRIMARY type it then picks the
// shard's primary and ignores any cell settings.
args = append(args, "--tablet-types=primary,replica", "--tablet-types-in-preference-order", "--source-cells="+cells)
}
if len(extraFlags) > 0 {
args = append(args, extraFlags...)
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ func TestPerformVDiffAction(t *testing.T) {
// available cells, so this additional cell should then show up in the
// created vdiff record.
preFunc: func() error {
return tstenv.TopoServ.CreateCellInfo(ctx, "zone100_test", &topodatapb.CellInfo{})
return tstenv.TopoServ.CreateCellInfo(ctx, "cell1_test", &topodatapb.CellInfo{})
},
expectQueries: []string{
fmt.Sprintf("select id as id from _vt.vdiff where vdiff_uuid = %s", encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,zone100_test\",\"target_cell\":\"cell1,zone100_test\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"cell1,cell1_test,cell2,cell3\",\"target_cell\":\"cell1,cell1_test,cell2,cell3\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
postFunc: func() error {
return tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true)
return tstenv.TopoServ.DeleteCellInfo(ctx, "cell1_test", true)
},
},
{
Expand All @@ -94,10 +94,10 @@ func TestPerformVDiffAction(t *testing.T) {
},
// Add a second cell and create an cell alias that contains it.
preFunc: func() error {
if err := tstenv.TopoServ.CreateCellInfo(ctx, "zone100_test", &topodatapb.CellInfo{}); err != nil {
if err := tstenv.TopoServ.CreateCellInfo(ctx, "cell1_test", &topodatapb.CellInfo{}); err != nil {
return err
}
cells := append(tstenv.Cells, "zone100_test")
cells := append(tstenv.Cells, "cell1_test")
return tstenv.TopoServ.CreateCellsAlias(ctx, "all", &topodatapb.CellsAlias{
Cells: cells,
})
Expand All @@ -107,7 +107,7 @@ func TestPerformVDiffAction(t *testing.T) {
fmt.Sprintf(`insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values('', '', 'pending', '{\"picker_options\":{\"source_cell\":\"all\",\"target_cell\":\"all\"}}', '0', 'vt_vttest', %s)`, encodeString(uuid)),
},
postFunc: func() error {
if err := tstenv.TopoServ.DeleteCellInfo(ctx, "zone100_test", true); err != nil {
if err := tstenv.TopoServ.DeleteCellInfo(ctx, "cell1_test", true); err != nil {
return err
}
return tstenv.TopoServ.DeleteCellsAlias(ctx, "all")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vdiff
import (
"context"
"fmt"
"strings"
"testing"

"github.com/google/uuid"
Expand Down Expand Up @@ -107,8 +108,8 @@ func TestVDiff(t *testing.T) {
MaxRows: 100,
},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: tstenv.Cells[0],
TargetCell: tstenv.Cells[0],
SourceCell: strings.Join(tstenv.Cells, ","),
TargetCell: strings.Join(tstenv.Cells, ","),
TabletTypes: "primary",
},
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{
Expand Down
51 changes: 29 additions & 22 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -120,7 +121,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

if err := td.selectTablets(ctx, td.wd.opts.PickerOptions.SourceCell, td.wd.opts.PickerOptions.TabletTypes); err != nil {
if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
Expand Down Expand Up @@ -198,16 +199,22 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err
return allErrors.AggrError(vterrors.Aggregate)
}

func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes string) error {
var wg sync.WaitGroup
ct := td.wd.ct
var err1, err2 error
func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
)

// The cells from the vdiff record are a comma separated list.
sourceCells := strings.Split(td.wd.opts.PickerOptions.SourceCell, ",")
targetCells := strings.Split(td.wd.opts.PickerOptions.TargetCell, ",")

// For Mount+Migrate, the source tablets will be in a different
// Vitess cluster with its own TopoServer.
sourceTopoServer := ct.ts
if ct.externalCluster != "" {
extTS, err := ct.ts.OpenExternalVitessClusterServer(ctx, ct.externalCluster)
sourceTopoServer := td.wd.ct.ts
if td.wd.ct.externalCluster != "" {
extTS, err := td.wd.ct.ts.OpenExternalVitessClusterServer(ctx, td.wd.ct.externalCluster)
if err != nil {
return err
}
Expand All @@ -216,39 +223,39 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri
wg.Add(1)
go func() {
defer wg.Done()
err1 = td.forEachSource(func(source *migrationSource) error {
tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.vde.thisTablet.Alias.Cell, ct.sourceKeyspace, source.shard, tabletTypes)
sourceErr = td.forEachSource(func(source *migrationSource) error {
sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes)
if err != nil {
return err
}
source.tablet = tablet
source.tablet = sourceTablet
return nil
})
}()

wg.Add(1)
go func() {
defer wg.Done()
tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Alias.Cell, ct.vde.thisTablet.Keyspace,
ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if err2 != nil {
targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace,
td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if targetErr != nil {
return
}
ct.targetShardStreamer = &shardStreamer{
tablet: tablet,
shard: tablet.Shard,
td.wd.ct.targetShardStreamer = &shardStreamer{
tablet: targetTablet,
shard: targetTablet.Shard,
}
}()

wg.Wait()
if err1 != nil {
return err1
if sourceErr != nil {
return sourceErr
}
return err2
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cell, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, []string{cell}, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func Init(ctx context.Context) (*Env, error) {
te := &Env{
KeyspaceName: "vttest",
ShardName: "0",
Cells: []string{"cell1"},
Cells: []string{"cell1", "cell2", "cell3"},
}

te.TopoServ = memorytopo.NewServer(ctx, te.Cells...)
Expand Down
Loading