Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic Transactions handling with Online DDL #16585

Merged
merged 14 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
)

// Start keyspace
Expand Down
102 changes: 91 additions & 11 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,25 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
)

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
disruptionName string
commitDelayTime string
disruption func() error
disruption func(t *testing.T) error
}{
{
disruptionName: "No Disruption",
commitDelayTime: "1",
disruption: func() error {
disruption: func(t *testing.T) error {
return nil
},
},
Expand All @@ -68,6 +71,11 @@ func TestDisruptions(t *testing.T) {
commitDelayTime: "5",
disruption: vttabletRestartShard3,
},
{
disruptionName: "OnlineDDL",
commitDelayTime: "20",
disruption: onlineDDL,
},
{
disruptionName: "EmergencyReparentShard",
commitDelayTime: "5",
Expand Down Expand Up @@ -119,7 +127,7 @@ func TestDisruptions(t *testing.T) {
}()
}
// Run the disruption.
err := tt.disruption()
err := tt.disruption(t)
require.NoError(t, err)
// Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error.
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
Expand All @@ -145,6 +153,7 @@ func threadToWrite(t *testing.T, ctx context.Context, id int) {
continue
}
_, _ = utils.ExecAllowError(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, %d)", id, rand.Intn(10000)))
conn.Close()
}
}

Expand All @@ -170,11 +179,13 @@ func waitForResults(t *testing.T, query string, resultExpected string, waitTime
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err == nil {
res := utils.Exec(t, conn, query)
res, _ := utils.ExecAllowError(t, conn, query)
conn.Close()
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
if res != nil {
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
}
}
}
time.Sleep(100 * time.Millisecond)
Expand All @@ -187,29 +198,29 @@ Cluster Level Disruptions for the fuzzer
*/

// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func prsShard3() error {
func prsShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias)
}

// ersShard3 runs a ERS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func ersShard3() error {
func ersShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias)
return err
}

// vttabletRestartShard3 restarts the first vttablet of the third shard.
func vttabletRestartShard3() error {
func vttabletRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
tablet := shard.Vttablets[0]
return tablet.RestartOnlyTablet()
}

// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard.
func mysqlRestartShard3() error {
func mysqlRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
vttablets := shard.Vttablets
tablet := vttablets[0]
Expand All @@ -227,3 +238,72 @@ func mysqlRestartShard3() error {
}
return syscallutil.Kill(pid, syscall.SIGKILL)
}

var randomDDL = []string{
"alter table twopc_t1 add column extra_col1 varchar(20)",
"alter table twopc_t1 add column extra_col2 varchar(20)",
"alter table twopc_t1 add column extra_col3 varchar(20)",
"alter table twopc_t1 add column extra_col4 varchar(20)",
}

var count = 0

// onlineDDL runs a DDL statement.
func onlineDDL(t *testing.T) error {
output, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(keyspaceName, randomDDL[count], cluster.ApplySchemaParams{
DDLStrategy: "vitess --force-cut-over-after=1ms",
})
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
}
query := fmt.Sprintf("show vitess_migrations like '%s'", uuid)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for {
countMatchedShards := 0
conn, err := mysql.Connect(ctx, vtParams)
if err != nil {
continue
}
r := utils.Exec(t, conn, query)
for _, row := range r.Named().Rows {
shardName := row["shard"].ToString()
if !shardNames[shardName] {
// irrelevant shard
continue
}
lastKnownStatus = row["migration_status"].ToString()
if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
return
}
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
conn.Close()
if err != nil {
fmt.Printf("Error in cleanup deletion - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
}
Expand Down
21 changes: 21 additions & 0 deletions go/vt/sqlparser/ast_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2818,3 +2818,24 @@ func (lock Lock) GetHighestOrderLock(newLock Lock) Lock {
func Clone[K SQLNode](x K) K {
return CloneSQLNode(x).(K)
}

// ExtractAllTables returns all the table names in the SQLNode
func ExtractAllTables(node SQLNode) []string {
var tables []string
tableMap := make(map[string]any)
_ = Walk(func(node SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *AliasedTableExpr:
if tblName, ok := node.Expr.(TableName); ok {
name := String(tblName)
if _, exists := tableMap[name]; !exists {
tableMap[name] = nil
tables = append(tables, name)
}
return false, nil
}
}
return true, nil
}, node)
return tables
}
47 changes: 47 additions & 0 deletions go/vt/sqlparser/ast_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,50 @@ func TestColumns_Indexes(t *testing.T) {
})
}
}

// TestExtractTables verifies the functionality of extracting all the tables from the SQLNode.
func TestExtractTables(t *testing.T) {
tcases := []struct {
sql string
expected []string
}{{
sql: "select 1 from a",
expected: []string{"a"},
}, {
sql: "select 1 from a, b",
expected: []string{"a", "b"},
}, {
sql: "select 1 from a join b on a.id = b.id",
expected: []string{"a", "b"},
}, {
sql: "select 1 from a join b on a.id = b.id join c on b.id = c.id",
expected: []string{"a", "b", "c"},
}, {
sql: "select 1 from a join (select id from b) as c on a.id = c.id",
expected: []string{"a", "b"},
}, {
sql: "(select 1 from a) union (select 1 from b)",
expected: []string{"a", "b"},
}, {
sql: "select 1 from a where exists (select 1 from (select id from c) b where a.id = b.id)",
expected: []string{"a", "c"},
}, {
sql: "select 1 from k.a join k.b on a.id = b.id",
expected: []string{"k.a", "k.b"},
}, {
sql: "select 1 from k.a join l.a on k.a.id = l.a.id",
expected: []string{"k.a", "l.a"},
}, {
sql: "select 1 from a join (select id from a) as c on a.id = c.id",
expected: []string{"a"},
}}
parser := NewTestParser()
for _, tcase := range tcases {
t.Run(tcase.sql, func(t *testing.T) {
stmt, err := parser.Parse(tcase.sql)
require.NoError(t, err)
tables := ExtractAllTables(stmt)
require.Equal(t, tcase.expected, tables)
})
}
}
13 changes: 12 additions & 1 deletion go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type Executor struct {
ts *topo.Server
lagThrottler *throttle.Throttler
toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool)
IsPreparedPoolEmpty func(tableName string) bool
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
requestGCChecksFunc func()
tabletAlias *topodatapb.TabletAlias

Expand Down Expand Up @@ -238,6 +239,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top
tabletTypeFunc func() topodatapb.TabletType,
toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool),
requestGCChecksFunc func(),
isPreparedPoolEmpty func(tableName string) bool,
) *Executor {
// sanitize flags
if maxConcurrentOnlineDDLs < 1 {
Expand All @@ -255,6 +257,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top
ts: ts,
lagThrottler: lagThrottler,
toggleBufferTableFunc: toggleBufferTableFunc,
IsPreparedPoolEmpty: isPreparedPoolEmpty,
requestGCChecksFunc: requestGCChecksFunc,
ticks: timer.NewTimer(migrationCheckInterval),
// Gracefully return an error if any caller tries to execute
Expand Down Expand Up @@ -870,7 +873,10 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
threadId := row.AsInt64("trx_mysql_thread_id", 0)
log.Infof("killTableLockHoldersAndAccessors: killing connection %v with transaction on table", threadId)
killConnection := fmt.Sprintf("KILL %d", threadId)
_, _ = conn.Conn.ExecuteFetch(killConnection, 1, false)
_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
log.Errorf("Unable to kill the connection %d: %v", threadId, err)
}
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -1102,6 +1108,11 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
time.Sleep(100 * time.Millisecond)

if shouldForceCutOver {
// We should only proceed with forceful cut over if there is no pending atomic transaction for the table.
// This will help in keeping the atomicity guarantee of a prepared transaction.
if !e.IsPreparedPoolEmpty(onlineDDL.Table) {
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot force cut-over on non-empty prepared pool for table: %s", onlineDDL.Table)
}
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
return err
}
Expand Down
Loading
Loading