Skip to content

Commit

Permalink
Support settings changes with atomic transactions (#16974)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Oct 18, 2024
1 parent a0f8cde commit e881b9f
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 78 deletions.
6 changes: 3 additions & 3 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func TestMain(m *testing.M) {
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
"--onterm_timeout", "1s",
"--onclose_timeout", "1s",
)

// Start keyspace
Expand Down Expand Up @@ -123,8 +125,6 @@ func start(t *testing.T) (*mysql.Conn, func()) {

func cleanup(t *testing.T) {
cluster.PanicHandler(t)

utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert")
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update")
utils.ClearOutTable(t, vtParams, "twopc_t1")
utils.ClearOutTable(t, vtParams, "twopc_settings")
}
15 changes: 3 additions & 12 deletions go/test/endtoend/transaction/twopc/stress/schema.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
create table twopc_fuzzer_update (
create table twopc_t1 (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;

create table twopc_fuzzer_insert (
id bigint,
updateSet bigint,
threadId bigint,
col bigint auto_increment,
key(col),
primary key (id, col)
) Engine=InnoDB;

create table twopc_t1 (
create table twopc_settings (
id bigint,
col bigint,
col varchar(50),
primary key (id)
) Engine=InnoDB;
161 changes: 139 additions & 22 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,109 @@ import (
"vitess.io/vitess/go/vt/schema"
)

var (
// idVals are the primary key values to use while creating insert queries that ensures all the three shards get an insert.
idVals = [3]int{
4, // 4 maps to 0x20 and ends up in the first shard (-40)
6, // 6 maps to 0x60 and ends up in the second shard (40-80)
9, // 9 maps to 0x90 and ends up in the third shard (80-)
}
)

func TestSettings(t *testing.T) {
testcases := []struct {
name string
commitDelayTime string
queries []string
verifyFunc func(t *testing.T, vtParams *mysql.ConnParams)
}{
{
name: "No settings changes",
commitDelayTime: "5",
queries: append([]string{"begin"}, getMultiShardInsertQueries()...),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// There is nothing to verify.
},
},
{
name: "Settings changes before begin",
commitDelayTime: "5",
queries: append(
append([]string{`set @@time_zone="+10:30"`, "begin"}, getMultiShardInsertQueries()...),
"insert into twopc_settings(id, col) values(9, now())"),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone.
ctx := context.Background()
conn, err := mysql.Connect(ctx, vtParams)
require.NoError(t, err)
defer conn.Close()
utils.Exec(t, conn, `set @@time_zone="+7:00"`)
utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),now()))`, `[[INT64(3)]]`)
},
},
{
name: "Settings changes during transaction",
commitDelayTime: "5",
queries: append(
append([]string{"begin"}, getMultiShardInsertQueries()...),
`set @@time_zone="+10:30"`,
"insert into twopc_settings(id, col) values(9, now())"),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone.
ctx := context.Background()
conn, err := mysql.Connect(ctx, vtParams)
require.NoError(t, err)
defer conn.Close()
utils.Exec(t, conn, `set @@time_zone="+7:00"`)
utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),now()))`, `[[INT64(3)]]`)
},
},
{
name: "Settings changes before begin and during transaction",
commitDelayTime: "5",
queries: append(
append([]string{`set @@time_zone="+10:30"`, "begin"}, getMultiShardInsertQueries()...),
"insert into twopc_settings(id, col) values(9, now())",
`set @@time_zone="+7:00"`,
"insert into twopc_settings(id, col) values(25, now())"),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone.
ctx := context.Background()
conn, err := mysql.Connect(ctx, vtParams)
require.NoError(t, err)
defer conn.Close()
utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),(select col from twopc_settings where id = 25)))`, `[[INT64(3)]]`)
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
// Reparent all the shards to first tablet being the primary.
reparentToFirstTablet(t)
// cleanup all the old data.
conn, closer := start(t)
defer closer()
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
var wg sync.WaitGroup
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, tt.queries)
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
// Run the vttablet restart to ensure that the transaction needs to be redone.
err := vttabletRestartShard3(t)
require.NoError(t, err)
// Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error.
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
wg.Wait()
// Wair for the data in the table to see that the transaction was committed.
twopcutil.WaitForResults(t, &vtParams, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
tt.verifyFunc(t, &vtParams)
})
}

}

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
Expand Down Expand Up @@ -112,30 +215,10 @@ func TestDisruptions(t *testing.T) {
// cleanup all the old data.
conn, closer := start(t)
defer closer()
// Start an atomic transaction.
utils.Exec(t, conn, "begin")
// Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits
// it is very easy to figure out what value will end up in which shard.
idVals := []int{4, 6, 9}
for _, val := range idVals {
utils.Exec(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val))
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, tt.commitDelayTime)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, append([]string{"begin"}, getMultiShardInsertQueries()...))
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
writeCtx, writeCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -167,6 +250,38 @@ func TestDisruptions(t *testing.T) {
}
}

// getMultiShardInsertQueries gets the queries that will cause one insert on all the shards.
func getMultiShardInsertQueries() []string {
var queries []string
// Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits
// it is very easy to figure out what value will end up in which shard.
for _, val := range idVals {
queries = append(queries, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val))
}
return queries
}

// runMultiShardCommitWithDelay runs a multi shard commit and configures it to wait for a certain amount of time in the commit phase.
func runMultiShardCommitWithDelay(t *testing.T, conn *mysql.Conn, commitDelayTime string, wg *sync.WaitGroup, queries []string) {
// Run all the queries to start the transaction.
for _, query := range queries {
utils.Exec(t, conn, query)
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, commitDelayTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
}

func mergeShards(t *testing.T) error {
return twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-80,80-", "40-")
}
Expand Down Expand Up @@ -234,7 +349,9 @@ func ersShard3(t *testing.T) error {
func vttabletRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
tablet := shard.Vttablets[0]
return tablet.RestartOnlyTablet()
_ = tablet.VttabletProcess.TearDownWithTimeout(2 * time.Second)
tablet.VttabletProcess.ServingStatus = "SERVING"
return tablet.VttabletProcess.Setup()
}

// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard.
Expand Down
12 changes: 2 additions & 10 deletions go/test/endtoend/transaction/twopc/stress/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,15 @@
}
},
"tables": {
"twopc_fuzzer_update": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_fuzzer_insert": {
"twopc_t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_t1": {
"twopc_settings": {
"column_vindexes": [
{
"column": "id",
Expand Down
21 changes: 15 additions & 6 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestDTCommit(t *testing.T) {
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')")
utils.Exec(t, conn, "insert into twopc_user(id, name) values(8,'bar')")
utils.Exec(t, conn, `set @@time_zone="+10:30"`)
utils.Exec(t, conn, "insert into twopc_user(id, name) values(9,'baz')")
utils.Exec(t, conn, "insert into twopc_user(id, name) values(10,'apa')")
utils.Exec(t, conn, "commit")
Expand Down Expand Up @@ -89,12 +90,16 @@ func TestDTCommit(t *testing.T) {
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"set @@time_zone = '+10:30'\")]",
},
"ks.twopc_user:-40": {
`insert:[INT64(10) VARCHAR("apa")]`,
Expand Down Expand Up @@ -132,8 +137,10 @@ func TestDTCommit(t *testing.T) {
"delete:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
"insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"insert:[VARCHAR(\"dtid-2\") INT64(2) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-2\") INT64(2) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
},
"ks.twopc_user:40-80": {"update:[INT64(8) VARCHAR(\"newfoo\")]"},
"ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"},
Expand Down Expand Up @@ -163,8 +170,10 @@ func TestDTCommit(t *testing.T) {
"delete:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
},
"ks.twopc_user:-40": {"delete:[INT64(10) VARCHAR(\"apa\")]"},
"ks.twopc_user:80-": {"delete:[INT64(9) VARCHAR(\"baz\")]"},
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
// WriteTestCommunicationFile writes the content to the file with the given name.
// We use these files to coordinate with the vttablets running in the debug mode.
func WriteTestCommunicationFile(t *testing.T, fileName string, content string) {
// Delete the file just to make sure it doesn't exist before we write to it.
DeleteFile(fileName)
err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644)
require.NoError(t, err)
}
Expand Down
Loading

0 comments on commit e881b9f

Please sign in to comment.