Improve TopoServer Performance and Efficiency For Keyspace Shards (#15047)

Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: deepthi <[email protected]>
Co-authored-by: deepthi <[email protected]>
mattlord and deepthi authored Jan 30, 2024
1 parent 9e27038 commit c156ca2
Showing 16 changed files with 239 additions and 57 deletions.
11 changes: 1 addition & 10 deletions go/vt/discovery/healthcheck.go
@@ -87,9 +87,6 @@ var (
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
refreshKnownTablets = true

// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency int64 = 32

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

@@ -107,11 +104,6 @@ const (
DefaultHealthCheckRetryDelay = 5 * time.Second
DefaultHealthCheckTimeout = 1 * time.Minute

// DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher.
DefaultTopoReadConcurrency int = 5
// DefaultTopologyWatcherRefreshInterval is used as the default value for
// the refresh interval of a topology watcher.
DefaultTopologyWatcherRefreshInterval = 1 * time.Minute
// healthCheckTemplate is the HTML code to display a TabletsCacheStatusList, it takes a parameter for the title
// as the template can be used for both HealthCheck's cache and healthy tablets list.
healthCheckTemplate = `
@@ -176,7 +168,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
ParseTabletURLTemplateFromFlag()
}

@@ -362,7 +353,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

hc.topoWatchers = topoWatchers
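With the local variable gone, the watcher's read concurrency now comes from topo.DefaultConcurrency, which the topo package binds to the --topo_read_concurrency flag (see go/vt/topo/keyspace.go below). A minimal sketch of that single-owner flag pattern, reusing the names from this diff; the demo flag set and parse call are illustrative:

// Hedged sketch of the consolidated flag pattern this commit adopts: one
// package-level default mutated by one flag, read by every consumer.
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

var defaultConcurrency = 32 // stands in for topo.DefaultConcurrency

func registerFlags(fs *pflag.FlagSet) {
	fs.IntVar(&defaultConcurrency, "topo_read_concurrency", defaultConcurrency, "Concurrency of topo reads.")
}

func main() {
	fs := pflag.NewFlagSet("demo", pflag.ExitOnError)
	registerFlags(fs)
	_ = fs.Parse([]string{"--topo_read_concurrency=64"})
	fmt.Println(defaultConcurrency) // 64; all call sites now observe one value
}

Because vtcombo, vtctld, and vtgate register the same variable via servenv.OnParseFor in the topo package, the old per-package copy in discovery can no longer drift from the shared value.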
4 changes: 2 additions & 2 deletions go/vt/discovery/topology_watcher.go
@@ -70,7 +70,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
concurrency int64
concurrency int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
@@ -92,7 +92,7 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
14 changes: 5 additions & 9 deletions go/vt/schemamanager/tablet_executor.go
@@ -107,24 +107,20 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
return nil
}
exec.keyspace = keyspace
shardNames, err := exec.ts.GetShardNames(ctx, keyspace)
shards, err := exec.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return fmt.Errorf("unable to get shard names for keyspace: %s, error: %v", keyspace, err)
return fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err)
}
exec.tablets = make([]*topodatapb.Tablet, len(shardNames))
for i, shardName := range shardNames {
shardInfo, err := exec.ts.GetShard(ctx, keyspace, shardName)
if err != nil {
return fmt.Errorf("unable to get shard info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err)
}
exec.tablets = make([]*topodatapb.Tablet, 0, len(shards))
for shardName, shardInfo := range shards {
if !shardInfo.HasPrimary() {
return fmt.Errorf("shard: %s does not have a primary", shardName)
}
tabletInfo, err := exec.ts.GetTablet(ctx, shardInfo.PrimaryAlias)
if err != nil {
return fmt.Errorf("unable to get primary tablet info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err)
}
exec.tablets[i] = tabletInfo.Tablet
exec.tablets = append(exec.tablets, tabletInfo.Tablet)
}

if len(exec.tablets) == 0 {
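The rewrite above replaces N+1 topo round trips (one GetShardNames plus one GetShard per shard) with a single FindAllShardsInKeyspace call. A hedged sketch of the resulting access pattern, using the topo API as it appears in this diff; the helper name primaryAliases is hypothetical:

package example

import (
	"context"
	"fmt"

	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	"vitess.io/vitess/go/vt/topo"
)

// primaryAliases collects the primary tablet alias of every shard in the
// keyspace with one bulk topo read instead of one read per shard.
func primaryAliases(ctx context.Context, ts *topo.Server, keyspace string) ([]*topodatapb.TabletAlias, error) {
	// One bulk read; nil options means the DefaultConcurrency fallback applies.
	shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
	if err != nil {
		return nil, fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err)
	}
	aliases := make([]*topodatapb.TabletAlias, 0, len(shards))
	for shardName, shardInfo := range shards {
		if !shardInfo.HasPrimary() {
			return nil, fmt.Errorf("shard: %s does not have a primary", shardName)
		}
		aliases = append(aliases, shardInfo.PrimaryAlias)
	}
	return aliases, nil
}

On a keyspace with hundreds of shards this turns a long chain of dependent reads into one List call, or a bounded-concurrency fan-out on stores without List support.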
92 changes: 79 additions & 13 deletions go/vt/topo/keyspace.go
@@ -19,11 +19,15 @@ package topo
import (
"context"
"path"
"sort"
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/event"
@@ -34,7 +38,20 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// This file contains keyspace utility functions
// This file contains keyspace utility functions.

// Default concurrency to use in order to avoid overwhelming the topo server.
var DefaultConcurrency = 32

func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.")
}

func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
@@ -188,12 +205,60 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
opt = &FindAllShardsInKeyspaceOptions{}
}
if opt.Concurrency <= 0 {
opt.Concurrency = 1
opt.Concurrency = DefaultConcurrency
}

// First try to get all shards using List if we can.
buildResultFromList := func(kvpairs []KVInfo) (map[string]*ShardInfo, error) {
result := make(map[string]*ShardInfo, len(kvpairs))
for _, entry := range kvpairs {
// The shard key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard
shardKey := string(entry.Key)
shardName := path.Base(path.Dir(shardKey)) // The base part of the dir is "-80"
// Validate the extracted shard name.
if _, _, err := ValidateShardName(shardName); err != nil {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q",
keyspace, shardKey, shardName)
}
shard := &topodatapb.Shard{}
if err := shard.UnmarshalVT(entry.Value); err != nil {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): invalid data found for shard %q in %q",
keyspace, shardName, shardKey)
}
result[shardName] = &ShardInfo{
keyspace: keyspace,
shardName: shardName,
version: entry.Version,
Shard: shard,
}
}
return result, nil
}
shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath)
listRes, err := ts.globalCell.List(ctx, shardsPath)
if err == nil { // We have everything we need to build the result
return buildResultFromList(listRes)
}
if IsErrType(err, NoNode) {
// The path doesn't exist, let's see if the keyspace exists.
if _, kerr := ts.GetKeyspace(ctx, keyspace); kerr != nil {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace)
}
// We simply have no shards.
return make(map[string]*ShardInfo, 0), nil
}
// Currently the ZooKeeper implementation does not support index prefix
// scans, so we fall back to concurrently fetching the shards one by one.
// It is also possible that the response containing all shards is too
// large, in which case we also fall back to the one by one fetch.
if !IsErrType(err, NoImplementation) && !IsErrType(err, ResourceExhausted) {
return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace)
}

// Fall back to the shard by shard method.
shards, err := ts.GetShardNames(ctx, keyspace)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace)
return nil, vterrors.Wrapf(err, "failed to get list of shard names for keyspace '%s'", keyspace)
}

// Keyspaces with a large number of shards and geographically distributed
@@ -213,7 +278,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
)

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(opt.Concurrency)
eg.SetLimit(int(opt.Concurrency))

for _, shard := range shards {
shard := shard
@@ -222,7 +287,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
si, err := ts.GetShard(ctx, keyspace, shard)
switch {
case IsErrType(err, NoNode):
log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard)
log.Warningf("GetShard(%s, %s) returned ErrNoNode, consider checking the topology.", keyspace, shard)
return nil
case err == nil:
mu.Lock()
@@ -231,7 +296,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,

return nil
default:
return vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard)
return vterrors.Wrapf(err, "GetShard(%s, %s) failed", keyspace, shard)
}
})
}
@@ -245,25 +310,26 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,

// GetServingShards returns all shards where the primary is serving.
func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) {
shards, err := ts.GetShardNames(ctx, keyspace)
shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace)
}

result := make([]*ShardInfo, 0, len(shards))
for _, shard := range shards {
si, err := ts.GetShard(ctx, keyspace, shard)
if err != nil {
return nil, vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard)
}
if !si.IsPrimaryServing {
if !shard.IsPrimaryServing {
continue
}
result = append(result, si)
result = append(result, shard)
}
if len(result) == 0 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%v has no serving shards", keyspace)
}
// Sort the shards by KeyRange for deterministic results.
sort.Slice(result, func(i, j int) bool {
return key.KeyRangeLess(result[i].KeyRange, result[j].KeyRange)
})

return result, nil
}

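The List fast path above recovers each shard name from the key path returned by the global cell, and GetServingShards now sorts its output by KeyRange for deterministic results. A tiny self-contained sketch of the name extraction, using the example path from the diff's own comment:

package main

import (
	"fmt"
	"path"
)

func main() {
	// A shard key as stored in the global topo, per the comment in
	// buildResultFromList: the shard name is the last directory component.
	shardKey := "/vitess/global/keyspaces/commerce/shards/-80/Shard"
	shardName := path.Base(path.Dir(shardKey))
	fmt.Println(shardName) // -80
}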
96 changes: 95 additions & 1 deletion go/vt/topo/keyspace_external_test.go
@@ -18,14 +18,17 @@ package topo_test

import (
"context"
"fmt"
"slices"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func TestServerFindAllShardsInKeyspace(t *testing.T) {
@@ -87,3 +90,94 @@ func TestServerFindAllShardsInKeyspace(t *testing.T) {
})
}
}

func TestServerGetServingShards(t *testing.T) {
keyspace := "ks1"
errNoListImpl := topo.NewError(topo.NoImplementation, "don't be doing no listing round here")

tests := []struct {
shards int // Number of shards to create
err string // Error message we expect, if any
fallback bool // Should we fall back to the shard by shard method
}{
{
shards: 0,
err: fmt.Sprintf("%s has no serving shards", keyspace),
},
{
shards: 2,
},
{
shards: 128,
},
{
shards: 512,
fallback: true,
},
{
shards: 1024,
},
}

for _, tt := range tests {
t.Run(fmt.Sprintf("%d shards with fallback = %t", tt.shards, tt.fallback), func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts, factory := memorytopo.NewServerAndFactory(ctx)
defer ts.Close()
stats := factory.GetCallStats()
require.NotNil(t, stats)

if tt.fallback {
factory.SetListError(errNoListImpl)
}

err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
require.NoError(t, err)
var shardNames []string
if tt.shards > 0 {
shardNames, err = key.GenerateShardRanges(tt.shards)
require.NoError(t, err)
for _, shardName := range shardNames {
err = ts.CreateShard(ctx, keyspace, shardName)
require.NoError(t, err)
}
}

// Verify that we return a complete list of shards and that each
// key range is present in the output.
stats.ResetAll() // We only want the stats for GetServingShards
shardInfos, err := ts.GetServingShards(ctx, keyspace)
if tt.err != "" {
require.EqualError(t, err, tt.err)
return
}
require.NoError(t, err)
require.Len(t, shardInfos, tt.shards)
for _, shardName := range shardNames {
f := func(si *topo.ShardInfo) bool {
return key.KeyRangeString(si.Shard.KeyRange) == shardName
}
require.True(t, slices.ContainsFunc(shardInfos, f), "shard %q was not found in the results",
shardName)
}

// Now we check the stats based on the number of shards and whether or not
// we should have had a List error and fallen back to the shard by shard method.
callcounts := stats.Counts()
require.NotNil(t, callcounts)
require.Equal(t, int64(1), callcounts["List"]) // We should always try
switch {
case tt.fallback: // We get the shards one by one from the list
require.Equal(t, int64(1), callcounts["ListDir"]) // GetShardNames
require.Equal(t, int64(tt.shards), callcounts["Get"]) // GetShard
case tt.shards < 1: // We use a Get to check that the keyspace exists
require.Equal(t, int64(0), callcounts["ListDir"])
require.Equal(t, int64(1), callcounts["Get"])
default: // We should not make any ListDir or Get calls
require.Equal(t, int64(0), callcounts["ListDir"])
require.Equal(t, int64(0), callcounts["Get"])
}
})
}
}
2 changes: 2 additions & 0 deletions go/vt/topo/memorytopo/directory.go
@@ -27,6 +27,8 @@ import (

// ListDir is part of the topo.Conn interface.
func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.DirEntry, error) {
c.factory.callstats.Add([]string{"ListDir"}, 1)

if err := c.dial(ctx); err != nil {
return nil, err
}
4 changes: 3 additions & 1 deletion go/vt/topo/memorytopo/election.go
@@ -24,8 +24,10 @@ import (
"vitess.io/vitess/go/vt/topo"
)

// NewLeaderParticipation is part of the topo.Server interface
// NewLeaderParticipation is part of the topo.Conn interface.
func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) {
c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1)

if c.closed {
return nil, ErrConnectionClosed
}
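These one-line counters (ListDir, NewLeaderParticipation, and the other Conn methods among the 16 changed files) are what the new test reads back through GetCallStats. A minimal sketch of the consumer side, assuming the memorytopo API as used in the test above; the mapping of GetKeyspace to one "Get" count follows the test's own comment:

package example

import (
	"context"
	"fmt"

	"vitess.io/vitess/go/vt/topo/memorytopo"
)

// countTopoCalls is a hypothetical helper showing how the counters are
// consumed: every Conn method bumps a named stat on the factory.
func countTopoCalls(ctx context.Context) {
	ts, factory := memorytopo.NewServerAndFactory(ctx)
	defer ts.Close()

	stats := factory.GetCallStats()
	stats.ResetAll()

	_, _ = ts.GetKeyspace(ctx, "commerce") // one Get against the global cell
	fmt.Println(stats.Counts()["Get"])     // 1
}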