Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[e2e] vtctld init tablet and some output-based commands #15297

Merged
merged 25 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func firstBackupTest(t *testing.T, tabletType string) {
mysqlctl.CompressionEngineName = "lz4"
defer func() { mysqlctl.CompressionEngineName = "pgzip" }()
// now bring up the other replica, letting it restore from backup.
err = localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shardName)
err = localCluster.InitTablet(replica2, keyspaceName, shardName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK InitTablet is deprecated now for some time and thus not implemented in vtctldclient side. Can we take this opportunity to replace its usage? If not, does that mean InitTablet still has a valid use case and should be migrated to vtctldclient as well? Otherwise we're not actually getting rid of the usage, we're just moving it. It does have the nice benefit, however, of only having one place to change later on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as i understand it, InitTablet as a user command is deprecated and (deliberately) not implemented in the new client, but it's still relevant for creating tablet records without actually starting a tablet process which is useful only for tests I wonder if we could use CreateTablet instead like tabletmanager does?

require.Nil(t, err)
restore(t, replica2, "replica", "SERVING")
// Replica2 takes time to serve. Sleeping for 5 sec.
Expand Down Expand Up @@ -266,7 +266,7 @@ func removeBackups(t *testing.T) {
func initTablets(t *testing.T, startTablet bool, initShardPrimary bool) {
// Initialize tablets
for _, tablet := range []cluster.Vttablet{*primary, *replica1} {
err := localCluster.VtctlclientProcess.InitTablet(&tablet, cell, keyspaceName, hostname, shardName)
err := localCluster.InitTablet(&tablet, keyspaceName, shardName)
require.Nil(t, err)

if startTablet {
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
replica2 = shard.Vttablets[2]
replica3 = shard.Vttablets[3]

if err := localCluster.VtctlclientProcess.InitTablet(primary, cell, keyspaceName, hostname, shard.Name); err != nil {
if err := localCluster.InitTablet(primary, keyspaceName, shard.Name); err != nil {
return 1, err
}
if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil {
if err := localCluster.InitTablet(replica1, keyspaceName, shard.Name); err != nil {
return 1, err
}
if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil {
if err := localCluster.InitTablet(replica2, keyspaceName, shard.Name); err != nil {
return 1, err
}
vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory)
Expand Down Expand Up @@ -449,7 +449,7 @@ func primaryBackup(t *testing.T) {
}()
verifyInitialReplication(t)

output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
require.Error(t, err)
assert.Contains(t, output, "type PRIMARY cannot take backup. if you really need to do this, rerun the backup command with --allow_primary")

Expand Down Expand Up @@ -746,7 +746,7 @@ func restartPrimaryAndReplica(t *testing.T) {
proc.Wait()
}
for _, tablet := range []*cluster.Vttablet{primary, replica1} {
err := localCluster.VtctlclientProcess.InitTablet(tablet, cell, keyspaceName, hostname, shardName)
err := localCluster.InitTablet(tablet, keyspaceName, shardName)
require.Nil(t, err)
err = tablet.VttabletProcess.Setup()
require.Nil(t, err)
Expand Down
64 changes: 41 additions & 23 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
Expand Down Expand Up @@ -319,6 +318,44 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
return nil
}

// InitTablet initializes a tablet record in the topo server. It does not start the tablet process.
func (cluster *LocalProcessCluster) InitTablet(tablet *Vttablet, keyspace string, shard string) error {
tabletpb := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: tablet.Cell,
Uid: uint32(tablet.TabletUID),
},
Hostname: cluster.Hostname,
Type: topodatapb.TabletType_REPLICA,
PortMap: map[string]int32{
"vt": int32(tablet.HTTPPort),
},
Keyspace: keyspace,
Shard: shard,
}

switch tablet.Type {
case "rdonly":
tabletpb.Type = topodatapb.TabletType_RDONLY
case "primary":
tabletpb.Type = topodatapb.TabletType_PRIMARY
}

if tablet.MySQLPort > 0 {
tabletpb.PortMap["mysql"] = int32(tablet.MySQLPort)
}

if tablet.GrpcPort > 0 {
tabletpb.PortMap["grpc"] = int32(tablet.GrpcPort)
}

allowPrimaryOverride := false
createShardAndKeyspace := true
allowUpdate := true

return cluster.TopoProcess.Server.InitTablet(context.Background(), tabletpb, allowPrimaryOverride, createShardAndKeyspace, allowUpdate)
}

// StartKeyspace starts required number of shard and the corresponding tablets
// keyspace : struct containing keyspace name, Sqlschema to apply, VSchema to apply
// shardName : list of shard names
Expand Down Expand Up @@ -856,7 +893,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet *
return nil, err
}

tablet, err := cluster.VtctlclientGetTablet(vttablet)
tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -899,7 +936,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin
// returns the responses. It returns an error if the stream ends with fewer than
// `count` responses.
func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vttablet *Vttablet, count int) (responses []*querypb.StreamHealthResponse, err error) {
tablet, err := cluster.VtctlclientGetTablet(vttablet)
tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -934,7 +971,7 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta
// StreamTabletHealthUntil invokes a HealthStream on a local cluster Vttablet and
// returns the responses. It waits until a certain condition is met. The amount of time to wait is an input that it takes.
func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context, vttablet *Vttablet, timeout time.Duration, condition func(shr *querypb.StreamHealthResponse) bool) error {
tablet, err := cluster.VtctlclientGetTablet(vttablet)
tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
if err != nil {
return err
}
Expand Down Expand Up @@ -971,25 +1008,6 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
return err
}

func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) {
result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias)
if err != nil {
return nil, err
}

var ti topodatapb.Tablet
if err := json2.Unmarshal([]byte(result), &ti); err != nil {
return nil, err
}

return &ti, nil
}

func (cluster *LocalProcessCluster) VtctlclientChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
_, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("ChangeTabletType", "--", tablet.Alias, tabletType.String())
return err
}

// Teardown brings down the cluster by invoking teardown for individual processes
func (cluster *LocalProcessCluster) Teardown() {
PanicHandler(nil)
Expand Down
18 changes: 5 additions & 13 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

Expand Down Expand Up @@ -360,7 +359,11 @@ func GetPasswordUpdateSQL(localCluster *LocalProcessCluster) string {
// CheckSrvKeyspace confirms that the cell and keyspace contain the expected
// shard mappings.
func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartition map[topodatapb.TabletType][]string, ci LocalProcessCluster) {
srvKeyspace := GetSrvKeyspace(t, cell, ksname, ci)
srvKeyspaces, err := ci.VtctldClientProcess.GetSrvKeyspaces(ksname, cell)
require.NoError(t, err)

srvKeyspace := srvKeyspaces[cell]
require.NotNil(t, srvKeyspace, "srvKeyspace is nil for %s", cell)

currentPartition := map[topodatapb.TabletType][]string{}

Expand All @@ -374,17 +377,6 @@ func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartitio
assert.True(t, reflect.DeepEqual(currentPartition, expectedPartition))
}

// GetSrvKeyspace returns the SrvKeyspace structure for the cell and keyspace.
func GetSrvKeyspace(t *testing.T, cell string, ksname string, ci LocalProcessCluster) *topodatapb.SrvKeyspace {
output, err := ci.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, ksname)
require.Nil(t, err)
var srvKeyspace topodatapb.SrvKeyspace

err = json2.Unmarshal([]byte(output), &srvKeyspace)
require.Nil(t, err)
return &srvKeyspace
}

// ExecuteOnTablet executes a query on the specified vttablet.
// It should always be called with a primary tablet for a keyspace/shard.
func ExecuteOnTablet(t *testing.T, query string, vttablet Vttablet, ks string, expectFail bool) {
Expand Down
34 changes: 31 additions & 3 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (

"vitess.io/vitess/go/vt/log"
vtopo "vitess.io/vitess/go/vt/topo"

// Register topo server implementations
_ "vitess.io/vitess/go/vt/topo/consultopo"
_ "vitess.io/vitess/go/vt/topo/etcd2topo"
_ "vitess.io/vitess/go/vt/topo/zk2topo"
)

// TopoProcess is a generic handle for a running Topo service .
Expand All @@ -51,6 +56,7 @@ type TopoProcess struct {
PeerURL string
ZKPorts string
Client interface{}
Server *vtopo.Server

proc *exec.Cmd
exit chan error
Expand All @@ -60,15 +66,22 @@ type TopoProcess struct {
func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
switch topoFlavor {
case "zk2":
return topo.SetupZookeeper(cluster)
err = topo.SetupZookeeper(cluster)
case "consul":
return topo.SetupConsul(cluster)
err = topo.SetupConsul(cluster)
default:
// Override any inherited ETCDCTL_API env value to
// ensure that we use the v3 API and storage.
os.Setenv("ETCDCTL_API", "3")
return topo.SetupEtcd()
err = topo.SetupEtcd()
}

if err != nil {
return
}

topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), TopoGlobalRoot(topoFlavor))
return
}

// SetupEtcd spawns a new etcd service and initializes it with the defaults.
Expand Down Expand Up @@ -289,6 +302,11 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {

// TearDown shutdowns the running topo service.
func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {
if topo.Server != nil {
topo.Server.Close()
topo.Server = nil
}

if topo.Client != nil {
switch cli := topo.Client.(type) {
case *clientv3.Client:
Expand Down Expand Up @@ -437,3 +455,13 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string,
topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
return topo
}

// TopoGlobalRoot returns the global root for the given topo flavor.
func TopoGlobalRoot(flavor string) string {
switch flavor {
case "consul":
return "global"
default:
return "/vitess/global"
}
}
4 changes: 1 addition & 3 deletions go/test/endtoend/cluster/vtctl_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {

// Default values for etcd2 topo server.
topoImplementation := "etcd2"
topoGlobalRoot := "/vitess/global"
topoRootPath := "/"

// Checking and resetting the parameters for required topo server.
Expand All @@ -127,7 +126,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
topoImplementation = "zk2"
case "consul":
topoImplementation = "consul"
topoGlobalRoot = "global"
// For consul we do not need "/" in the path
topoRootPath = ""
}
Expand All @@ -142,7 +140,7 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
Binary: "vtctl",
TopoImplementation: topoImplementation,
TopoGlobalAddress: fmt.Sprintf("%s:%d", hostname, topoPort),
TopoGlobalRoot: topoGlobalRoot,
TopoGlobalRoot: TopoGlobalRoot(*topoFlavor),
TopoServerAddress: fmt.Sprintf("%s:%d", hostname, topoPort),
TopoRootPath: topoRootPath,
VtctlMajorVersion: version,
Expand Down
22 changes: 22 additions & 0 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,28 @@ func (vtctldclient *VtctldClientProcess) ApplyVSchema(keyspace string, json stri
)
}

// ChangeTabletType changes the type of the given tablet.
func (vtctldclient *VtctldClientProcess) ChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
return vtctldclient.ExecuteCommand(
"ChangeTabletType",
tablet.Alias,
tabletType.String(),
)
}

// GetShardReplication returns a mapping of cell to shard replication for the given keyspace and shard.
func (vtctldclient *VtctldClientProcess) GetShardReplication(keyspace string, shard string, cells ...string) (map[string]*topodatapb.ShardReplication, error) {
args := append([]string{"GetShardReplication", keyspace + "/" + shard}, cells...)
out, err := vtctldclient.ExecuteCommandWithOutput(args...)
if err != nil {
return nil, err
}

var resp vtctldatapb.GetShardReplicationResponse
err = json2.Unmarshal([]byte(out), &resp)
return resp.ShardReplicationByCell, err
}

// GetSrvKeyspaces returns a mapping of cell to srv keyspace for the given keyspace.
func (vtctldclient *VtctldClientProcess) GetSrvKeyspaces(keyspace string, cells ...string) (ksMap map[string]*topodatapb.SrvKeyspace, err error) {
args := append([]string{"GetSrvKeyspaces", keyspace}, cells...)
Expand Down
16 changes: 9 additions & 7 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func testTopoDataAPI(t *testing.T, url string) {

func testListAllTablets(t *testing.T) {
// first w/o any filters, aside from cell
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets", clusterInstance.Cell)
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets", "--cell", clusterInstance.Cell)
require.NoError(t, err)

tablets := getAllTablets()
Expand All @@ -102,10 +102,12 @@ func testListAllTablets(t *testing.T) {

// now filtering with the first keyspace and tablet type of primary, in
// addition to the cell
result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
"ListAllTablets", "--", "--keyspace", clusterInstance.Keyspaces[0].Name,
"--tablet_type", "primary",
clusterInstance.Cell)
result, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"GetTablets",
"--keyspace", clusterInstance.Keyspaces[0].Name,
"--tablet-type", "primary",
"--cell", clusterInstance.Cell,
)
require.NoError(t, err)

// We should only return a single primary tablet per shard in the first keyspace
Expand Down Expand Up @@ -164,7 +166,7 @@ func testExecuteAsDba(t *testing.T) {
}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
if tcase.expectErr {
assert.Error(t, err)
} else {
Expand All @@ -176,7 +178,7 @@ func testExecuteAsDba(t *testing.T) {
}

func testExecuteAsApp(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
}
Expand Down
Loading
Loading