Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtorc: fetch shard names only every --topo-information-refresh-duration #17319

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
20 changes: 14 additions & 6 deletions go/vt/vtorc/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ CREATE TABLE vitess_keyspace (
keyspace varchar(128) NOT NULL,
keyspace_type smallint(5) NOT NULL,
durability_policy varchar(512) NOT NULL,
updated_timestamp timestamp NOT NULL,
PRIMARY KEY (keyspace)
)`,
`
Expand All @@ -305,24 +306,31 @@ CREATE TABLE vitess_shard (
shard varchar(128) NOT NULL,
primary_alias varchar(512) NOT NULL,
primary_timestamp varchar(512) NOT NULL,
updated_timestamp timestamp NOT NULL,
PRIMARY KEY (keyspace, shard)
)`,
`
CREATE INDEX source_host_port_idx_database_instance_database_instance on database_instance (source_host, source_port)
CREATE INDEX source_host_port_idx_database_instance_database_instance ON database_instance (source_host, source_port)
`,
`
CREATE INDEX keyspace_shard_idx_topology_recovery on topology_recovery (keyspace, shard)
CREATE INDEX keyspace_shard_idx_topology_recovery ON topology_recovery (keyspace, shard)
`,
`
CREATE INDEX end_recovery_idx_topology_recovery on topology_recovery (end_recovery)
CREATE INDEX end_recovery_idx_topology_recovery ON topology_recovery (end_recovery)
`,
`
CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog on database_instance_analysis_changelog (alias, analysis_timestamp)
CREATE INDEX instance_timestamp_idx_database_instance_analysis_changelog ON database_instance_analysis_changelog (alias, analysis_timestamp)
`,
`
CREATE INDEX detection_idx_topology_recovery on topology_recovery (detection_id)
CREATE INDEX detection_idx_topology_recovery ON topology_recovery (detection_id)
`,
`
CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps(recovery_id)
CREATE INDEX recovery_id_idx_topology_recovery_steps ON topology_recovery_steps (recovery_id)
`,
`
CREATE INDEX updated_timestamp_idx_vitess_keyspace ON vitess_keyspace (updated_timestamp)
`,
`
CREATE INDEX updated_timestamp_idx_vitess_shard ON vitess_shard (updated_timestamp)
`,
}
4 changes: 2 additions & 2 deletions go/vt/vtorc/inst/analysis_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ var (
`INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_tablet VALUES('zone2-0000000200','localhost',6756,'ks','0','zone2',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653222207569643a3230307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363735357d20706f72745f6d61703a7b6b65793a227674222076616c75653a363735347d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363735362064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`,
`INSERT INTO vitess_shard VALUES('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00');`,
`INSERT INTO vitess_keyspace VALUES('ks',0,'semi_sync');`,
`INSERT INTO vitess_shard VALUES('ks','0','zone1-0000000101','2022-12-28 07:23:25.129898+00:00','2022-12-28 07:23:25.129898+00:00');`,
`INSERT INTO vitess_keyspace VALUES('ks',0,'semi_sync','2022-12-28 07:23:25.129898+00:00');`,
}
)

Expand Down
43 changes: 26 additions & 17 deletions go/vt/vtorc/inst/keyspace_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package inst

import (
"errors"
"time"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -35,14 +36,12 @@ func ReadKeyspace(keyspaceName string) (*topo.KeyspaceInfo, error) {
return nil, err
}

query := `
select
keyspace_type,
durability_policy
from
vitess_keyspace
where keyspace=?
`
query := `SELECT
keyspace_type,
durability_policy
FROM
vitess_keyspace
WHERE keyspace = ?`
args := sqlutils.Args(keyspaceName)
keyspace := &topo.KeyspaceInfo{
Keyspace: &topodatapb.Keyspace{},
Expand All @@ -63,18 +62,28 @@ func ReadKeyspace(keyspaceName string) (*topo.KeyspaceInfo, error) {
}

// SaveKeyspace saves the keyspace record against the keyspace name.
func SaveKeyspace(keyspace *topo.KeyspaceInfo) error {
_, err := db.ExecVTOrc(`
replace
into vitess_keyspace (
keyspace, keyspace_type, durability_policy
) values (
?, ?, ?
)
`,
func SaveKeyspace(keyspace *topo.KeyspaceInfo, updatedTimestamp time.Time) error {
_, err := db.ExecVTOrc(`REPLACE
INTO vitess_keyspace (
keyspace, keyspace_type, durability_policy, updated_timestamp
) VALUES (
?, ?, ?, DATETIME(?, 'unixepoch')
)`,
keyspace.KeyspaceName(),
int(keyspace.KeyspaceType),
keyspace.GetDurabilityPolicy(),
updatedTimestamp.Unix(),
)
return err
}

// DeleteStaleKeyspaces deletes keyspace records that have not been updated since a provided time.
func DeleteStaleKeyspaces(staleTime time.Time) error {
_, err := db.ExecVTOrc(`DELETE FROM vitess_keyspace
WHERE
updated_timestamp <= DATETIME(?, 'unixepoch')
`,
staleTime.Unix(),
)
return err
}
Expand Down
35 changes: 35 additions & 0 deletions go/vt/vtorc/inst/keyspace_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -99,7 +100,7 @@
Keyspace: tt.keyspace,
}
keyspaceInfo.SetKeyspaceName(tt.keyspaceName)
err := SaveKeyspace(keyspaceInfo)

Check failure on line 103 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

not enough arguments in call to SaveKeyspace

Check failure on line 103 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

not enough arguments in call to SaveKeyspace

Check failure on line 103 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql57)

not enough arguments in call to SaveKeyspace

Check failure on line 103 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql84)

not enough arguments in call to SaveKeyspace

Check failure on line 103 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql80)

not enough arguments in call to SaveKeyspace

Check failure on line 103 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

not enough arguments in call to SaveKeyspace
require.NoError(t, err)
}

Expand All @@ -124,3 +125,37 @@
})
}
}

func TestDeleteStaleKeyspaces(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

keyspaceInfo := &topo.KeyspaceInfo{
Keyspace: &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: "none",
BaseKeyspace: "baseKeyspace",
},
}
keyspaceInfo.SetKeyspaceName(t.Name())
err := SaveKeyspace(keyspaceInfo)

Check failure on line 143 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

not enough arguments in call to SaveKeyspace

Check failure on line 143 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

not enough arguments in call to SaveKeyspace

Check failure on line 143 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql57)

not enough arguments in call to SaveKeyspace

Check failure on line 143 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql84)

not enough arguments in call to SaveKeyspace

Check failure on line 143 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql80)

not enough arguments in call to SaveKeyspace

Check failure on line 143 in go/vt/vtorc/inst/keyspace_dao_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

not enough arguments in call to SaveKeyspace
require.NoError(t, err)

readKeyspaceInfo, err := ReadKeyspace(t.Name())
require.NoError(t, err)
require.NotNil(t, readKeyspaceInfo)

// test a staletime before save causes no delete
require.NoError(t, DeleteStaleKeyspaces(time.Now().Add(-time.Hour)))
readKeyspaceInfo, err = ReadKeyspace(t.Name())
require.NoError(t, err)
require.NotNil(t, readKeyspaceInfo)

// test statetime of now deletes everything
require.NoError(t, DeleteStaleKeyspaces(time.Now()))
readKeyspaceInfo, err = ReadKeyspace(t.Name())
require.Error(t, err)
require.Nil(t, readKeyspaceInfo)
}
58 changes: 47 additions & 11 deletions go/vt/vtorc/inst/shard_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package inst

import (
"errors"
"time"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
Expand All @@ -38,13 +39,12 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s
return
}

query := `
select
query := `SELECT
primary_alias, primary_timestamp
from
FROM
vitess_shard
where keyspace=? and shard=?
`
WHERE
keyspace = ? AND shard = ?`
args := sqlutils.Args(keyspaceName, shardName)
shardFound := false
err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error {
Expand All @@ -62,20 +62,56 @@ func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias s
return primaryAlias, primaryTimestamp, nil
}

// GetAllShardNames returns the names of all keyspace/shards.
func GetAllShardNames() (map[string][]string, error) {
shards := make(map[string][]string, 0)
query := `SELECT keyspace, shard FROM vitess_shard`
err := db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error {
keyspace := row.GetString("keyspace")
shards[keyspace] = append(shards[keyspace], row.GetString("shard"))
return nil
})
return shards, err
}

// GetKeyspaceShardNames returns the names of all shards in a keyspace.
func GetKeyspaceShardNames(keyspaceName string) ([]string, error) {
shards := make([]string, 0)
query := `SELECT shard FROM vitess_shard WHERE keyspace = ?`
args := sqlutils.Args(keyspaceName)
err := db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error {
shards = append(shards, row.GetString("shard"))
return nil
})
return shards, err
}

// SaveShard saves the shard record against the shard name.
func SaveShard(shard *topo.ShardInfo) error {
func SaveShard(shard *topo.ShardInfo, updatedTimestamp time.Time) error {
_, err := db.ExecVTOrc(`
replace
into vitess_shard (
keyspace, shard, primary_alias, primary_timestamp
) values (
?, ?, ?, ?
REPLACE
INTO vitess_shard (
keyspace, shard, primary_alias, primary_timestamp, updated_timestamp
) VALUES (
?, ?, ?, ?, DATETIME(?, 'unixepoch')
)
`,
shard.Keyspace(),
shard.ShardName(),
getShardPrimaryAliasString(shard),
getShardPrimaryTermStartTimeString(shard),
updatedTimestamp.Unix(),
)
return err
}

// DeleteStaleShards deletes shard records that have not been updated since a provided time.
func DeleteStaleShards(staleTime time.Time) error {
_, err := db.ExecVTOrc(`DELETE FROM vitess_shard
WHERE
updated_timestamp <= DATETIME(?, 'unixepoch')
`,
staleTime.Unix(),
)
return err
}
Expand Down
60 changes: 60 additions & 0 deletions go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
t.Run(tt.name, func(t *testing.T) {
if tt.shard != nil {
shardInfo := topo.NewShardInfo(tt.keyspaceName, tt.shardName, tt.shard, nil)
err := SaveShard(shardInfo)

Check failure on line 92 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

not enough arguments in call to SaveShard

Check failure on line 92 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

not enough arguments in call to SaveShard

Check failure on line 92 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql57)

not enough arguments in call to SaveShard

Check failure on line 92 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql84)

not enough arguments in call to SaveShard

Check failure on line 92 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql80)

not enough arguments in call to SaveShard

Check failure on line 92 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

not enough arguments in call to SaveShard
require.NoError(t, err)
}

Expand All @@ -104,3 +104,63 @@
})
}
}

func TestGetAllShardNames(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

shardInfo := topo.NewShardInfo("ks1", "-80", &topodatapb.Shard{}, nil)
err := SaveShard(shardInfo)

Check failure on line 115 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

not enough arguments in call to SaveShard

Check failure on line 115 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

not enough arguments in call to SaveShard

Check failure on line 115 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql57)

not enough arguments in call to SaveShard

Check failure on line 115 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql84)

not enough arguments in call to SaveShard

Check failure on line 115 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql80)

not enough arguments in call to SaveShard

Check failure on line 115 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

not enough arguments in call to SaveShard
require.NoError(t, err)

shardNames, err := GetAllShardNames()
require.NoError(t, err)
require.Equal(t, map[string][]string{
"ks1": {"-80"},
}, shardNames)
}

func TestGetKeyspaceShardNames(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

for _, shardName := range []string{"-80", "80-"} {
shardInfo := topo.NewShardInfo("ks1", shardName, &topodatapb.Shard{}, nil)
err := SaveShard(shardInfo)

Check failure on line 133 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

not enough arguments in call to SaveShard

Check failure on line 133 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

not enough arguments in call to SaveShard

Check failure on line 133 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql57)

not enough arguments in call to SaveShard

Check failure on line 133 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql84)

not enough arguments in call to SaveShard

Check failure on line 133 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql80)

not enough arguments in call to SaveShard

Check failure on line 133 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

not enough arguments in call to SaveShard
require.NoError(t, err)
}

shardNames, err := GetKeyspaceShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-80", "80-"}, shardNames)
}

func TestDeleteStaleShards(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
}()

shardInfo := topo.NewShardInfo("ks1", "-80", &topodatapb.Shard{}, nil)
err := SaveShard(shardInfo)

Check failure on line 149 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

not enough arguments in call to SaveShard

Check failure on line 149 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

not enough arguments in call to SaveShard

Check failure on line 149 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql57)

not enough arguments in call to SaveShard

Check failure on line 149 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql84)

not enough arguments in call to SaveShard

Check failure on line 149 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Unit Test (mysql80)

not enough arguments in call to SaveShard

Check failure on line 149 in go/vt/vtorc/inst/shard_dao_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

not enough arguments in call to SaveShard
require.NoError(t, err)
shards, err := GetAllShardNames()
require.NoError(t, err)
require.Len(t, shards, 1)

// test a staletime before save causes no delete
require.NoError(t, DeleteStaleShards(time.Now().Add(-time.Hour)))
shards, err = GetAllShardNames()
require.NoError(t, err)
require.Len(t, shards, 1)

// test statetime of now deletes everything
require.NoError(t, DeleteStaleShards(time.Now()))
shards, err = GetAllShardNames()
require.NoError(t, err)
require.Len(t, shards, 0)
}
3 changes: 1 addition & 2 deletions go/vt/vtorc/inst/tablet_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func ReadTablet(tabletAlias string) (*topodatapb.Tablet, error) {
FROM
vitess_tablet
WHERE
alias = ?
`
alias = ?`
args := sqlutils.Args(tabletAlias)
tablet := &topodatapb.Tablet{}
opts := prototext.UnmarshalOptions{DiscardUnknown: true}
Expand Down
Loading
Loading