From de469c1fef337c9cdd01e143d4fcb7ba77c1fb82 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 18 Feb 2024 11:19:30 -0500
Subject: [PATCH 01/25] Include a reference to a `*topo.Server` impl

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/topo_process.go | 23 ++++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index 776ed7da27e..0272b2cf065 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -33,6 +33,11 @@ import (
 
     "vitess.io/vitess/go/vt/log"
     vtopo "vitess.io/vitess/go/vt/topo"
+
+    // Register topo server implementations
+    _ "vitess.io/vitess/go/vt/topo/consultopo"
+    _ "vitess.io/vitess/go/vt/topo/etcd2topo"
+    _ "vitess.io/vitess/go/vt/topo/zk2topo"
 )
 
 // TopoProcess is a generic handle for a running Topo service .
@@ -51,6 +56,7 @@ type TopoProcess struct {
     PeerURL string
     ZKPorts string
     Client  interface{}
+    Server  *vtopo.Server
 
     proc *exec.Cmd
     exit chan error
@@ -60,15 +66,22 @@
 // Setup starts a new topo service
 func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
     switch topoFlavor {
     case "zk2":
-        return topo.SetupZookeeper(cluster)
+        err = topo.SetupZookeeper(cluster)
     case "consul":
-        return topo.SetupConsul(cluster)
+        err = topo.SetupConsul(cluster)
     default:
         // Override any inherited ETCDCTL_API env value to
         // ensure that we use the v3 API and storage.
         os.Setenv("ETCDCTL_API", "3")
-        return topo.SetupEtcd()
+        err = topo.SetupEtcd()
+    }
+
+    if err != nil {
+        return err
     }
+
+    topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), "/vitess/global")
+    return err
 }
 
 // SetupEtcd spawns a new etcd service and initializes it with the defaults.
@@ -289,6 +302,10 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {
 
 // TearDown shutdowns the running topo service.
 func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {
+    if topo.Server != nil {
+        topo.Server.Close()
+    }
+
     if topo.Client != nil {
         switch cli := topo.Client.(type) {
         case *clientv3.Client:

From 6f0c5d080fc0e18e51ead16a8a98b6678eb0c04b Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 18 Feb 2024 11:20:18 -0500
Subject: [PATCH 02/25] Add `InitTablet` method that uses the topo server
 directly

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/cluster_process.go | 35 +++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index 98218bcf3fb..31e3f0a0f56 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -319,6 +319,41 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
     return nil
 }
 
+// InitTablet initializes a tablet record in the topo server. It does not start the tablet process.
+func (cluster *LocalProcessCluster) InitTablet(tablet *Vttablet, keyspace string, shard string) error {
+    tabletpb := &topodatapb.Tablet{
+        Alias: &topodatapb.TabletAlias{
+            Cell: tablet.Cell,
+            Uid:  uint32(tablet.TabletUID),
+        },
+        Hostname: cluster.Hostname,
+        Type:     topodatapb.TabletType_REPLICA,
+        PortMap: map[string]int32{
+            "vt": int32(tablet.HTTPPort),
+        },
+        Keyspace: keyspace,
+        Shard:    shard,
+    }
+
+    if tablet.Type == "rdonly" {
+        tabletpb.Type = topodatapb.TabletType_RDONLY
+    }
+
+    if tablet.MySQLPort > 0 {
+        tabletpb.PortMap["mysql"] = int32(tablet.MySQLPort)
+    }
+
+    if tablet.GrpcPort > 0 {
+        tabletpb.PortMap["grpc"] = int32(tablet.GrpcPort)
+    }
+
+    allowPrimaryOverride := false
+    createShardAndKeyspace := true
+    allowUpdate := true
+
+    return cluster.TopoProcess.Server.InitTablet(context.Background(), tabletpb, allowPrimaryOverride, createShardAndKeyspace, allowUpdate)
+}
+
 // StartKeyspace starts required number of shard and the corresponding tablets
 // keyspace : struct containing keyspace name, Sqlschema to apply, VSchema to apply
 // shardName : list of shard names

From 5835f6152d63a527baed3e9cca417243b0fc253d Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 18 Feb 2024 11:38:11 -0500
Subject: [PATCH 03/25] Replace all `InitTablet` invocations to not use
 deprecated command

Signed-off-by: Andrew Mason
---
 go/test/endtoend/backup/vtbackup/backup_only_test.go | 4 ++--
 go/test/endtoend/backup/vtctlbackup/backup_utils.go  | 8 ++++----
 go/test/endtoend/reparent/utils/utils.go             | 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go
index 3f5389d2726..ecb04741d7b 100644
--- a/go/test/endtoend/backup/vtbackup/backup_only_test.go
+++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go
@@ -167,7 +167,7 @@ func firstBackupTest(t *testing.T, tabletType string) {
     mysqlctl.CompressionEngineName = "lz4"
     defer func() { mysqlctl.CompressionEngineName = "pgzip" }()
     // now bring up the other replica, letting it restore from backup.
-    err = localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shardName)
+    err = localCluster.InitTablet(replica2, keyspaceName, shardName)
     require.Nil(t, err)
     restore(t, replica2, "replica", "SERVING")
     // Replica2 takes time to serve. Sleeping for 5 sec.
@@ -266,7 +266,7 @@ func removeBackups(t *testing.T) {
 func initTablets(t *testing.T, startTablet bool, initShardPrimary bool) {
     // Initialize tablets
     for _, tablet := range []cluster.Vttablet{*primary, *replica1} {
-        err := localCluster.VtctlclientProcess.InitTablet(&tablet, cell, keyspaceName, hostname, shardName)
+        err := localCluster.InitTablet(&tablet, keyspaceName, shardName)
         require.Nil(t, err)
 
         if startTablet {
diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
index c20ab70e652..28afda30e10 100644
--- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go
+++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -228,13 +228,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
     replica2 = shard.Vttablets[2]
     replica3 = shard.Vttablets[3]
 
-    if err := localCluster.VtctlclientProcess.InitTablet(primary, cell, keyspaceName, hostname, shard.Name); err != nil {
+    if err := localCluster.InitTablet(primary, keyspaceName, shard.Name); err != nil {
         return 1, err
     }
-    if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil {
+    if err := localCluster.InitTablet(replica1, keyspaceName, shard.Name); err != nil {
         return 1, err
     }
-    if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil {
+    if err := localCluster.InitTablet(replica2, keyspaceName, shard.Name); err != nil {
         return 1, err
     }
     vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory)
@@ -746,7 +746,7 @@ func restartPrimaryAndReplica(t *testing.T) {
         proc.Wait()
     }
     for _, tablet := range []*cluster.Vttablet{primary, replica1} {
-        err := localCluster.VtctlclientProcess.InitTablet(tablet, cell, keyspaceName, hostname, shardName)
+        err := localCluster.InitTablet(tablet, keyspaceName, shardName)
         require.Nil(t, err)
         err = tablet.VttabletProcess.Setup()
         require.Nil(t, err)
diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go
index 790fd0028e2..a73be83f232 100644
--- a/go/test/endtoend/reparent/utils/utils.go
+++ b/go/test/endtoend/reparent/utils/utils.go
@@ -510,7 +510,7 @@ func RestartTablet(t *testing.T, clusterInstance *cluster.LocalProcessCluster, t
     tab.MysqlctlProcess.InitMysql = false
     err := tab.MysqlctlProcess.Start()
     require.NoError(t, err)
-    err = clusterInstance.VtctlclientProcess.InitTablet(tab, tab.Cell, KeyspaceName, Hostname, ShardName)
+    err = clusterInstance.InitTablet(tab, KeyspaceName, ShardName)
     require.NoError(t, err)
 }
 
@@ -519,7 +519,7 @@ func ResurrectTablet(ctx context.Context, t *testing.T, clusterInstance *cluster
     tab.MysqlctlProcess.InitMysql = false
     err := tab.MysqlctlProcess.Start()
     require.NoError(t, err)
-    err = clusterInstance.VtctlclientProcess.InitTablet(tab, tab.Cell, KeyspaceName, Hostname, ShardName)
+    err = clusterInstance.InitTablet(tab, KeyspaceName, ShardName)
     require.NoError(t, err)
 
     // As there is already a primary the new replica will come directly in SERVING state

From f88ef3f1c258e181e9d90397398c8100e15db2e4 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 18 Feb 2024 12:01:35 -0500
Subject: [PATCH 04/25] more InitTablet

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/cluster_process.go |  5 ++++-
 go/test/endtoend/keyspace/keyspace_test.go  | 13 +++++++++++--
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index 31e3f0a0f56..b906c81d411 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -335,8 +335,11 @@ func (cluster *LocalProcessCluster) InitTablet(tablet *Vttablet, keyspace string
         Shard:    shard,
     }
 
-    if tablet.Type == "rdonly" {
+    switch tablet.Type {
+    case "rdonly":
         tabletpb.Type = topodatapb.TabletType_RDONLY
+    case "primary":
+        tabletpb.Type = topodatapb.TabletType_PRIMARY
     }
 
     if tablet.MySQLPort > 0 {
diff --git a/go/test/endtoend/keyspace/keyspace_test.go b/go/test/endtoend/keyspace/keyspace_test.go
index 7f7d4198135..26af0538b54 100644
--- a/go/test/endtoend/keyspace/keyspace_test.go
+++ b/go/test/endtoend/keyspace/keyspace_test.go
@@ -229,7 +229,11 @@ func TestDeleteKeyspace(t *testing.T) {
     defer cluster.PanicHandler(t)
     _ = clusterForKSTest.VtctldClientProcess.CreateKeyspace("test_delete_keyspace", sidecar.DefaultName)
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace/0")
-    _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--", "--keyspace=test_delete_keyspace", "--shard=0", "zone1-0000000100", "primary")
+    _ = clusterForKSTest.InitTablet(&cluster.Vttablet{
+        Type:      "primary",
+        TabletUID: 100,
+        Cell:      "zone1",
+    }, "test_delete_keyspace", "0")
 
     // Can't delete keyspace if there are shards present.
     err := clusterForKSTest.VtctldClientProcess.ExecuteCommand("DeleteKeyspace", "test_delete_keyspace")
@@ -247,7 +251,12 @@ func TestDeleteKeyspace(t *testing.T) {
     // Start over and this time use recursive DeleteKeyspace to do everything.
     _ = clusterForKSTest.VtctldClientProcess.CreateKeyspace("test_delete_keyspace", sidecar.DefaultName)
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace/0")
-    _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--", "--port=1234", "--bind-address=127.0.0.1", "--keyspace=test_delete_keyspace", "--shard=0", "zone1-0000000100", "primary")
+    _ = clusterForKSTest.InitTablet(&cluster.Vttablet{
+        Type:      "primary",
+        TabletUID: 100,
+        Cell:      "zone1",
+        HTTPPort:  1234,
+    }, "test_delete_keyspace", "0")
 
     // Create the serving/replication entries and check that they exist,
     // so we can later check they're deleted.

From 8923506d52fee2f1c92ebb7628c0b8f7855e559d Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 18 Feb 2024 17:25:28 -0500
Subject: [PATCH 05/25] Replace old `GetShardReplication` commands with topo
 call

Signed-off-by: Andrew Mason
---
 go/test/endtoend/keyspace/keyspace_test.go    |  9 +++++---
 go/test/endtoend/reparent/utils/utils.go      | 18 +++------------
 .../endtoend/tabletmanager/commands_test.go   | 22 +++++--------------
 3 files changed, 15 insertions(+), 34 deletions(-)

diff --git a/go/test/endtoend/keyspace/keyspace_test.go b/go/test/endtoend/keyspace/keyspace_test.go
index 26af0538b54..66d9c304cc0 100644
--- a/go/test/endtoend/keyspace/keyspace_test.go
+++ b/go/test/endtoend/keyspace/keyspace_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package sequence
 
 import (
+    "context"
     "encoding/binary"
     "encoding/json"
     "flag"
@@ -261,8 +262,8 @@ func TestDeleteKeyspace(t *testing.T) {
     // Create the serving/replication entries and check that they exist,
     // so we can later check they're deleted.
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace")
-    _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", cell, "test_delete_keyspace/0")
-    _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspaces", "test_delete_keyspace", cell)
+    _, _ = clusterForKSTest.TopoProcess.Server.GetShardReplication(context.Background(), cell, "test_delete_keyspace", "0")
+    _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
 
     // Recursive DeleteKeyspace
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("DeleteKeyspace", "--recursive", "test_delete_keyspace")
@@ -274,7 +275,9 @@ func TestDeleteKeyspace(t *testing.T) {
     require.Error(t, err)
     err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetTablet", "zone1-0000000100")
     require.Error(t, err)
-    err = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", cell, "test_delete_keyspace/0")
+    _, err = clusterForKSTest.TopoProcess.Server.GetShardReplication(context.Background(), cell, "test_delete_keyspace", "0")
+    require.Error(t, err)
+    err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
     require.Error(t, err)
     ksMap, err := clusterForKSTest.VtctldClientProcess.GetSrvKeyspaces("test_delete_keyspace", cell)
     require.NoError(t, err)
diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go
index a73be83f232..6d483e43b69 100644
--- a/go/test/endtoend/reparent/utils/utils.go
+++ b/go/test/endtoend/reparent/utils/utils.go
@@ -18,12 +18,10 @@ package utils
 
 import (
     "context"
-    "encoding/json"
     "fmt"
     "os"
     "os/exec"
     "path"
-    "reflect"
     "strings"
     "testing"
     "time"
@@ -608,12 +606,12 @@ func CheckReplicaStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttab
 
 // CheckReparentFromOutside checks that cluster was reparented from outside
 func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, downPrimary bool, baseTime int64) {
-    result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell1, KeyspaceShard)
+    result, err := clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell1, KeyspaceName, ShardName)
     require.Nil(t, err, "error should be Nil")
     if !downPrimary {
-        assertNodeCount(t, result, int(3))
+        assert.Len(t, result.Nodes, 3)
     } else {
-        assertNodeCount(t, result, int(2))
+        assert.Len(t, result.Nodes, 2)
     }
 
     // make sure the primary status page says it's the primary
@@ -658,16 +656,6 @@ func positionAtLeast(t *testing.T, tablet *cluster.Vttablet, a string, b string)
     return isAtleast
 }
 
-func assertNodeCount(t *testing.T, result string, want int) {
-    resultMap := make(map[string]any)
-    err := json.Unmarshal([]byte(result), &resultMap)
-    require.NoError(t, err)
-
-    nodes := reflect.ValueOf(resultMap["nodes"])
-    got := nodes.Len()
-    assert.Equal(t, want, got)
-}
-
 // CheckDBvar checks the db var
 func CheckDBvar(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) {
     tabletParams := getMysqlConnParam(tablet)
diff --git a/go/test/endtoend/tabletmanager/commands_test.go b/go/test/endtoend/tabletmanager/commands_test.go
index d23413e0269..8b5d9391ddb 100644
--- a/go/test/endtoend/tabletmanager/commands_test.go
+++ b/go/test/endtoend/tabletmanager/commands_test.go
@@ -188,23 +188,23 @@ func runHookAndAssert(t *testing.T, params []string, expectedStatus string, expe
 func TestShardReplicationFix(t *testing.T) {
     // make sure the replica is in the replication graph, 2 nodes: 1 primary, 1 replica
     defer cluster.PanicHandler(t)
-    result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell, keyspaceShard)
+    result, err := clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell, keyspaceName, shardName)
     require.Nil(t, err, "error should be Nil")
-    assertNodeCount(t, result, int(3))
+    assert.Len(t, result.Nodes, 3)
 
     // Manually add a bogus entry to the replication graph, and check it is removed by ShardReplicationFix
     err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationAdd", keyspaceShard, fmt.Sprintf("%s-9000", cell))
     require.Nil(t, err, "error should be Nil")
 
-    result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell, keyspaceShard)
+    result, err = clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell, keyspaceName, shardName)
     require.Nil(t, err, "error should be Nil")
-    assertNodeCount(t, result, int(4))
+    assert.Len(t, result.Nodes, 4)
 
     err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationFix", cell, keyspaceShard)
     require.Nil(t, err, "error should be Nil")
-    result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell, keyspaceShard)
+    result, err = clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell, keyspaceName, shardName)
     require.Nil(t, err, "error should be Nil")
-    assertNodeCount(t, result, int(3))
+    assert.Len(t, result.Nodes, 3)
 }
 
 func TestGetSchema(t *testing.T) {
@@ -220,13 +220,3 @@ func TestGetSchema(t *testing.T) {
     v1Create := gjson.Get(res, "table_definitions.#(name==\"v1\").schema")
     assert.Equal(t, getSchemaV1Results, v1Create.String())
 }
-
-func assertNodeCount(t *testing.T, result string, want int) {
-    resultMap := make(map[string]any)
-    err := json.Unmarshal([]byte(result), &resultMap)
-    require.Nil(t, err)
-
-    nodes := reflect.ValueOf(resultMap["nodes"])
-    got := nodes.Len()
-    assert.Equal(t, want, got)
-}

From 803eba694042ff0888a393918ec8a02350d184df Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Mon, 19 Feb 2024 12:15:49 -0500
Subject: [PATCH 06/25] ShardReplicationPosition e2e

Signed-off-by: Andrew Mason
---
 go/test/endtoend/reparent/utils/utils.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go
index 6d483e43b69..35c53f276e7 100644
--- a/go/test/endtoend/reparent/utils/utils.go
+++ b/go/test/endtoend/reparent/utils/utils.go
@@ -559,7 +559,7 @@ func GetNewPrimary(t *testing.T, clusterInstance *cluster.LocalProcessCluster) *
 // GetShardReplicationPositions gets the shards replication positions.
 // This should not generally be called directly, instead use the WaitForReplicationToCatchup method.
 func GetShardReplicationPositions(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName, shardName string, doPrint bool) []string {
-    output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
+    output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
         "ShardReplicationPositions", fmt.Sprintf("%s/%s", keyspaceName, shardName))
     require.NoError(t, err)
     strArray := strings.Split(output, "\n")

From 9431514a5e388eaf9f73df5110e986c722037b62 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Mon, 19 Feb 2024 12:16:05 -0500
Subject: [PATCH 07/25] ExecuteHook e2e

Signed-off-by: Andrew Mason
---
 .../endtoend/tabletmanager/commands_test.go | 30 +++++++++----------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/go/test/endtoend/tabletmanager/commands_test.go b/go/test/endtoend/tabletmanager/commands_test.go
index 8b5d9391ddb..9ac5e3fb16b 100644
--- a/go/test/endtoend/tabletmanager/commands_test.go
+++ b/go/test/endtoend/tabletmanager/commands_test.go
@@ -23,6 +23,7 @@ import (
     "reflect"
     "testing"
 
+    "vitess.io/vitess/go/json2"
     "vitess.io/vitess/go/test/endtoend/utils"
 
     "github.com/stretchr/testify/require"
@@ -32,6 +33,8 @@ import (
 
     "vitess.io/vitess/go/mysql"
     "vitess.io/vitess/go/test/endtoend/cluster"
+
+    vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
 )
 
 var (
@@ -143,44 +146,39 @@ func TestHook(t *testing.T) {
     // test a regular program works
     defer cluster.PanicHandler(t)
     runHookAndAssert(t, []string{
-        "ExecuteHook", "--", primaryTablet.Alias, "test.sh", "--flag1", "--param1=hello"}, "0", false, "")
+        "ExecuteHook", primaryTablet.Alias, "test.sh", "--", "--flag1", "--param1=hello"}, 0, false, "")
 
     // test stderr output
     runHookAndAssert(t, []string{
-        "ExecuteHook", "--", primaryTablet.Alias, "test.sh", "--to-stderr"}, "0", false, "ERR: --to-stderr\n")
+        "ExecuteHook", primaryTablet.Alias, "test.sh", "--", "--to-stderr"}, 0, false, "ERR: --to-stderr\n")
 
     // test commands that fail
     runHookAndAssert(t, []string{
-        "ExecuteHook", "--", primaryTablet.Alias, "test.sh", "--exit-error"}, "1", false, "ERROR: exit status 1\n")
+        "ExecuteHook", primaryTablet.Alias, "test.sh", "--", "--exit-error"}, 1, false, "ERROR: exit status 1\n")
 
     // test hook that is not present
     runHookAndAssert(t, []string{
-        "ExecuteHook", "--", primaryTablet.Alias, "not_here.sh", "--exit-error"}, "-1", false, "missing hook")
+        "ExecuteHook", primaryTablet.Alias, "not_here.sh", "--", "--exit-error"}, -1, false, "missing hook")
 
     // test hook with invalid name
     runHookAndAssert(t, []string{
-        "ExecuteHook", "--", primaryTablet.Alias, "/bin/ls"}, "-1", true, "hook name cannot have")
+        "ExecuteHook", primaryTablet.Alias, "/bin/ls"}, -1, true, "hook name cannot have")
 }
 
-func runHookAndAssert(t *testing.T, params []string, expectedStatus string, expectedError bool, expectedStderr string) {
-
-    hr, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(params...)
+func runHookAndAssert(t *testing.T, params []string, expectedStatus int64, expectedError bool, expectedStderr string) {
+    hr, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(params...)
     if expectedError {
         assert.Error(t, err, "Expected error")
     } else {
         require.Nil(t, err)
 
-        resultMap := make(map[string]any)
-        err = json.Unmarshal([]byte(hr), &resultMap)
+        var resp vtctldatapb.ExecuteHookResponse
+        err = json2.Unmarshal([]byte(hr), &resp)
         require.Nil(t, err)
 
-        exitStatus := reflect.ValueOf(resultMap["ExitStatus"]).Float()
-        status := fmt.Sprintf("%.0f", exitStatus)
-        assert.Equal(t, expectedStatus, status)
-
-        stderr := reflect.ValueOf(resultMap["Stderr"]).String()
-        assert.Contains(t, stderr, expectedStderr)
+        assert.Equal(t, expectedStatus, resp.HookResult.ExitStatus)
+        assert.Contains(t, resp.HookResult.Stderr, expectedStderr)
     }
 }

From d6847d9ab13d04a851d51ee3474f836d4088a863 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Tue, 20 Feb 2024 06:23:24 -0500
Subject: [PATCH 08/25] backup output

Signed-off-by: Andrew Mason
---
 go/test/endtoend/backup/vtctlbackup/backup_utils.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
index 28afda30e10..7fc872e934c 100644
--- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go
+++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -449,7 +449,7 @@ func primaryBackup(t *testing.T) {
     }()
 
     verifyInitialReplication(t)
-    output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
+    output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
     require.Error(t, err)
     assert.Contains(t, output, "type PRIMARY cannot take backup. if you really need to do this, rerun the backup command with --allow_primary")
 

From 3a58ce4fcd6e71fdd55f6ba09d022df8553f4de6 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Tue, 20 Feb 2024 16:37:11 -0500
Subject: [PATCH 09/25] replace ChangeTabletType and GetTablet with new
 versions

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/cluster_process.go   | 21 +++----------------
 .../endtoend/cluster/vtctldclient_process.go  |  9 ++++++++
 go/test/endtoend/reparent/utils/utils.go      |  2 +-
 go/test/endtoend/tabletgateway/vtgate_test.go |  2 +-
 4 files changed, 14 insertions(+), 20 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index b906c81d411..7f652aeeffb 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -38,7 +38,6 @@ import (
     "time"
 
     "vitess.io/vitess/go/constants/sidecar"
-    "vitess.io/vitess/go/json2"
     "vitess.io/vitess/go/mysql"
     "vitess.io/vitess/go/sqltypes"
     "vitess.io/vitess/go/syscallutil"
@@ -894,7 +893,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet *
         return nil, err
     }
 
-    tablet, err := cluster.VtctlclientGetTablet(vttablet)
+    tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
     if err != nil {
         return nil, err
     }
@@ -937,7 +936,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin
 // returns the responses. It returns an error if the stream ends with fewer than
 // `count` responses.
 func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vttablet *Vttablet, count int) (responses []*querypb.StreamHealthResponse, err error) {
-    tablet, err := cluster.VtctlclientGetTablet(vttablet)
+    tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
     if err != nil {
         return nil, err
     }
@@ -972,7 +971,7 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta
 // StreamTabletHealthUntil invokes a HealthStream on a local cluster Vttablet and
 // returns the responses. It waits until a certain condition is met. The amount of time to wait is an input that it takes.
 func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context, vttablet *Vttablet, timeout time.Duration, condition func(shr *querypb.StreamHealthResponse) bool) error {
-    tablet, err := cluster.VtctlclientGetTablet(vttablet)
+    tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
     if err != nil {
         return err
     }
@@ -1009,20 +1008,6 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
     return err
 }
 
-func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) {
-    result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias)
-    if err != nil {
-        return nil, err
-    }
-
-    var ti topodatapb.Tablet
-    if err := json2.Unmarshal([]byte(result), &ti); err != nil {
-        return nil, err
-    }
-
-    return &ti, nil
-}
-
 func (cluster *LocalProcessCluster) VtctlclientChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
     _, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("ChangeTabletType", "--", tablet.Alias, tabletType.String())
     return err
diff --git a/go/test/endtoend/cluster/vtctldclient_process.go b/go/test/endtoend/cluster/vtctldclient_process.go
index c5afd8f1220..24ec6f69fe3 100644
--- a/go/test/endtoend/cluster/vtctldclient_process.go
+++ b/go/test/endtoend/cluster/vtctldclient_process.go
@@ -152,6 +152,15 @@ func (vtctldclient *VtctldClientProcess) ApplyVSchema(keyspace string, json stri
     )
 }
 
+// ChangeTabletType changes the type of the given tablet.
+func (vtctldclient *VtctldClientProcess) ChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
+    return vtctldclient.ExecuteCommand(
+        "ChangeTabletType",
+        tablet.Alias,
+        tabletType.String(),
+    )
+}
+
 // GetSrvKeyspaces returns a mapping of cell to srv keyspace for the given keyspace.
 func (vtctldclient *VtctldClientProcess) GetSrvKeyspaces(keyspace string, cells ...string) (ksMap map[string]*topodatapb.SrvKeyspace, err error) {
     args := append([]string{"GetSrvKeyspaces", keyspace}, cells...)
diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go
index 35c53f276e7..f8a4f2625dc 100644
--- a/go/test/endtoend/reparent/utils/utils.go
+++ b/go/test/endtoend/reparent/utils/utils.go
@@ -706,7 +706,7 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
 }
 
 func WaitForTabletToBeServing(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, timeout time.Duration) {
-    vTablet, err := clusterInstance.VtctlclientGetTablet(tablet)
+    vTablet, err := clusterInstance.VtctldClientProcess.GetTablet(tablet.Alias)
     require.NoError(t, err)
 
     tConn, err := tabletconn.GetDialer()(vTablet, false)
diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go
index d9cedc04b69..44de751d30e 100644
--- a/go/test/endtoend/tabletgateway/vtgate_test.go
+++ b/go/test/endtoend/tabletgateway/vtgate_test.go
@@ -120,7 +120,7 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
 
     // change the RDONLY tablet to SPARE
     rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly()
-    err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
+    err = clusterInstance.VtctldClientProcess.ChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
     require.NoError(t, err)
     // Change it back to RDONLY afterward as the cluster is re-used.
     defer func() {

From 485daeb2ef8c733de7abb46c64dc50fe6bc26b5e Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Fri, 23 Feb 2024 08:41:54 -0500
Subject: [PATCH 10/25] nil out server after close

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/topo_process.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index 0272b2cf065..db1645bda38 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -304,6 +304,7 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {
     if topo.Server != nil {
         topo.Server.Close()
+        topo.Server = nil
     }
 
     if topo.Client != nil {

From 30ec7f73851217b8345dceb6cd52e2a2c6c19351 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Fri, 23 Feb 2024 14:58:58 -0500
Subject: [PATCH 11/25] do not hardcode topo root

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/topo_process.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index db1645bda38..1a2409fcdb2 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -64,11 +64,13 @@ type TopoProcess struct {
 // Setup starts a new topo service
 func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
+    root := "/" + topo.Name
     switch topoFlavor {
     case "zk2":
         err = topo.SetupZookeeper(cluster)
     case "consul":
         err = topo.SetupConsul(cluster)
+        root = strings.TrimPrefix(root, "/")
     default:
         // Override any inherited ETCDCTL_API env value to
         // ensure that we use the v3 API and storage.
@@ -80,7 +82,7 @@ func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster)
         return err
     }
 
-    topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), "/vitess/global")
+    topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), root)
     return err
 }

From d0e11a8407c0ac22e64f7dff81774a4a06eb10bf Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 28 Feb 2024 08:09:48 -0500
Subject: [PATCH 12/25] only open the server after we have established a
 global root

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/cluster_process.go   | 5 +++++
 go/test/endtoend/cluster/topo_process.go      | 8 +++++---
 go/test/endtoend/vreplication/cluster_test.go | 1 +
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index 7f652aeeffb..959e5760ceb 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -245,6 +245,11 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
         cluster.VtctlProcess.LogDir = cluster.TmpDirectory
     }
 
+    if err = cluster.TopoProcess.OpenServer(*topoFlavor, cluster.VtctlProcess.TopoGlobalRoot); err != nil {
+        log.Error(err.Error())
+        return
+    }
+
     cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(),
         cluster.TopoProcess.Port, cluster.Hostname, cluster.TmpDirectory)
     log.Infof("Starting vtctld server on port: %d", cluster.VtctldProcess.Port)
diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index 1a2409fcdb2..57d300e6fb2 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -64,13 +64,11 @@ type TopoProcess struct {
 // Setup starts a new topo service
 func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
-    root := "/" + topo.Name
     switch topoFlavor {
     case "zk2":
         err = topo.SetupZookeeper(cluster)
     case "consul":
         err = topo.SetupConsul(cluster)
-        root = strings.TrimPrefix(root, "/")
     default:
         // Override any inherited ETCDCTL_API env value to
         // ensure that we use the v3 API and storage.
@@ -80,10 +80,14 @@ func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster)
         return err
     }
 
-    topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), root)
     return err
 }
 
+func (topo *TopoProcess) OpenServer(topoFlavor string, globalRoot string) (err error) {
+    topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), globalRoot)
+    return
+}
+
 // SetupEtcd spawns a new etcd service and initializes it with the defaults.
 // The service is kept running in the background until TearDown() is called.
 func (topo *TopoProcess) SetupEtcd() (err error) {
diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index 6fd63edb200..8018685680b 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -396,6 +396,7 @@ func NewVitessCluster(t *testing.T, opts *clusterOptions) *VitessCluster {
 
     vc.setupVtctld()
     vc.setupVtctl()
+    require.NoError(t, vc.Topo.OpenServer("etcd2", vc.Vtctl.TopoGlobalRoot))
     vc.setupVtctlClient()
     vc.setupVtctldClient()
 

From bdccfb7fd9b2ab7d0bc1dd35ad430440ca65ddbd Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 28 Feb 2024 09:09:10 -0500
Subject: [PATCH 13/25] also last ChangeTabletType

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/cluster_process.go   | 5 -----
 go/test/endtoend/tabletgateway/vtgate_test.go | 2 +-
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index 959e5760ceb..de88915d97c 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -1013,11 +1013,6 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
     return err
 }
 
-func (cluster *LocalProcessCluster) VtctlclientChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
-    _, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("ChangeTabletType", "--", tablet.Alias, tabletType.String())
-    return err
-}
-
 // Teardown brings down the cluster by invoking teardown for individual processes
 func (cluster *LocalProcessCluster) Teardown() {
     PanicHandler(nil)
diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go
index 44de751d30e..de4546d5d0d 100644
--- a/go/test/endtoend/tabletgateway/vtgate_test.go
+++ b/go/test/endtoend/tabletgateway/vtgate_test.go
@@ -124,7 +124,7 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
     require.NoError(t, err)
     // Change it back to RDONLY afterward as the cluster is re-used.
     defer func() {
-        err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY)
+        err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdOnlyTablet.Alias, "rdonly")
         require.NoError(t, err)
     }()
 

From 5be7ffd91709c7a16b1108c0e39ee7b6b2c30ffe Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 28 Feb 2024 12:31:36 -0500
Subject: [PATCH 14/25] re-simplify TopoProcess.Setup

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/topo_process.go | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index 57d300e6fb2..271fb114de1 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -66,21 +66,15 @@ type TopoProcess struct {
 func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
     switch topoFlavor {
     case "zk2":
-        err = topo.SetupZookeeper(cluster)
+        return topo.SetupZookeeper(cluster)
     case "consul":
-        err = topo.SetupConsul(cluster)
+        return topo.SetupConsul(cluster)
     default:
         // Override any inherited ETCDCTL_API env value to
         // ensure that we use the v3 API and storage.
         os.Setenv("ETCDCTL_API", "3")
-        err = topo.SetupEtcd()
+        return topo.SetupEtcd()
     }
-
-    if err != nil {
-        return err
-    }
-
-    return err
 }
 
 func (topo *TopoProcess) OpenServer(topoFlavor string, globalRoot string) (err error) {

From b1375dee1d3da185096f0c3f5a1598027b844cd9 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Thu, 29 Feb 2024 11:01:33 -0500
Subject: [PATCH 15/25] rewrite with vtctldclient command

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/vtctldclient_process.go | 13 +++++++++++++
 go/test/endtoend/keyspace/keyspace_test.go       |  5 ++---
 go/test/endtoend/reparent/utils/utils.go         |  7 ++++---
 go/test/endtoend/tabletmanager/commands_test.go  | 15 +++++++++------
 4 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/go/test/endtoend/cluster/vtctldclient_process.go b/go/test/endtoend/cluster/vtctldclient_process.go
index 24ec6f69fe3..971e929ebfc 100644
--- a/go/test/endtoend/cluster/vtctldclient_process.go
+++ b/go/test/endtoend/cluster/vtctldclient_process.go
@@ -161,6 +161,19 @@ func (vtctldclient *VtctldClientProcess) ChangeTabletType(tablet *Vttablet, tabl
     )
 }
 
+// GetShardReplication returns a mapping of cell to shard replication for the given keyspace and shard.
+func (vtctldclient *VtctldClientProcess) GetShardReplication(keyspace string, shard string, cells ...string) (replication map[string]*topodatapb.ShardReplication, err error) {
+    args := append([]string{"GetShardReplication", keyspace + "/" + shard}, cells...)
+    out, err := vtctldclient.ExecuteCommandWithOutput(args...)
+    if err != nil {
+        return nil, err
+    }
+
+    replication = map[string]*topodatapb.ShardReplication{}
+    err = json2.Unmarshal([]byte(out), &replication)
+    return replication, err
+}
+
 // GetSrvKeyspaces returns a mapping of cell to srv keyspace for the given keyspace.
 func (vtctldclient *VtctldClientProcess) GetSrvKeyspaces(keyspace string, cells ...string) (ksMap map[string]*topodatapb.SrvKeyspace, err error) {
     args := append([]string{"GetSrvKeyspaces", keyspace}, cells...)
diff --git a/go/test/endtoend/keyspace/keyspace_test.go b/go/test/endtoend/keyspace/keyspace_test.go
index 66d9c304cc0..11632b10d72 100644
--- a/go/test/endtoend/keyspace/keyspace_test.go
+++ b/go/test/endtoend/keyspace/keyspace_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package sequence
 
 import (
-    "context"
     "encoding/binary"
     "encoding/json"
     "flag"
@@ -262,7 +261,7 @@ func TestDeleteKeyspace(t *testing.T) {
     // Create the serving/replication entries and check that they exist,
     // so we can later check they're deleted.
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace")
-    _, _ = clusterForKSTest.TopoProcess.Server.GetShardReplication(context.Background(), cell, "test_delete_keyspace", "0")
+    _, _ = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace/0", cell)
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
 
     // Recursive DeleteKeyspace
@@ -275,7 +274,7 @@ func TestDeleteKeyspace(t *testing.T) {
     require.Error(t, err)
     err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetTablet", "zone1-0000000100")
     require.Error(t, err)
-    _, err = clusterForKSTest.TopoProcess.Server.GetShardReplication(context.Background(), cell, "test_delete_keyspace", "0")
+    _, err = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace/0", cell)
     require.Error(t, err)
     err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
     require.Error(t, err)
diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go
index 35c53f276e7..2ff290f2def 100644
--- a/go/test/endtoend/reparent/utils/utils.go
+++ b/go/test/endtoend/reparent/utils/utils.go
@@ -606,12 +606,13 @@ func CheckReplicaStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttab
 
 // CheckReparentFromOutside checks that cluster was reparented from outside
 func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, downPrimary bool, baseTime int64) {
-    result, err := clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell1, KeyspaceName, ShardName)
+    result, err := clusterInstance.VtctldClientProcess.GetShardReplication(KeyspaceName, ShardName, cell1)
     require.Nil(t, err, "error should be Nil")
+    require.NotNil(t, result[cell1], "result should not be nil")
     if !downPrimary {
-        assert.Len(t, result.Nodes, 3)
+        assert.Len(t, result[cell1].Nodes, 3)
     } else {
-        assert.Len(t, result.Nodes, 2)
+        assert.Len(t, result[cell1].Nodes, 2)
     }
 
     // make sure the primary status page says it's the primary
diff --git a/go/test/endtoend/tabletmanager/commands_test.go b/go/test/endtoend/tabletmanager/commands_test.go
index 9ac5e3fb16b..e91773e1ca4 100644
--- a/go/test/endtoend/tabletmanager/commands_test.go
+++ b/go/test/endtoend/tabletmanager/commands_test.go
@@ -186,23 +186,26 @@ func runHookAndAssert(t *testing.T, params []string, expectedStatus int64, expec
 func TestShardReplicationFix(t *testing.T) {
     // make sure the replica is in the replication graph, 2 nodes: 1 primary, 1 replica
     defer cluster.PanicHandler(t)
-    result, err := clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell, keyspaceName, shardName)
+    result, err := clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceShard, cell)
     require.Nil(t, err, "error should be Nil")
-    assert.Len(t, result.Nodes, 3)
+    require.NotNil(t, result[cell], "result should not be Nil")
+    assert.Len(t, result[cell].Nodes, 3)
 
     // Manually add a bogus entry to the replication graph, and check it is removed by ShardReplicationFix
     err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationAdd", keyspaceShard, fmt.Sprintf("%s-9000", cell))
     require.Nil(t, err, "error should be Nil")
 
-    result, err = clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell, keyspaceName, shardName)
+    result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceShard, cell)
     require.Nil(t, err, "error should be Nil")
-    assert.Len(t, result.Nodes, 4)
+    require.NotNil(t, result[cell], "result should not be Nil")
+    assert.Len(t, result[cell].Nodes, 4)
 
     err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationFix", cell, keyspaceShard)
     require.Nil(t, err, "error should be Nil")
-    result, err = clusterInstance.TopoProcess.Server.GetShardReplication(context.Background(), cell, keyspaceName, shardName)
+    result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceShard, cell)
     require.Nil(t, err, "error should be Nil")
-    assert.Len(t, result.Nodes, 3)
+    require.NotNil(t, result[cell], "result should not be Nil")
+    assert.Len(t, result[cell].Nodes, 3)
 }
 
 func TestGetSchema(t *testing.T) {

From a67e1364f6bdcc1ecdd2891ad1b90dc10006e253 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Thu, 29 Feb 2024 12:43:32 -0500
Subject: [PATCH 16/25] compat shim

Signed-off-by: Andrew Mason
---
 go/test/endtoend/reparent/utils/utils.go | 36 +++++++++++++++++++-----
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go
index 2ff290f2def..fb782e69ea4 100644
--- a/go/test/endtoend/reparent/utils/utils.go
+++ b/go/test/endtoend/reparent/utils/utils.go
@@ -18,10 +18,12 @@ package utils
 
 import (
     "context"
+    "encoding/json"
     "fmt"
     "os"
     "os/exec"
     "path"
+    "reflect"
     "strings"
     "testing"
     "time"
@@ -606,13 +608,23 @@ func CheckReplicaStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttab
 
 // CheckReparentFromOutside checks that cluster was reparented from outside
 func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, downPrimary bool, baseTime int64) {
-    result, err := clusterInstance.VtctldClientProcess.GetShardReplication(KeyspaceName, ShardName, cell1)
-    require.Nil(t, err, "error should be Nil")
-    require.NotNil(t, result[cell1], "result should not be nil")
-    if !downPrimary {
-        assert.Len(t, result[cell1].Nodes, 3)
+    if clusterInstance.VtctlMajorVersion > 19 { // TODO: (ajm188) remove else clause after next release
+        result, err := clusterInstance.VtctldClientProcess.GetShardReplication(KeyspaceName, ShardName, cell1)
+        require.Nil(t, err, "error should be Nil")
+        require.NotNil(t, result[cell1], "result should not be nil")
+        if !downPrimary {
+            assert.Len(t, result[cell1].Nodes, 3)
+        } else {
+            assert.Len(t, result[cell1].Nodes, 2)
+        }
     } else {
-        assert.Len(t, result[cell1].Nodes, 2)
+        result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell1, KeyspaceShard)
+        require.Nil(t, err, "error should be Nil")
+        if !downPrimary {
+            assertNodeCount(t, result, int(3))
+        } else {
+            assertNodeCount(t, result, int(2))
+        }
     }
 
     // make sure the primary status page says it's the primary
@@ -621,7 +633,7 @@ func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProces
 
     // make sure the primary health stream says it's the primary too
     // (health check is disabled on these servers, force it first)
-    err = clusterInstance.VtctldClientProcess.ExecuteCommand("RunHealthCheck", tablet.Alias)
+    err := clusterInstance.VtctldClientProcess.ExecuteCommand("RunHealthCheck", tablet.Alias)
     require.NoError(t, err)
 
     shrs, err := clusterInstance.StreamTabletHealth(context.Background(), tablet, 1)
@@ -632,6 +644,16 @@ func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProces
     assert.True(t, streamHealthResponse.PrimaryTermStartTimestamp >= baseTime)
 }
 
+func assertNodeCount(t *testing.T, result string, want int) {
+    resultMap := make(map[string]any)
+    err := json.Unmarshal([]byte(result), &resultMap)
+    require.NoError(t, err)
+
+    nodes := reflect.ValueOf(resultMap["nodes"])
+    got := nodes.Len()
+    assert.Equal(t, want, got)
+}
+
 // WaitForReplicationPosition waits for tablet B to catch up to the replication position of tablet A.
 func WaitForReplicationPosition(t *testing.T, tabletA *cluster.Vttablet, tabletB *cluster.Vttablet) error {
     posA, _ := cluster.GetPrimaryPosition(t, *tabletA, Hostname)

From 3c24c3607a4cdc7fbd70c16019ab1b2a3df3fa25 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Fri, 1 Mar 2024 10:28:14 -0500
Subject: [PATCH 17/25] dumb dumb

Signed-off-by: Andrew Mason
---
 go/test/endtoend/keyspace/keyspace_test.go      | 4 ++--
 go/test/endtoend/tabletmanager/commands_test.go | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/go/test/endtoend/keyspace/keyspace_test.go b/go/test/endtoend/keyspace/keyspace_test.go
index 11632b10d72..96f069a209e 100644
--- a/go/test/endtoend/keyspace/keyspace_test.go
+++ b/go/test/endtoend/keyspace/keyspace_test.go
@@ -261,7 +261,7 @@ func TestDeleteKeyspace(t *testing.T) {
     // Create the serving/replication entries and check that they exist,
     // so we can later check they're deleted.
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace")
-    _, _ = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace/0", cell)
+    _, _ = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace", "0", cell)
     _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
 
     // Recursive DeleteKeyspace
@@ -274,7 +274,7 @@ func TestDeleteKeyspace(t *testing.T) {
     require.Error(t, err)
     err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetTablet", "zone1-0000000100")
     require.Error(t, err)
-    _, err = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace/0", cell)
+    _, err = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace", "0", cell)
     require.Error(t, err)
     err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
     require.Error(t, err)
diff --git a/go/test/endtoend/tabletmanager/commands_test.go b/go/test/endtoend/tabletmanager/commands_test.go
index e91773e1ca4..b18fb5c39d1 100644
--- a/go/test/endtoend/tabletmanager/commands_test.go
+++ b/go/test/endtoend/tabletmanager/commands_test.go
@@ -186,7 +186,7 @@ func runHookAndAssert(t *testing.T, params []string, expectedStatus int64, expec
 func TestShardReplicationFix(t *testing.T) {
     // make sure the replica is in the replication graph, 2 nodes: 1 primary, 1 replica
     defer cluster.PanicHandler(t)
-    result, err := clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceShard, cell)
+    result, err := clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceName, shardName, cell)
     require.Nil(t, err, "error should be Nil")
     require.NotNil(t, result[cell], "result should not be Nil")
     assert.Len(t, result[cell].Nodes, 3)
@@ -195,14 +195,14 @@ func TestShardReplicationFix(t *testing.T) {
     // Manually add a bogus entry to the replication graph, and check it is removed by ShardReplicationFix
     err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationAdd", keyspaceShard, fmt.Sprintf("%s-9000", cell))
     require.Nil(t, err, "error should be Nil")
 
-    result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceShard, cell)
+    result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceName, shardName, cell)
     require.Nil(t, err, "error should be Nil")
     require.NotNil(t, result[cell], "result should not be Nil")
     assert.Len(t, result[cell].Nodes, 4)
 
     err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationFix", cell, keyspaceShard)
     require.Nil(t, err, "error should be Nil")
-    result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceShard, cell)
+    result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceName, shardName, cell)
     require.Nil(t, err, "error should be Nil")
     require.NotNil(t, result[cell], "result should not be Nil")
     assert.Len(t, result[cell].Nodes, 3)

From 9ac33e289b65b07057fc75c4a165fcb03f5c367e Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Fri, 1 Mar 2024 12:16:50 -0500
Subject: [PATCH 18/25] whoops

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/vtctldclient_process.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/go/test/endtoend/cluster/vtctldclient_process.go b/go/test/endtoend/cluster/vtctldclient_process.go
index 971e929ebfc..57cb0cc4f45 100644
--- a/go/test/endtoend/cluster/vtctldclient_process.go
+++ b/go/test/endtoend/cluster/vtctldclient_process.go
@@ -162,16 +162,16 @@ func (vtctldclient *VtctldClientProcess) ChangeTabletType(tablet *Vttablet, tabl
 }
 
 // GetShardReplication returns a mapping of cell to shard replication for the given keyspace and shard.
-func (vtctldclient *VtctldClientProcess) GetShardReplication(keyspace string, shard string, cells ...string) (replication map[string]*topodatapb.ShardReplication, err error) {
+func (vtctldclient *VtctldClientProcess) GetShardReplication(keyspace string, shard string, cells ...string) (map[string]*topodatapb.ShardReplication, error) {
     args := append([]string{"GetShardReplication", keyspace + "/" + shard}, cells...)
     out, err := vtctldclient.ExecuteCommandWithOutput(args...)
     if err != nil {
         return nil, err
     }
 
-    replication = map[string]*topodatapb.ShardReplication{}
-    err = json2.Unmarshal([]byte(out), &replication)
-    return replication, err
+    var resp vtctldatapb.GetShardReplicationResponse
+    err = json2.Unmarshal([]byte(out), &resp)
+    return resp.ShardReplicationByCell, err
 }
 
 // GetSrvKeyspaces returns a mapping of cell to srv keyspace for the given keyspace.

From b3bfa45484141377603b11a4764cb5a03e4e626a Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 28 Feb 2024 15:14:49 -0500
Subject: [PATCH 19/25] tidy imports

Signed-off-by: Andrew Mason
---
 go/test/endtoend/tabletmanager/commands_test.go | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/go/test/endtoend/tabletmanager/commands_test.go b/go/test/endtoend/tabletmanager/commands_test.go
index b18fb5c39d1..ca0b3b15818 100644
--- a/go/test/endtoend/tabletmanager/commands_test.go
+++ b/go/test/endtoend/tabletmanager/commands_test.go
@@ -23,16 +23,14 @@ import (
     "reflect"
     "testing"
 
-    "vitess.io/vitess/go/json2"
-    "vitess.io/vitess/go/test/endtoend/utils"
-
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "github.com/tidwall/gjson"
 
-    "github.com/stretchr/testify/assert"
-
+    "vitess.io/vitess/go/json2"
     "vitess.io/vitess/go/mysql"
     "vitess.io/vitess/go/test/endtoend/cluster"
+    "vitess.io/vitess/go/test/endtoend/utils"
 
     vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
 )

From 9728dea5fb0fc0379e5eaffcb8fe2595e3579cc6 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 28 Feb 2024 15:47:00 -0500
Subject: [PATCH 20/25] thread globalroot through a little more cleanly

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/cluster_process.go   |  5 -----
 go/test/endtoend/cluster/topo_process.go      | 14 ++++++++------
 go/test/endtoend/cluster/vtctl_process.go     | 14 +++++++++++---
 go/test/endtoend/vreplication/cluster_test.go |  1 -
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index de88915d97c..0233b2ac1ea 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -245,11 +245,6 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
         cluster.VtctlProcess.LogDir = cluster.TmpDirectory
     }
 
-    if err = cluster.TopoProcess.OpenServer(*topoFlavor, cluster.VtctlProcess.TopoGlobalRoot); err != nil {
-        log.Error(err.Error())
-        return
-    }
-
     cluster.VtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(),
         cluster.TopoProcess.Port, cluster.Hostname, cluster.TmpDirectory)
     log.Infof("Starting vtctld server on port: %d", cluster.VtctldProcess.Port)
diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index 271fb114de1..dfc09eda1dd 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -66,19 +66,21 @@ type TopoProcess struct {
 func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
     switch topoFlavor {
     case "zk2":
-        return topo.SetupZookeeper(cluster)
+        err = topo.SetupZookeeper(cluster)
     case "consul":
-        return topo.SetupConsul(cluster)
+        err = topo.SetupConsul(cluster)
     default:
         // Override any inherited ETCDCTL_API env value to
         // ensure that we use the v3 API and storage.
         os.Setenv("ETCDCTL_API", "3")
-        return topo.SetupEtcd()
+        err = topo.SetupEtcd()
+    }
+
+    if err != nil {
+        return
     }
-
     topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), TopoGlobalRoot(topoFlavor))
     return
 }
diff --git a/go/test/endtoend/cluster/vtctl_process.go b/go/test/endtoend/cluster/vtctl_process.go
index 9b3d1a5f4e1..c0274800a29 100644
--- a/go/test/endtoend/cluster/vtctl_process.go
+++ b/go/test/endtoend/cluster/vtctl_process.go
@@ -111,6 +111,16 @@ func (vtctl *VtctlProcess) ExecuteCommand(args ...string) (err error) {
     return tmpProcess.Run()
 }
 
+// TopoGlobalRoot returns the global root for the given topo flavor.
+func TopoGlobalRoot(flavor string) string {
+    switch flavor {
+    case "consul":
+        return "global"
+    default:
+        return "/vitess/global"
+    }
+}
+
 // VtctlProcessInstance returns a VtctlProcess handle for vtctl process
 // configured with the given Config.
 // The process must be manually started by calling setup()
@@ -118,7 +128,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
 
     // Default values for etcd2 topo server.
     topoImplementation := "etcd2"
-    topoGlobalRoot := "/vitess/global"
     topoRootPath := "/"
 
     // Checking and resetting the parameters for required topo server.
@@ -127,7 +136,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
         topoImplementation = "zk2"
     case "consul":
         topoImplementation = "consul"
-        topoGlobalRoot = "global"
         // For consul we do not need "/" in the path
         topoRootPath = ""
     }
@@ -142,7 +150,7 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
         Binary:             "vtctl",
         TopoImplementation: topoImplementation,
         TopoGlobalAddress:  fmt.Sprintf("%s:%d", hostname, topoPort),
-        TopoGlobalRoot:     topoGlobalRoot,
+        TopoGlobalRoot:     TopoGlobalRoot(*topoFlavor),
        TopoServerAddress:  fmt.Sprintf("%s:%d", hostname, topoPort),
         TopoRootPath:       topoRootPath,
         VtctlMajorVersion:  version,
diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index 8018685680b..6fd63edb200 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -396,7 +396,6 @@ func NewVitessCluster(t *testing.T, opts *clusterOptions) *VitessCluster {
 
     vc.setupVtctld()
     vc.setupVtctl()
-    require.NoError(t, vc.Topo.OpenServer("etcd2", vc.Vtctl.TopoGlobalRoot))
     vc.setupVtctlClient()
     vc.setupVtctldClient()
 

From 2f8ab0cd90d3dc83653f0162effc236d7770614e Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 28 Feb 2024 15:52:22 -0500
Subject: [PATCH 21/25] move this around

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/topo_process.go  | 10 ++++++++++
 go/test/endtoend/cluster/vtctl_process.go | 10 ----------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index dfc09eda1dd..d5d5c8482a0 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -455,3 +455,13 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string,
     topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
     return topo
 }
+
+// TopoGlobalRoot returns the global root for the given topo flavor.
+func TopoGlobalRoot(flavor string) string {
+    switch flavor {
+    case "consul":
+        return "global"
+    default:
+        return "/vitess/global"
+    }
+}
diff --git a/go/test/endtoend/cluster/vtctl_process.go b/go/test/endtoend/cluster/vtctl_process.go
index c0274800a29..b9d8a5b46ce 100644
--- a/go/test/endtoend/cluster/vtctl_process.go
+++ b/go/test/endtoend/cluster/vtctl_process.go
@@ -111,16 +111,6 @@ func (vtctl *VtctlProcess) ExecuteCommand(args ...string) (err error) {
     return tmpProcess.Run()
 }
 
-// TopoGlobalRoot returns the global root for the given topo flavor.
-func TopoGlobalRoot(flavor string) string {
-    switch flavor {
-    case "consul":
-        return "global"
-    default:
-        return "/vitess/global"
-    }
-}
-
 // VtctlProcessInstance returns a VtctlProcess handle for vtctl process
 // configured with the given Config.
 // The process must be manually started by calling setup()

From eb5acd0a1456b95c924bf49a1946c37520b3d9ff Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sat, 2 Mar 2024 06:42:21 -0500
Subject: [PATCH 22/25] more GetSrvKeyspace

Signed-off-by: Andrew Mason
---
 go/test/endtoend/cluster/cluster_util.go   | 18 +++++------------
 go/test/endtoend/keyspace/keyspace_test.go | 22 +++++++++------------
 2 files changed, 14 insertions(+), 26 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_util.go b/go/test/endtoend/cluster/cluster_util.go
index 9fcefba3892..5d7869a421e 100644
--- a/go/test/endtoend/cluster/cluster_util.go
+++ b/go/test/endtoend/cluster/cluster_util.go
@@ -36,7 +36,6 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
-    "vitess.io/vitess/go/json2"
     "vitess.io/vitess/go/mysql"
     "vitess.io/vitess/go/vt/vtgate/vtgateconn"
 
@@ -360,7 +359,11 @@ func GetPasswordUpdateSQL(localCluster *LocalProcessCluster) string {
 // CheckSrvKeyspace confirms that the cell and keyspace contain the expected
 // shard mappings.
 func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartition map[topodatapb.TabletType][]string, ci LocalProcessCluster) {
-    srvKeyspace := GetSrvKeyspace(t, cell, ksname, ci)
+    srvKeyspaces, err := ci.VtctldClientProcess.GetSrvKeyspaces(ksname, cell)
+    require.NoError(t, err)
+
+    srvKeyspace := srvKeyspaces[cell]
+    require.NotNil(t, srvKeyspace, "srvKeyspace is nil for %s", cell)
 
     currentPartition := map[topodatapb.TabletType][]string{}
 
@@ -374,17 +377,6 @@ func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartitio
     assert.True(t, reflect.DeepEqual(currentPartition, expectedPartition))
 }
 
-// GetSrvKeyspace returns the SrvKeyspace structure for the cell and keyspace.
-func GetSrvKeyspace(t *testing.T, cell string, ksname string, ci LocalProcessCluster) *topodatapb.SrvKeyspace {
-    output, err := ci.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, ksname)
-    require.Nil(t, err)
-    var srvKeyspace topodatapb.SrvKeyspace
-
-    err = json2.Unmarshal([]byte(output), &srvKeyspace)
-    require.Nil(t, err)
-    return &srvKeyspace
-}
-
 // ExecuteOnTablet executes a query on the specified vttablet.
 // It should always be called with a primary tablet for a keyspace/shard.
 func ExecuteOnTablet(t *testing.T, query string, vttablet Vttablet, ks string, expectFail bool) {
diff --git a/go/test/endtoend/keyspace/keyspace_test.go b/go/test/endtoend/keyspace/keyspace_test.go
index 96f069a209e..2a665c66214 100644
--- a/go/test/endtoend/keyspace/keyspace_test.go
+++ b/go/test/endtoend/keyspace/keyspace_test.go
@@ -18,7 +18,6 @@ package sequence
 
 import (
     "encoding/binary"
-    "encoding/json"
     "flag"
     "os"
     "testing"
@@ -211,12 +210,9 @@ func TestGetSrvKeyspacePartitions(t *testing.T) {
 
 func TestShardNames(t *testing.T) {
     defer cluster.PanicHandler(t)
-    output, err := clusterForKSTest.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, keyspaceShardedName)
-    require.Nil(t, err)
-    var srvKeyspace topodatapb.SrvKeyspace
-
-    err = json.Unmarshal([]byte(output), &srvKeyspace)
-    require.Nil(t, err)
+    output, err := clusterForKSTest.VtctldClientProcess.GetSrvKeyspaces(keyspaceShardedName, cell)
+    require.NoError(t, err)
+    require.NotNil(t, output[cell], "no srvkeyspace for cell %s", cell)
 }
 
 func TestGetKeyspace(t *testing.T) {
@@ -430,11 +426,11 @@ func packKeyspaceID(keyspaceID uint64) []byte {
 }
 
 func getSrvKeyspace(t *testing.T, cell string, ksname string) *topodatapb.SrvKeyspace {
-    output, err := clusterForKSTest.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, ksname)
-    require.Nil(t, err)
-    var srvKeyspace topodatapb.SrvKeyspace
+    output, err := clusterForKSTest.VtctldClientProcess.GetSrvKeyspaces(ksname, cell)
+    require.NoError(t, err)
 
-    err = json.Unmarshal([]byte(output), &srvKeyspace)
-    require.Nil(t, err)
-    return &srvKeyspace
+    srvKeyspace := output[cell]
+    require.NotNil(t, srvKeyspace, "no srvkeyspace for cell %s", cell)
+
+    return srvKeyspace
 }

From ffaa7dcb4afcc63e499501ceadc8de9117a4b214 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 3 Mar 2024 06:54:28 -0500
Subject: [PATCH 23/25] update test usage for query execs

Signed-off-by: Andrew Mason
---
 go/test/endtoend/clustertest/vtctld_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/go/test/endtoend/clustertest/vtctld_test.go b/go/test/endtoend/clustertest/vtctld_test.go
index c1b341ccd73..3a27fdbf735 100644
--- a/go/test/endtoend/clustertest/vtctld_test.go
+++ b/go/test/endtoend/clustertest/vtctld_test.go
@@ -164,7 +164,7 @@ func testExecuteAsDba(t *testing.T) {
     }
     for _, tcase := range tcases {
         t.Run(tcase.query, func(t *testing.T) {
-            result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
+            result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
             if tcase.expectErr {
                 assert.Error(t, err)
             } else {
@@ -176,7 +176,7 @@ func testExecuteAsDba(t *testing.T) {
 }
 
 func testExecuteAsApp(t *testing.T) {
-    result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
+    result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
     require.NoError(t, err)
     assert.Equal(t, result, oneTableOutput)
 }

From 10596589afaa90d440ec93f20b0b05bfcaca666a Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 3 Mar 2024 06:58:47 -0500
Subject: [PATCH 24/25] vtctldclient Workflow show output

Signed-off-by: Andrew Mason
---
 .../recovery/pitr/shardedpitr_test.go         | 21 ++++++++++++-------
 .../buffer/reshard/sharded_buffer_test.go     | 18 +++++++++-------
 2 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/go/test/endtoend/recovery/pitr/shardedpitr_test.go b/go/test/endtoend/recovery/pitr/shardedpitr_test.go
index 0aed6573337..03fcf76b07c 100644
--- a/go/test/endtoend/recovery/pitr/shardedpitr_test.go
+++ b/go/test/endtoend/recovery/pitr/shardedpitr_test.go
@@ -22,18 +22,21 @@ import (
 	"os"
 	"os/exec"
 	"path"
+	"strings"
 	"testing"
 	"time"
 
-	"github.com/buger/jsonparser"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"vitess.io/vitess/go/constants/sidecar"
+	"vitess.io/vitess/go/json2"
 	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/test/endtoend/utils"
 	"vitess.io/vitess/go/vt/log"
+
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
 )
 
 var (
@@ -305,7 +308,7 @@ func performResharding(t *testing.T) {
 	shard0Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout)
 	shard1Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout)
 
-	waitForNoWorkflowLag(t, clusterInstance, "ks.reshardWorkflow")
+	waitForNoWorkflowLag(t, clusterInstance, "ks", "reshardWorkflow")
 
 	err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=rdonly", "--target-keyspace", "ks", "--workflow", "reshardWorkflow")
 	require.NoError(t, err)
@@ -573,22 +576,26 @@ func launchRecoveryTablet(t *testing.T, tablet *cluster.Vttablet, binlogServer *
 
 // waitForNoWorkflowLag waits for the VReplication workflow's MaxVReplicationTransactionLag
 // value to be 0.
-func waitForNoWorkflowLag(t *testing.T, vc *cluster.LocalProcessCluster, ksWorkflow string) {
-	lag := int64(0)
+func waitForNoWorkflowLag(t *testing.T, vc *cluster.LocalProcessCluster, ks string, workflow string) {
+	var lag int64
 	timer := time.NewTimer(defaultTimeout)
 	defer timer.Stop()
 	for {
-		output, err := vc.VtctlclientProcess.ExecuteCommandWithOutput("Workflow", "--", ksWorkflow, "show")
+		output, err := vc.VtctldClientProcess.ExecuteCommandWithOutput("Workflow", "--keyspace", ks, "show", "--workflow", workflow)
 		require.NoError(t, err)
-		lag, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
+
+		var resp vtctldatapb.GetWorkflowsResponse
+		err = json2.Unmarshal([]byte(output), &resp)
 		require.NoError(t, err)
+		require.GreaterOrEqual(t, len(resp.Workflows), 1, "response should have at least one workflow")
+		lag = resp.Workflows[0].MaxVReplicationTransactionLag
 		if lag == 0 {
 			return
 		}
 
 		select {
 		case <-timer.C:
 			require.FailNow(t, fmt.Sprintf("workflow %q did not eliminate VReplication lag before the timeout of %s; last seen MaxVReplicationTransactionLag: %d",
-				ksWorkflow, defaultTimeout, lag))
+				strings.Join([]string{ks, workflow}, "."), defaultTimeout, lag))
 		default:
 			time.Sleep(defaultTick)
 		}
diff --git a/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go b/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go
index d58d8901165..5e439cc9fff 100644
--- a/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go
+++ b/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go
@@ -21,16 +21,15 @@ import (
 	"testing"
 	"time"
 
-	"github.com/buger/jsonparser"
-
-	"vitess.io/vitess/go/vt/log"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/json2"
+	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/test/endtoend/tabletgateway/buffer"
+	"vitess.io/vitess/go/vt/log"
 
-	"vitess.io/vitess/go/test/endtoend/cluster"
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
 )
 
 const (
@@ -43,11 +42,16 @@ func waitForLowLag(t *testing.T, clusterInstance *cluster.LocalProcessCluster, k
 	waitDuration := 500 * time.Millisecond
 	duration := maxWait
 	for duration > 0 {
-		output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
+		output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow)
 		require.NoError(t, err)
-		lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
+		var resp vtctldatapb.GetWorkflowsResponse
+		err = json2.Unmarshal([]byte(output), &resp)
 		require.NoError(t, err)
+		require.GreaterOrEqual(t, len(resp.Workflows), 1, "response should have at least one workflow")
+		lagSeconds = resp.Workflows[0].MaxVReplicationTransactionLag
+
+		require.NoError(t, err, output)
 		if lagSeconds <= acceptableLagSeconds {
 			log.Infof("waitForLowLag acceptable for workflow %s, keyspace %s, current lag is %d", workflow, keyspace, lagSeconds)
 			break

From f283cf0c94b59b3b706dd808d9b7e02ad25a4bec Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Sun, 3 Mar 2024 09:54:53 -0500
Subject: [PATCH 25/25] vtctldclient GetTablets

Signed-off-by: Andrew Mason
---
 go/test/endtoend/clustertest/vtctld_test.go        | 12 +++++++-----
 .../endtoend/topoconncache/topo_conn_cache_test.go |  6 +++---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/go/test/endtoend/clustertest/vtctld_test.go b/go/test/endtoend/clustertest/vtctld_test.go
index 3a27fdbf735..bb1bcdf2237 100644
--- a/go/test/endtoend/clustertest/vtctld_test.go
+++ b/go/test/endtoend/clustertest/vtctld_test.go
@@ -84,7 +84,7 @@ func testTopoDataAPI(t *testing.T, url string) {
 
 func testListAllTablets(t *testing.T) {
 	// first w/o any filters, aside from cell
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets", clusterInstance.Cell)
+	result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets", "--cell", clusterInstance.Cell)
 	require.NoError(t, err)
 
 	tablets := getAllTablets()
@@ -102,10 +102,12 @@ func testListAllTablets(t *testing.T) {
 
 	// now filtering with the first keyspace and tablet type of primary, in
 	// addition to the cell
-	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
-		"ListAllTablets", "--", "--keyspace", clusterInstance.Keyspaces[0].Name,
-		"--tablet_type", "primary",
-		clusterInstance.Cell)
+	result, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
+		"GetTablets",
+		"--keyspace", clusterInstance.Keyspaces[0].Name,
+		"--tablet-type", "primary",
+		"--cell", clusterInstance.Cell,
+	)
 	require.NoError(t, err)
 
 	// We should only return a single primary tablet per shard in the first keyspace
diff --git a/go/test/endtoend/topoconncache/topo_conn_cache_test.go b/go/test/endtoend/topoconncache/topo_conn_cache_test.go
index 4ffcc309e29..082ecc5717f 100644
--- a/go/test/endtoend/topoconncache/topo_conn_cache_test.go
+++ b/go/test/endtoend/topoconncache/topo_conn_cache_test.go
@@ -51,7 +51,7 @@ func TestVtctldListAllTablets(t *testing.T) {
 
 func testListAllTablets(t *testing.T) {
 	// first w/o any filters, aside from cell
-	result, err :=
clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets") + result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets") require.NoError(t, err) tablets := getAllTablets() @@ -84,7 +84,7 @@ func deleteCell(t *testing.T) { clusterInstance.Keyspaces[0].Shards = []cluster.Shard{shard1, shard2} // Now list all tablets - result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets") + result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets") require.NoError(t, err) tablets := getAllTablets() @@ -184,7 +184,7 @@ func addCellback(t *testing.T) { shard2.Vttablets = append(shard2.Vttablets, shard2Replica) shard2.Vttablets = append(shard2.Vttablets, shard1Rdonly) - result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets") + result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets") require.NoError(t, err) tablets := getAllTablets()
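
Illustrative sketch (an assumption of the editor, not part of the patch series above): one way an end-to-end test could combine the helpers this series introduces, using LocalProcessCluster.InitTablet to write a tablet record straight into the topo and the TopoProcess.Server handle to read it back, rather than shelling out to the deprecated vtctlclient commands. The helper name initTabletAndVerify and the surrounding wiring are hypothetical; GetTablet on *topo.Server and the Vttablet fields are existing Vitess APIs.

package cluster_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"vitess.io/vitess/go/test/endtoend/cluster"

	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// initTabletAndVerify is a hypothetical helper: it writes a tablet record with
// the LocalProcessCluster.InitTablet method added in this series, then reads
// the record back through the *topo.Server handle kept on TopoProcess.Server.
func initTabletAndVerify(t *testing.T, lc *cluster.LocalProcessCluster, tablet *cluster.Vttablet, keyspace, shard string) {
	t.Helper()

	// Create or update the tablet record directly in the topo server,
	// without shelling out to the deprecated vtctlclient InitTablet command.
	err := lc.InitTablet(tablet, keyspace, shard)
	require.NoError(t, err)

	// Read the record back over the already-open topo connection and
	// sanity-check that it landed in the right keyspace/shard.
	alias := &topodatapb.TabletAlias{Cell: tablet.Cell, Uid: uint32(tablet.TabletUID)}
	ti, err := lc.TopoProcess.Server.GetTablet(context.Background(), alias)
	require.NoError(t, err)
	require.Equal(t, keyspace, ti.Keyspace)
	require.Equal(t, shard, ti.Shard)
}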