diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index f997dc6ac0a..34bf2506de3 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -305,24 +305,28 @@ CREATE TABLE vitess_shard ( shard varchar(128) NOT NULL, primary_alias varchar(512) NOT NULL, primary_timestamp varchar(512) NOT NULL, + updated_timestamp timestamp NOT NULL, PRIMARY KEY (keyspace, shard) )`, ` -CREATE INDEX source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port) +CREATE INDEX source_host_port_idx_database_instance_database_instance ON database_instance (source_host, source_port) `, ` -CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard) +CREATE INDEX keyspace_shard_idx_topology_recovery ON topology_recovery (keyspace, shard) `, ` -CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery) +CREATE INDEX end_recovery_idx_topology_recovery ON topology_recovery (end_recovery) `, ` -CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) +CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog ON database_instance_analysis_changelog (alias, analysis_timestamp) `, ` -CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id) +CREATE INDEX detection_idx_topology_recovery ON topology_recovery (detection_id) `, ` -CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id) +CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps (recovery_id) + `, + ` +CREATE INDEX keyspace_updated_timestamp_idx_vitess_shard ON vitess_shard (keyspace, updated_timestamp) `, } diff --git a/go/vt/vtorc/inst/shard_dao.go b/go/vt/vtorc/inst/shard_dao.go index a90eed0f509..f7fdc0dafc0 100644 --- a/go/vt/vtorc/inst/shard_dao.go +++ b/go/vt/vtorc/inst/shard_dao.go @@ -18,6 +18,7 @@ package inst import ( "errors" + "time" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/vt/external/golib/sqlutils" @@ -38,13 +39,12 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s return } - query := ` - select + query := `SELECT primary_alias, primary_timestamp - from + FROM vitess_shard - where keyspace=? and shard=? - ` + WHERE + keyspace = ? AND shard = ?` args := sqlutils.Args(keyspaceName, shardName) shardFound := false err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { @@ -62,14 +62,38 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s return primaryAlias, primaryTimestamp, nil } +// GetAllShardNames returns the names of all keyspace/shards. +func GetAllShardNames() (map[string][]string, error) { + shards := make(map[string][]string, 0) + query := `SELECT keyspace, shard FROM vitess_shard` + err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { + keyspace := row.GetString("keyspace") + shards[keyspace] = append(shards[keyspace], row.GetString("shard")) + return nil + }) + return shards, err +} + +// GetKeyspaceShardNames returns the names of all shards in a keyspace. +func GetKeyspaceShardNames(keyspaceName string) ([]string, error) { + shards := make([]string, 0) + query := `SELECT shard FROM vitess_shard WHERE keyspace = ?` + args := sqlutils.Args(keyspaceName) + err := db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { + shards = append(shards, row.GetString("shard")) + return nil + }) + return shards, err +} + // SaveShard saves the shard record against the shard name. func SaveShard(shard *topo.ShardInfo) error { _, err := db.ExecVTOrc(` - replace - into vitess_shard ( - keyspace, shard, primary_alias, primary_timestamp - ) values ( - ?, ?, ?, ? + REPLACE + INTO vitess_shard ( + keyspace, shard, primary_alias, primary_timestamp, updated_timestamp + ) VALUES ( + ?, ?, ?, ?, DATETIME('now') ) `, shard.Keyspace(), @@ -80,6 +104,20 @@ func SaveShard(shard *topo.ShardInfo) error { return err } +// DeleteStaleKeyspaceShards deletes shard records that have not been updated since a provided time. +func DeleteStaleKeyspaceShards(keyspace string, staleTime time.Time) error { + _, err := db.ExecVTOrc(`DELETE FROM vitess_shard + WHERE + keyspace = ? + AND + updated_timestamp < DATETIME(?, 'unixepoch') + `, + keyspace, + staleTime.Unix(), + ) + return err +} + // getShardPrimaryAliasString gets the shard primary alias to be stored as a string in the database. func getShardPrimaryAliasString(shard *topo.ShardInfo) string { if shard.PrimaryAlias == nil { diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 84f6aef7a4a..bbc781ae40c 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -104,3 +104,37 @@ func TestSaveAndReadShard(t *testing.T) { }) } } + +func TestGetAllShardNames(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + + shardInfo := topo.NewShardInfo("ks1", "-80", &topodatapb.Shard{}, nil) + err := SaveShard(shardInfo) + require.NoError(t, err) + + shardNames, err := GetAllShardNames() + require.NoError(t, err) + require.Equal(t, map[string][]string{ + "ks1": {"-80"}, + }, shardNames) +} + +func TestGetKeyspaceShardNames(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + + for _, shardName := range []string{"-80", "80-"} { + shardInfo := topo.NewShardInfo("ks1", shardName, &topodatapb.Shard{}, nil) + err := SaveShard(shardInfo) + require.NoError(t, err) + } + + shardNames, err := GetKeyspaceShardNames("ks1") + require.NoError(t, err) + require.Equal(t, []string{"-80", "80-"}, shardNames) +} diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 31b525e4665..4473e993c37 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -21,6 +21,7 @@ import ( "sort" "strings" "sync" + "time" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -28,35 +29,26 @@ import ( "vitess.io/vitess/go/vt/vtorc/inst" ) -var ( - // keyspaceShardNames stores the current names of shards by keyspace. - keyspaceShardNames = make(map[string][]string) - keyspaceShardNamesMu sync.Mutex - statsKeyspaceShardsWatched = stats.NewGaugesFuncWithMultiLabels("KeyspaceShardsWatched", - "The keyspace/shards watched by VTOrc", - []string{"Keyspace", "Shard"}, - getKeyspaceShardsStats, - ) +var statsKeyspaceShardsWatched = stats.NewGaugesFuncWithMultiLabels("KeyspaceShardsWatched", + "The keyspace/shards watched by VTOrc", + []string{"Keyspace", "Shard"}, + getKeyspaceShardsStats, ) // getKeyspaceShardsStats returns the current keyspace/shards watched in stats format. func getKeyspaceShardsStats() map[string]int64 { - keyspaceShardNamesMu.Lock() - defer keyspaceShardNamesMu.Unlock() - keyspaceShards := make(map[string]int64) - for ks, shards := range keyspaceShardNames { + ksShardNames, err := inst.GetAllShardNames() + if err != nil { + log.Errorf("Failed to get shards from backend: %+v", err) + return nil + } + stats := make(map[string]int64, 0) + for keyspace, shards := range ksShardNames { for _, shard := range shards { - keyspaceShards[ks+"."+shard] = 1 + stats[keyspace+"."+shard] = 1 } } - return keyspaceShards -} - -// GetKeyspaceShardNames returns the names of the shards in a given keyspace. -func GetKeyspaceShardNames(keyspaceName string) []string { - keyspaceShardNamesMu.Lock() - defer keyspaceShardNamesMu.Unlock() - return keyspaceShardNames[keyspaceName] + return stats } // RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. @@ -166,23 +158,15 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { log.Error(err) return err } - - shardNames := make([]string, 0, len(shardInfos)) - for shardName, shardInfo := range shardInfos { + beginSaveTime := time.Now() + for _, shardInfo := range shardInfos { err = inst.SaveShard(shardInfo) if err != nil { log.Error(err) return err } - shardNames = append(shardNames, shardName) } - sort.Strings(shardNames) - - keyspaceShardNamesMu.Lock() - defer keyspaceShardNamesMu.Unlock() - keyspaceShardNames[keyspaceName] = shardNames - - return nil + return inst.DeleteStaleKeyspaceShards(keyspace, beginSaveTime) } // refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard. diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 82370560561..9763551b928 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -51,13 +51,26 @@ var ( } ) -func TestRefreshAllKeyspaces(t *testing.T) { - // reset keyspaceShardNames - keyspaceShardNames = make(map[string][]string) +func TestGetKeyspaceShardsStats(t *testing.T) { + db.ClearVTOrcDatabase() defer func() { - keyspaceShardNames = make(map[string][]string) + db.ClearVTOrcDatabase() }() + for _, shardName := range []string{"-80", "80-"} { + shardInfo := topo.NewShardInfo("ks1", shardName, &topodatapb.Shard{}, nil) + err := inst.SaveShard(shardInfo) + require.NoError(t, err) + } + + // test using the metric var that calls getKeyspaceShardsStats() + require.Equal(t, map[string]int64{ + "ks1.-80": 1, + "ks1.80-": 1, + }, statsKeyspaceShardsWatched.Counts()) +} + +func TestRefreshAllKeyspaces(t *testing.T) { // Store the old flags and restore on test completion oldTs := ts oldClustersToWatch := clustersToWatch @@ -125,17 +138,15 @@ func TestRefreshAllKeyspaces(t *testing.T) { verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "") verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "") - // Confirm caching of shard names + // Confirm GetAllShardNames + keyspaceShardNames, err := inst.GetAllShardNames() + require.NoError(t, err) require.Equal(t, map[string][]string{ "ks1": {"-80", "80-"}, "ks2": {"-80", "80-"}, "ks3": {"-80", "80-"}, "ks4": {"-80", "80-"}, }, keyspaceShardNames) - for _, ksName := range keyspaceNames { - require.Equal(t, []string{"-80", "80-"}, GetKeyspaceShardNames(ksName)) - } - require.Len(t, GetKeyspaceShardNames("does-not-exist"), 0) } func TestRefreshKeyspace(t *testing.T) { diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index ab3cc2ae44b..6641db5ecde 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -113,7 +113,11 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f input := strings.Split(ks, "/") keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]}) } else { - shards := GetKeyspaceShardNames(ks) + shards, err := inst.GetKeyspaceShardNames(ks) + if err != nil { + log.Errorf("Failed to get shards for ks %s: %+v", ks, err) + continue + } if len(shards) == 0 { log.Errorf("Topo has no shards for ks: %v", ks) continue