Skip to content

Commit

Permalink
Rebasing to self to get CI to run all tests. I removed the Skip CI la…
Browse files Browse the repository at this point in the history
…bel, but all tests were then skipped.

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Nov 1, 2023
1 parent c71f17e commit 39e2e72
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 43 deletions.
79 changes: 45 additions & 34 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
Expand Down Expand Up @@ -135,6 +136,7 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
options TabletPickerOptions
}

// NewTabletPicker returns a TabletPicker.
Expand Down Expand Up @@ -228,6 +230,7 @@ func NewTabletPicker(
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
options: options,
}, nil
}

Expand Down Expand Up @@ -281,6 +284,40 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo
return candidates
}

func (tp *TabletPicker) sortCandidates(ctx context.Context, candidates []*topo.TabletInfo) []*topo.TabletInfo {
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
return candidates
}

// PickForStreaming picks a tablet that is healthy and serving.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
Expand All @@ -294,36 +331,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.GetMatchingTablets(ctx)
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
candidates = tp.sortCandidates(ctx, candidates)
if len(candidates) == 0 {
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
Expand All @@ -338,7 +346,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
continue
}
log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String())
log.Infof("Tablet picker found a healthy tablet for streaming: %s", candidates[0].Tablet.String())
return candidates[0].Tablet, nil
}
}
Expand Down Expand Up @@ -428,7 +436,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" {
if shr != nil &&
(shr.Serving || tp.options.IncludeNonServingTablets) &&
shr.RealtimeStats != nil &&
shr.RealtimeStats.HealthError == "" {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
Expand Down
71 changes: 71 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
contextTimeout = 5 * time.Second
numTestIterations = 50
)

func TestPickPrimary(t *testing.T) {
defer utils.EnsureNoLeaks(t)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
Expand Down Expand Up @@ -568,6 +573,72 @@ func TestPickFallbackType(t *testing.T) {
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)
}

// TestPickNonServingTablets validates that non serving tablets are included when the
// IncludeNonServingTablets option is set. Unhealthy tablets should not be picked, irrespective of this option.
func TestPickNonServingTablets(t *testing.T) {
ctx := utils.LeakCheckContext(t)

cells := []string{"cell1", "cell2"}
localCell := cells[0]
tabletTypes := "replica,primary"
options := TabletPickerOptions{}
te := newPickerTestEnv(t, ctx, cells)

// Tablet should be selected as it is healthy and serving.
primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Tablet should not be selected as it is unhealthy.
replicaTablet := addTablet(ctx, te, 200, topodatapb.TabletType_REPLICA, localCell, false, false)
defer deleteTablet(t, te, replicaTablet)

// Tablet should be selected because the IncludeNonServingTablets option is set and it is healthy.
replicaTablet2 := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, localCell, false, true)
defer deleteTablet(t, te, replicaTablet2)

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, contextTimeout)
defer cancel()
_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(ctx, contextTimeout)
defer cancel2()
tablet, err := tp.PickForStreaming(ctx2)
require.NoError(t, err)
// IncludeNonServingTablets is false: only the healthy serving tablet should be picked.
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)

options.IncludeNonServingTablets = true
tp, err = NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx3, cancel3 := context.WithTimeout(ctx, contextTimeout)
defer cancel3()
var picked1, picked2, picked3 bool
// IncludeNonServingTablets is true: both the healthy tablets should be picked even though one is not serving.
for i := 0; i < numTestIterations; i++ {
tablet, err := tp.PickForStreaming(ctx3)
require.NoError(t, err)
if proto.Equal(tablet, primaryTablet) {
picked1 = true
}
if proto.Equal(tablet, replicaTablet) {
picked2 = true
}
if proto.Equal(tablet, replicaTablet2) {
picked3 = true
}
}
assert.True(t, picked1)
assert.False(t, picked2)
assert.True(t, picked3)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type controller struct {
id int64 // id from row in _vt.vdiff
uuid string
workflow string
workflowType binlogdatapb.VReplicationWorkflowType
cancel context.CancelFunc
dbClientFactory func() binlogplayer.DBClient
ts *topo.Server
Expand Down Expand Up @@ -227,6 +228,12 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
ct.sourceKeyspace = bls.Keyspace
ct.filter = bls.Filter
}

workflowType, err := row["workflow_type"].ToInt64()
if err != nil {
return err
}
ct.workflowType = binlogdatapb.VReplicationWorkflowType(workflowType)
}

if err := ct.validate(); err != nil {
Expand Down
22 changes: 17 additions & 5 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,13 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
}
sourceTopoServer = extTS
}
tabletPickerOptions := discovery.TabletPickerOptions{}
wg.Add(1)
go func() {
defer wg.Done()
sourceErr = td.forEachSource(func(source *migrationSource) error {
sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes)
sourceTablet, err := td.pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.sourceKeyspace,
source.shard, td.wd.opts.PickerOptions.TabletTypes, tabletPickerOptions)
if err != nil {
return err
}
Expand All @@ -245,8 +247,15 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace,
td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if td.wd.ct.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard {
// For resharding, the target shards could be non-serving if traffic has already been switched once.
// When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched
// it is set to false for the shards we are switching from. We don't have a way to know if we have
// switched or not, so we just include non-serving tablets for all reshards.
tabletPickerOptions.IncludeNonServingTablets = true
}
targetTablet, targetErr = td.pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Keyspace,
td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes, tabletPickerOptions)
if targetErr != nil {
return
}
Expand All @@ -263,8 +272,11 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
func (td *tableDiffer) pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace,
shard, tabletTypes string, options discovery.TabletPickerOptions) (*topodatapb.Tablet, error) {

tp, err := discovery.NewTabletPicker(ctx, ts, cells, td.wd.ct.vde.thisTablet.Alias.Cell, keyspace,
shard, tabletTypes, options)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() {

log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
log.Errorf("INTERNAL: unable to setState() in controller. Attempting to set error text: [%v]; setState() error is: %v", err, errSetState)
log.Errorf("INTERNAL: unable to setState() in controller: %v. Could not set error text to: %v.", errSetState, err)
return err // yes, err and not errSetState.
}
log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
return nil // this will cause vreplicate to quit the workflow
}
return err
Expand Down
15 changes: 13 additions & 2 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,8 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
if ts.ExternalTopo() != nil {
sourceTopo = ts.ExternalTopo()
}
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell,
df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
if err != nil {
return err
}
Expand All @@ -827,8 +828,18 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
wg.Add(1)
go func() {
defer wg.Done()
includeNonServingTablets := false
if df.ts.workflowType == binlogdatapb.VReplicationWorkflowType_Reshard {
// For resharding, the target shards could be non-serving if traffic has already been switched once.
// When shards are created their IsPrimaryServing attribute is set to true. However, when the traffic is switched
// it is set to false for the shards we are switching from. We don't have a way to know if we have
// switched or not, so we just include non-serving tablets for all reshards.
includeNonServingTablets = true
}
err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error {
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell,
df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr,
discovery.TabletPickerOptions{IncludeNonServingTablets: includeNonServingTablets})
if err != nil {
return err
}
Expand Down

0 comments on commit 39e2e72

Please sign in to comment.