Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making Reshard work smoothly with Atomic Transactions #16844

Merged
merged 10 commits into from
Oct 3, 2024
236 changes: 122 additions & 114 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,103 +378,9 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
for _, shardName := range shardNames {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
var mysqlctlProcessList []*exec.Cmd
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
tablet := &Vttablet{
TabletUID: tabletUID,
Type: "replica",
HTTPPort: cluster.GetAndReservePort(),
GrpcPort: cluster.GetAndReservePort(),
MySQLPort: cluster.GetAndReservePort(),
Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID),
}
if i == 0 { // Make the first one as primary
tablet.Type = "primary"
} else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed
tablet.Type = "rdonly"
}
// Start Mysqlctl process
log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)
mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT)
if err != nil {
return err
}
switch tablet.Type {
case "primary":
mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.MysqlctlProcess = *mysqlctlProcess
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
return err
}
mysqlctlProcessList = append(mysqlctlProcessList, proc)

// start vttablet process
tablet.VttabletProcess = VttabletProcessInstance(
tablet.HTTPPort,
tablet.GrpcPort,
tablet.TabletUID,
cluster.Cell,
shardName,
keyspace.Name,
cluster.VtctldProcess.Port,
tablet.Type,
cluster.TopoProcess.Port,
cluster.Hostname,
cluster.TmpDirectory,
cluster.VtTabletExtraArgs,
cluster.DefaultCharset)
switch tablet.Type {
case "primary":
tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.Alias = tablet.VttabletProcess.TabletPath
if cluster.ReusingVTDATAROOT {
tablet.VttabletProcess.ServingStatus = "SERVING"
}
shard.Vttablets = append(shard.Vttablets, tablet)
// Apply customizations
for _, customizer := range customizers {
if f, ok := customizer.(func(*VttabletProcess)); ok {
f(tablet.VttabletProcess)
} else {
return fmt.Errorf("type mismatch on customizer: %T", customizer)
}
}
}

// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err = proc.Wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
return err
}
}
for _, tablet := range shard.Vttablets {
log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)

if err = tablet.VttabletProcess.Setup(); err != nil {
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
return
}
}

// Make first tablet as primary
if err = cluster.VtctldClientProcess.InitializeShard(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspace.Name, shardName, err)
return
shard, err := cluster.AddShard(keyspace.Name, shardName, totalTabletsRequired, rdonly, customizers)
if err != nil {
return err
}
keyspace.Shards = append(keyspace.Shards, *shard)
}
Expand All @@ -488,33 +394,135 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
}
if !existingKeyspace {
cluster.Keyspaces = append(cluster.Keyspaces, keyspace)
}

// Apply Schema SQL
if keyspace.SchemaSQL != "" {
if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
return
// Apply Schema SQL
if keyspace.SchemaSQL != "" {
if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
return
}
}

// Apply VSchema
if keyspace.VSchema != "" {
if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
return
}
}

log.Infof("Done creating keyspace: %v ", keyspace.Name)

err = cluster.StartVTOrc(keyspace.Name)
if err != nil {
log.Errorf("Error starting VTOrc - %v", err)
return err
}
}

// Apply VSchema
if keyspace.VSchema != "" {
if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
return
return
}

func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName string, totalTabletsRequired int, rdonly bool, customizers []any) (*Shard, error) {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
var mysqlctlProcessList []*exec.Cmd
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
tablet := &Vttablet{
TabletUID: tabletUID,
Type: "replica",
HTTPPort: cluster.GetAndReservePort(),
GrpcPort: cluster.GetAndReservePort(),
MySQLPort: cluster.GetAndReservePort(),
Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID),
}
if i == 0 { // Make the first one as primary
tablet.Type = "primary"
} else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed
tablet.Type = "rdonly"
}
// Start Mysqlctl process
log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)
mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT)
if err != nil {
return nil, err
}
switch tablet.Type {
case "primary":
mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.MysqlctlProcess = *mysqlctlProcess
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
return nil, err
}
mysqlctlProcessList = append(mysqlctlProcessList, proc)

// start vttablet process
tablet.VttabletProcess = VttabletProcessInstance(
tablet.HTTPPort,
tablet.GrpcPort,
tablet.TabletUID,
cluster.Cell,
shardName,
keyspaceName,
cluster.VtctldProcess.Port,
tablet.Type,
cluster.TopoProcess.Port,
cluster.Hostname,
cluster.TmpDirectory,
cluster.VtTabletExtraArgs,
cluster.DefaultCharset)
switch tablet.Type {
case "primary":
tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.Alias = tablet.VttabletProcess.TabletPath
if cluster.ReusingVTDATAROOT {
tablet.VttabletProcess.ServingStatus = "SERVING"
}
shard.Vttablets = append(shard.Vttablets, tablet)
// Apply customizations
for _, customizer := range customizers {
if f, ok := customizer.(func(*VttabletProcess)); ok {
f(tablet.VttabletProcess)
} else {
return nil, fmt.Errorf("type mismatch on customizer: %T", customizer)
}
}
}

log.Infof("Done creating keyspace: %v ", keyspace.Name)
// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err := proc.Wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
return nil, err
}
}
for _, tablet := range shard.Vttablets {
log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)

err = cluster.StartVTOrc(keyspace.Name)
if err != nil {
log.Errorf("Error starting VTOrc - %v", err)
return err
if err := tablet.VttabletProcess.Setup(); err != nil {
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
return nil, err
}
}

return
// Make first tablet as primary
if err := cluster.VtctldClientProcess.InitializeShard(keyspaceName, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspaceName, shardName, err)
return nil, err
}
return shard, nil
}

// StartUnshardedKeyspaceLegacy starts unshared keyspace with shard name as "0"
Expand Down
102 changes: 102 additions & 0 deletions go/test/endtoend/cluster/reshard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"fmt"
"slices"
"strings"
"testing"
"time"
)

// ReshardWorkflow is used to store the information needed to run
// Reshard commands.
type ReshardWorkflow struct {
t *testing.T
clusterInstance *LocalProcessCluster
workflowName string
targetKs string
sourceShards string
targetShards string
}

// NewReshard creates a new ReshardWorkflow.
func NewReshard(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, targetShards, srcShards string) *ReshardWorkflow {
return &ReshardWorkflow{
t: t,
clusterInstance: clusterInstance,
workflowName: workflowName,
targetKs: targetKs,
sourceShards: srcShards,
targetShards: targetShards,
}
}

func (rw *ReshardWorkflow) Create() (string, error) {
args := []string{"Create"}
if rw.sourceShards != "" {
args = append(args, "--source-shards="+rw.sourceShards)
}
if rw.targetShards != "" {
args = append(args, "--target-shards="+rw.targetShards)
}

return rw.exec(args...)
}

func (rw *ReshardWorkflow) exec(args ...string) (string, error) {
args2 := []string{"Reshard", "--workflow=" + rw.workflowName, "--target-keyspace=" + rw.targetKs}
args2 = append(args2, args...)
return rw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...)
}

func (rw *ReshardWorkflow) SwitchReadsAndWrites() (string, error) {
return rw.exec("SwitchTraffic")
}

func (rw *ReshardWorkflow) ReverseReadsAndWrites() (string, error) {
return rw.exec("ReverseTraffic")
}

func (rw *ReshardWorkflow) Cancel() (string, error) {
return rw.exec("Cancel")
}

func (rw *ReshardWorkflow) Complete() (string, error) {
return rw.exec("Complete")
}

func (rw *ReshardWorkflow) Show() (string, error) {
return rw.exec("Show")
}

func (rw *ReshardWorkflow) WaitForVreplCatchup(timeToWait time.Duration) {
targetShards := strings.Split(rw.targetShards, ",")
for _, ks := range rw.clusterInstance.Keyspaces {
if ks.Name != rw.targetKs {
continue
}
for _, shard := range ks.Shards {
if !slices.Contains(targetShards, shard.Name) {
continue
}
vttablet := shard.PrimaryTablet().VttabletProcess
vttablet.WaitForVReplicationToCatchup(rw.t, rw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", timeToWait)
}
}
}
Loading
Loading