Skip to content

Commit

Permalink
more tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Dec 17, 2024
1 parent 43c5eeb commit 343f434
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 19 deletions.
6 changes: 5 additions & 1 deletion go/vt/vtorc/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ CREATE TABLE vitess_keyspace (
keyspace varchar(128) NOT NULL,
keyspace_type smallint(5) NOT NULL,
durability_policy varchar(512) NOT NULL,
updated_timestamp timestamp NOT NULL,
PRIMARY KEY (keyspace)
)`,
`
Expand Down Expand Up @@ -327,6 +328,9 @@ CREATE INDEX detection_idx_topology_recovery ON topology_recovery (detection_id)
CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps (recovery_id)
`,
`
CREATE INDEX keyspace_updated_timestamp_idx_vitess_shard ON vitess_shard (keyspace, updated_timestamp)
CREATE INDEX updated_timestamp_idx_vitess_keyspace ON vitess_keyspace (updated_timestamp)
`,
`
CREATE INDEX updated_timestamp_idx_vitess_shard ON vitess_shard (updated_timestamp)
`,
}
49 changes: 31 additions & 18 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ import (
"context"
"sort"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtorc/inst"
)

var lastAllKeyspaceShardsRefreshTime time.Time

var statsKeyspaceShardsWatched = stats.NewGaugesFuncWithMultiLabels("KeyspaceShardsWatched",
"The keyspace/shards watched by VTOrc",
[]string{"Keyspace", "Shard"},
Expand All @@ -53,13 +56,13 @@ func getKeyspaceShardsStats() map[string]int64 {

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
var err error
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
getCtx, getCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getCancel()
// Get all the keyspaces
keyspaces, err = ts.GetKeyspaces(ctx)
keyspaces, err = ts.GetKeyspaces(getCtx)
if err != nil {
return err
}
Expand All @@ -84,27 +87,38 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error {
// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)

refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup

eg, egCtx := errgroup.WithContext(refreshCtx)
for idx, keyspace := range keyspaces {
// Check if the current keyspace name is the same as the last one.
// If it is, then we know we have already refreshed its information.
// We do not need to do it again.
if idx != 0 && keyspace == keyspaces[idx-1] {
continue
}
wg.Add(2)
go func(keyspace string) {
defer wg.Done()
_ = refreshKeyspaceHelper(refreshCtx, keyspace)
}(keyspace)
go func(keyspace string) {
defer wg.Done()
_ = refreshAllShards(refreshCtx, keyspace)
}(keyspace)
eg.Go(func() error {
return refreshKeyspaceHelper(egCtx, keyspace)
})
eg.Go(func() error {
return refreshAllShards(egCtx, keyspace)
})
}

if err = eg.Wait(); err == nil {
// delete stale records from the previous success or older
if staleTime := lastAllKeyspaceShardsRefreshTime; !staleTime.IsZero() {
if err := inst.DeleteStaleShards(staleTime); err != nil {
return err
}
if err := inst.DeleteStaleKeyspaces(staleTime); err != nil {
return err
}
}
lastAllKeyspaceShardsRefreshTime = time.Now()
}
wg.Wait()

return nil
}
Expand Down Expand Up @@ -158,15 +172,14 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
log.Error(err)
return err
}
beginSaveTime := time.Now()
for _, shardInfo := range shardInfos {
err = inst.SaveShard(shardInfo)
if err != nil {
log.Error(err)
return err
}
}
return inst.DeleteStaleKeyspaceShards(keyspaceName, beginSaveTime)
return nil
}

// refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard.
Expand Down

0 comments on commit 343f434

Please sign in to comment.