diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index f997dc6ac0a..6538175dc14 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -294,6 +294,7 @@ CREATE TABLE vitess_keyspace ( keyspace varchar(128) NOT NULL, keyspace_type smallint(5) NOT NULL, durability_policy varchar(512) NOT NULL, + updated_timestamp timestamp NOT NULL, PRIMARY KEY (keyspace) )`, ` @@ -305,24 +306,31 @@ CREATE TABLE vitess_shard ( shard varchar(128) NOT NULL, primary_alias varchar(512) NOT NULL, primary_timestamp varchar(512) NOT NULL, + updated_timestamp timestamp NOT NULL, PRIMARY KEY (keyspace, shard) )`, ` -CREATE INDEX source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port) +CREATE INDEX source_host_port_idx_database_instance_database_instance ON database_instance (source_host, source_port) `, ` -CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard) +CREATE INDEX keyspace_shard_idx_topology_recovery ON topology_recovery (keyspace, shard) `, ` -CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery) +CREATE INDEX end_recovery_idx_topology_recovery ON topology_recovery (end_recovery) `, ` -CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp) +CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog ON database_instance_analysis_changelog (alias, analysis_timestamp) `, ` -CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id) +CREATE INDEX detection_idx_topology_recovery ON topology_recovery (detection_id) `, ` -CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id) +CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps (recovery_id) + `, + ` +CREATE INDEX updated_timestamp_idx_vitess_keyspace ON vitess_keyspace 
(updated_timestamp) + `, + ` +CREATE INDEX updated_timestamp_idx_vitess_shard ON vitess_shard (updated_timestamp) `, } diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index c061d54ebb3..00ed4d4e9df 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -41,8 +41,8 @@ var ( `INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, `INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, `INSERT INTO vitess_tablet VALUES('zone2-0000000200','localhost',6756,'ks','0','zone2',2,'0001-01-01 
00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653222207569643a3230307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363735357d20706f72745f6d61703a7b6b65793a227674222076616c75653a363735347d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363735362064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, - `INSERT INTO vitess_shard VALUES('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00');`, - `INSERT INTO vitess_keyspace VALUES('ks',0,'semi_sync');`, + `INSERT INTO vitess_shard VALUES('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00','2022-12-28 07:23:25.129898+00:00');`, + `INSERT INTO vitess_keyspace VALUES('ks',0,'semi_sync','2022-12-28 07:23:25.129898+00:00');`, } ) diff --git a/go/vt/vtorc/inst/keyspace_dao.go b/go/vt/vtorc/inst/keyspace_dao.go index d764e3fc56a..36810bf2d5f 100644 --- a/go/vt/vtorc/inst/keyspace_dao.go +++ b/go/vt/vtorc/inst/keyspace_dao.go @@ -18,6 +18,7 @@ package inst import ( "errors" + "time" "vitess.io/vitess/go/vt/external/golib/sqlutils" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -35,14 +36,12 @@ func ReadKeyspace(keyspaceName string) (*topo.KeyspaceInfo, error) { return nil, err } - query := ` - select - keyspace_type, - durability_policy - from - vitess_keyspace - where keyspace=? - ` + query := `SELECT + keyspace_type, + durability_policy + FROM + vitess_keyspace + WHERE keyspace = ?` args := sqlutils.Args(keyspaceName) keyspace := &topo.KeyspaceInfo{ Keyspace: &topodatapb.Keyspace{}, @@ -63,18 +62,28 @@ func ReadKeyspace(keyspaceName string) (*topo.KeyspaceInfo, error) { } // SaveKeyspace saves the keyspace record against the keyspace name. 
-func SaveKeyspace(keyspace *topo.KeyspaceInfo) error { - _, err := db.ExecVTOrc(` - replace - into vitess_keyspace ( - keyspace, keyspace_type, durability_policy - ) values ( - ?, ?, ? - ) - `, +func SaveKeyspace(keyspace *topo.KeyspaceInfo, updatedTimestamp time.Time) error { + _, err := db.ExecVTOrc(`REPLACE + INTO vitess_keyspace ( + keyspace, keyspace_type, durability_policy, updated_timestamp + ) VALUES ( + ?, ?, ?, DATETIME(?, 'unixepoch') + )`, keyspace.KeyspaceName(), int(keyspace.KeyspaceType), keyspace.GetDurabilityPolicy(), + updatedTimestamp.Unix(), + ) + return err +} + +// DeleteStaleKeyspaces deletes keyspace records that have not been updated since a provided time. +func DeleteStaleKeyspaces(staleTime time.Time) error { + _, err := db.ExecVTOrc(`DELETE FROM vitess_keyspace + WHERE + updated_timestamp <= DATETIME(?, 'unixepoch') + `, + staleTime.Unix(), ) return err } diff --git a/go/vt/vtorc/inst/keyspace_dao_test.go b/go/vt/vtorc/inst/keyspace_dao_test.go index dda3ffaa9d2..fea625ce1b7 100644 --- a/go/vt/vtorc/inst/keyspace_dao_test.go +++ b/go/vt/vtorc/inst/keyspace_dao_test.go @@ -18,6 +18,7 @@ package inst import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -124,3 +125,37 @@ func TestSaveAndReadKeyspace(t *testing.T) { }) } } + +func TestDeleteStaleKeyspaces(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. 
+ defer func() { + db.ClearVTOrcDatabase() + }() + + keyspaceInfo := &topo.KeyspaceInfo{ + Keyspace: &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: "none", + BaseKeyspace: "baseKeyspace", + }, + } + keyspaceInfo.SetKeyspaceName(t.Name()) + err := SaveKeyspace(keyspaceInfo) + require.NoError(t, err) + + readKeyspaceInfo, err := ReadKeyspace(t.Name()) + require.NoError(t, err) + require.NotNil(t, readKeyspaceInfo) + + // test a staletime before save causes no delete + require.NoError(t, DeleteStaleKeyspaces(time.Now().Add(-time.Hour))) + readKeyspaceInfo, err = ReadKeyspace(t.Name()) + require.NoError(t, err) + require.NotNil(t, readKeyspaceInfo) + + // test staletime of now deletes everything + require.NoError(t, DeleteStaleKeyspaces(time.Now())) + readKeyspaceInfo, err = ReadKeyspace(t.Name()) + require.Error(t, err) + require.Nil(t, readKeyspaceInfo) +} diff --git a/go/vt/vtorc/inst/shard_dao.go b/go/vt/vtorc/inst/shard_dao.go index a90eed0f509..0c26d8d196b 100644 --- a/go/vt/vtorc/inst/shard_dao.go +++ b/go/vt/vtorc/inst/shard_dao.go @@ -18,6 +18,7 @@ package inst import ( "errors" + "time" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/vt/external/golib/sqlutils" @@ -38,13 +39,12 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s return } - query := ` - select + query := `SELECT primary_alias, primary_timestamp - from + FROM vitess_shard - where keyspace=? and shard=? - ` + WHERE + keyspace = ? AND shard = ?` args := sqlutils.Args(keyspaceName, shardName) shardFound := false err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { @@ -62,20 +62,56 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s return primaryAlias, primaryTimestamp, nil } +// GetAllShardNames returns the names of all keyspace/shards. 
+func GetAllShardNames() (map[string][]string, error) { + shards := make(map[string][]string, 0) + query := `SELECT keyspace, shard FROM vitess_shard` + err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { + keyspace := row.GetString("keyspace") + shards[keyspace] = append(shards[keyspace], row.GetString("shard")) + return nil + }) + return shards, err +} + +// GetKeyspaceShardNames returns the names of all shards in a keyspace. +func GetKeyspaceShardNames(keyspaceName string) ([]string, error) { + shards := make([]string, 0) + query := `SELECT shard FROM vitess_shard WHERE keyspace = ?` + args := sqlutils.Args(keyspaceName) + err := db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { + shards = append(shards, row.GetString("shard")) + return nil + }) + return shards, err +} + // SaveShard saves the shard record against the shard name. -func SaveShard(shard *topo.ShardInfo) error { +func SaveShard(shard *topo.ShardInfo, updatedTimestamp time.Time) error { _, err := db.ExecVTOrc(` - replace - into vitess_shard ( - keyspace, shard, primary_alias, primary_timestamp - ) values ( - ?, ?, ?, ? + REPLACE + INTO vitess_shard ( + keyspace, shard, primary_alias, primary_timestamp, updated_timestamp + ) VALUES ( + ?, ?, ?, ?, DATETIME(?, 'unixepoch') ) `, shard.Keyspace(), shard.ShardName(), getShardPrimaryAliasString(shard), getShardPrimaryTermStartTimeString(shard), + updatedTimestamp.Unix(), + ) + return err +} + +// DeleteStaleShards deletes shard records that have not been updated since a provided time. 
+func DeleteStaleShards(staleTime time.Time) error { + _, err := db.ExecVTOrc(`DELETE FROM vitess_shard + WHERE + updated_timestamp <= DATETIME(?, 'unixepoch') + `, + staleTime.Unix(), ) return err } diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 84f6aef7a4a..2d96b042d2d 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -104,3 +104,63 @@ func TestSaveAndReadShard(t *testing.T) { }) } } + +func TestGetAllShardNames(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + + shardInfo := topo.NewShardInfo("ks1", "-80", &topodatapb.Shard{}, nil) + err := SaveShard(shardInfo) + require.NoError(t, err) + + shardNames, err := GetAllShardNames() + require.NoError(t, err) + require.Equal(t, map[string][]string{ + "ks1": {"-80"}, + }, shardNames) +} + +func TestGetKeyspaceShardNames(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. + defer func() { + db.ClearVTOrcDatabase() + }() + + for _, shardName := range []string{"-80", "80-"} { + shardInfo := topo.NewShardInfo("ks1", shardName, &topodatapb.Shard{}, nil) + err := SaveShard(shardInfo) + require.NoError(t, err) + } + + shardNames, err := GetKeyspaceShardNames("ks1") + require.NoError(t, err) + require.Equal(t, []string{"-80", "80-"}, shardNames) +} + +func TestDeleteStaleShards(t *testing.T) { + // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. 
+ defer func() { + db.ClearVTOrcDatabase() + }() + + shardInfo := topo.NewShardInfo("ks1", "-80", &topodatapb.Shard{}, nil) + err := SaveShard(shardInfo) + require.NoError(t, err) + shards, err := GetAllShardNames() + require.NoError(t, err) + require.Len(t, shards, 1) + + // test a staletime before save causes no delete + require.NoError(t, DeleteStaleShards(time.Now().Add(-time.Hour))) + shards, err = GetAllShardNames() + require.NoError(t, err) + require.Len(t, shards, 1) + + // test staletime of now deletes everything + require.NoError(t, DeleteStaleShards(time.Now())) + shards, err = GetAllShardNames() + require.NoError(t, err) + require.Len(t, shards, 0) +} diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index f48f2b97370..79ec0f98eed 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -61,8 +61,7 @@ func ReadTablet(tabletAlias string) (*topodatapb.Tablet, error) { FROM vitess_tablet WHERE - alias = ? - ` + alias = ?` args := sqlutils.Args(tabletAlias) tablet := &topodatapb.Tablet{} opts := prototext.UnmarshalOptions{DiscardUnknown: true} diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 0dd17cb65fd..02eb88309ca 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -20,23 +20,49 @@ import ( "context" "sort" "strings" - "sync" + "time" - "vitess.io/vitess/go/vt/log" + "golang.org/x/sync/errgroup" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtorc/inst" ) +var lastAllKeyspaceShardsRefreshTime time.Time + +var statsKeyspaceShardsWatched = stats.NewGaugesFuncWithMultiLabels("KeyspaceShardsWatched", + "The keyspace/shards watched by VTOrc", + []string{"Keyspace", "Shard"}, + getKeyspaceShardsStats, +) + +// getKeyspaceShardsStats returns the current keyspace/shards watched in stats format. 
+func getKeyspaceShardsStats() map[string]int64 { + ksShardNames, err := inst.GetAllShardNames() + if err != nil { + log.Errorf("Failed to get shards from backend: %+v", err) + return nil + } + stats := make(map[string]int64, 0) + for keyspace, shards := range ksShardNames { + for _, shard := range shards { + stats[keyspace+"."+shard] = 1 + } + } + return stats +} + // RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. func RefreshAllKeyspacesAndShards(ctx context.Context) error { + var err error var keyspaces []string if len(clustersToWatch) == 0 { // all known keyspaces - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - var err error + getCtx, getCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer getCancel() // Get all the keyspaces - keyspaces, err = ts.GetKeyspaces(ctx) + keyspaces, err = ts.GetKeyspaces(getCtx) if err != nil { return err } @@ -61,9 +87,11 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error { // Sort the list of keyspaces. // The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace sort.Strings(keyspaces) + refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() - var wg sync.WaitGroup + + eg, refreshCtx := errgroup.WithContext(refreshCtx) for idx, keyspace := range keyspaces { // Check if the current keyspace name is the same as the last one. // If it is, then we know we have already refreshed its information. 
@@ -71,17 +99,27 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error { if idx != 0 && keyspace == keyspaces[idx-1] { continue } - wg.Add(2) - go func(keyspace string) { - defer wg.Done() - _ = refreshKeyspaceHelper(refreshCtx, keyspace) - }(keyspace) - go func(keyspace string) { - defer wg.Done() - _ = refreshAllShards(refreshCtx, keyspace) - }(keyspace) + eg.Go(func() error { + return refreshKeyspaceHelper(refreshCtx, keyspace) + }) + eg.Go(func() error { + return refreshAllShards(refreshCtx, keyspace) + }) + } + + if err = eg.Wait(); err == nil { + // delete stale records from the previous success or older + now := time.Now() + if staleTime := lastAllKeyspaceShardsRefreshTime; !staleTime.IsZero() && now.Unix() > staleTime.Unix() { + if err := inst.DeleteStaleShards(staleTime); err != nil { + return err + } + if err := inst.DeleteStaleKeyspaces(staleTime); err != nil { + return err + } + } + lastAllKeyspaceShardsRefreshTime = now } - wg.Wait() return nil } @@ -116,7 +154,7 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { log.Error(err) return err } - err = inst.SaveKeyspace(keyspaceInfo) + err = inst.SaveKeyspace(keyspaceInfo, time.Now() /* updated_timestamp */) if err != nil { log.Error(err) } @@ -136,7 +174,7 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { return err } for _, shardInfo := range shardInfos { - err = inst.SaveShard(shardInfo) + err = inst.SaveShard(shardInfo, time.Now() /* updated_timestamp */) if err != nil { log.Error(err) return err @@ -152,7 +190,7 @@ func refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardNam log.Error(err) return err } - err = inst.SaveShard(shardInfo) + err = inst.SaveShard(shardInfo, time.Now() /* updated_timestamp */) if err != nil { log.Error(err) } diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 5cbe139728b..9763551b928 100644 --- 
a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -51,6 +51,25 @@ var ( } ) +func TestGetKeyspaceShardsStats(t *testing.T) { + db.ClearVTOrcDatabase() + defer func() { + db.ClearVTOrcDatabase() + }() + + for _, shardName := range []string{"-80", "80-"} { + shardInfo := topo.NewShardInfo("ks1", shardName, &topodatapb.Shard{}, nil) + err := inst.SaveShard(shardInfo) + require.NoError(t, err) + } + + // test using the metric var that calls getKeyspaceShardsStats() + require.Equal(t, map[string]int64{ + "ks1.-80": 1, + "ks1.80-": 1, + }, statsKeyspaceShardsWatched.Counts()) +} + func TestRefreshAllKeyspaces(t *testing.T) { // Store the old flags and restore on test completion oldTs := ts @@ -119,6 +138,15 @@ func TestRefreshAllKeyspaces(t *testing.T) { verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "") verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "") + // Confirm GetAllShardNames + keyspaceShardNames, err := inst.GetAllShardNames() + require.NoError(t, err) + require.Equal(t, map[string][]string{ + "ks1": {"-80", "80-"}, + "ks2": {"-80", "80-"}, + "ks3": {"-80", "80-"}, + "ks4": {"-80", "80-"}, + }, keyspaceShardNames) } func TestRefreshKeyspace(t *testing.T) { diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 7066229ab06..6641db5ecde 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -113,13 +113,9 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f input := strings.Split(ks, "/") keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]}) } else { - // Assume this is a keyspace and find all shards in keyspace - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - shards, err := ts.GetShardNames(ctx, ks) + shards, err := inst.GetKeyspaceShardNames(ks) if err != nil { - // Log the errr and 
continue - log.Errorf("Error fetching shards for keyspace: %v", ks) + log.Errorf("Failed to get shards for ks %s: %+v", ks, err) continue } if len(shards) == 0 {