Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support settings changes with atomic transactions #16974

Merged
6 changes: 3 additions & 3 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func TestMain(m *testing.M) {
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
"--onterm_timeout", "1s",
"--onclose_timeout", "1s",
)

// Start keyspace
Expand Down Expand Up @@ -123,8 +125,6 @@ func start(t *testing.T) (*mysql.Conn, func()) {

func cleanup(t *testing.T) {
cluster.PanicHandler(t)

utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert")
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update")
utils.ClearOutTable(t, vtParams, "twopc_t1")
utils.ClearOutTable(t, vtParams, "twopc_settings")
}
15 changes: 3 additions & 12 deletions go/test/endtoend/transaction/twopc/stress/schema.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
create table twopc_fuzzer_update (
create table twopc_t1 (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;

create table twopc_fuzzer_insert (
id bigint,
updateSet bigint,
threadId bigint,
col bigint auto_increment,
key(col),
primary key (id, col)
) Engine=InnoDB;

create table twopc_t1 (
create table twopc_settings (
id bigint,
col bigint,
col varchar(50),
primary key (id)
) Engine=InnoDB;
161 changes: 139 additions & 22 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,109 @@ import (
"vitess.io/vitess/go/vt/schema"
)

var (
// idVals are the primary key values to use while creating insert queries that ensures all the three shards get an insert.
idVals = [3]int{
4, // 4 maps to 0x20 and ends up in the first shard (-40)
6, // 6 maps to 0x60 and ends up in the second shard (40-80)
9, // 9 maps to 0x90 and ends up in the third shard (80-)
}
)

func TestSettings(t *testing.T) {
testcases := []struct {
name string
commitDelayTime string
queries []string
verifyFunc func(t *testing.T, vtParams *mysql.ConnParams)
}{
{
name: "No settings changes",
commitDelayTime: "5",
queries: append([]string{"begin"}, getMultiShardInsertQueries()...),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// There is nothing to verify.
},
},
{
name: "Settings changes before begin",
commitDelayTime: "5",
queries: append(
append([]string{`set @@time_zone="+10:30"`, "begin"}, getMultiShardInsertQueries()...),
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
"insert into twopc_settings(id, col) values(9, now())"),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone.
ctx := context.Background()
conn, err := mysql.Connect(ctx, vtParams)
require.NoError(t, err)
defer conn.Close()
utils.Exec(t, conn, `set @@time_zone="+7:00"`)
utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),now()))`, `[[INT64(3)]]`)
},
},
{
name: "Settings changes during transaction",
commitDelayTime: "5",
queries: append(
append([]string{"begin"}, getMultiShardInsertQueries()...),
`set @@time_zone="+10:30"`,
"insert into twopc_settings(id, col) values(9, now())"),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone.
ctx := context.Background()
conn, err := mysql.Connect(ctx, vtParams)
require.NoError(t, err)
defer conn.Close()
utils.Exec(t, conn, `set @@time_zone="+7:00"`)
utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),now()))`, `[[INT64(3)]]`)
},
},
{
name: "Settings changes before begin and during transaction",
commitDelayTime: "5",
queries: append(
append([]string{`set @@time_zone="+10:30"`, "begin"}, getMultiShardInsertQueries()...),
"insert into twopc_settings(id, col) values(9, now())",
`set @@time_zone="+7:00"`,
"insert into twopc_settings(id, col) values(25, now())"),
verifyFunc: func(t *testing.T, vtParams *mysql.ConnParams) {
// We can check that the time_zone setting was taken into account by checking the diff with the time by using a different time_zone.
ctx := context.Background()
conn, err := mysql.Connect(ctx, vtParams)
require.NoError(t, err)
defer conn.Close()
utils.AssertMatches(t, conn, `select HOUR(TIMEDIFF((select col from twopc_settings where id = 9),(select col from twopc_settings where id = 25)))`, `[[INT64(3)]]`)
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
// Reparent all the shards to first tablet being the primary.
reparentToFirstTablet(t)
// cleanup all the old data.
conn, closer := start(t)
defer closer()
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
var wg sync.WaitGroup
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, tt.queries)
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
// Run the vttablet restart to ensure that the transaction needs to be redone.
err := vttabletRestartShard3(t)
require.NoError(t, err)
// Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error.
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
wg.Wait()
// Wair for the data in the table to see that the transaction was committed.
twopcutil.WaitForResults(t, &vtParams, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
tt.verifyFunc(t, &vtParams)
})
}

}

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
Expand Down Expand Up @@ -112,30 +215,10 @@ func TestDisruptions(t *testing.T) {
// cleanup all the old data.
conn, closer := start(t)
defer closer()
// Start an atomic transaction.
utils.Exec(t, conn, "begin")
// Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits
// it is very easy to figure out what value will end up in which shard.
idVals := []int{4, 6, 9}
for _, val := range idVals {
utils.Exec(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val))
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, tt.commitDelayTime)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, append([]string{"begin"}, getMultiShardInsertQueries()...))
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
writeCtx, writeCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -167,6 +250,38 @@ func TestDisruptions(t *testing.T) {
}
}

// getMultiShardInsertQueries gets the queries that will cause one insert on all the shards.
func getMultiShardInsertQueries() []string {
var queries []string
// Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits
// it is very easy to figure out what value will end up in which shard.
for _, val := range idVals {
queries = append(queries, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, 4)", val))
}
return queries
}

// runMultiShardCommitWithDelay runs a multi shard commit and configures it to wait for a certain amount of time in the commit phase.
func runMultiShardCommitWithDelay(t *testing.T, conn *mysql.Conn, commitDelayTime string, wg *sync.WaitGroup, queries []string) {
// Run all the queries to start the transaction.
for _, query := range queries {
utils.Exec(t, conn, query)
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, commitDelayTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
}

func mergeShards(t *testing.T) error {
return twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-80,80-", "40-")
}
Expand Down Expand Up @@ -234,7 +349,9 @@ func ersShard3(t *testing.T) error {
func vttabletRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
tablet := shard.Vttablets[0]
return tablet.RestartOnlyTablet()
_ = tablet.VttabletProcess.TearDownWithTimeout(2 * time.Second)
tablet.VttabletProcess.ServingStatus = "SERVING"
return tablet.VttabletProcess.Setup()
}

// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard.
Expand Down
12 changes: 2 additions & 10 deletions go/test/endtoend/transaction/twopc/stress/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,15 @@
}
},
"tables": {
"twopc_fuzzer_update": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_fuzzer_insert": {
"twopc_t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_t1": {
"twopc_settings": {
"column_vindexes": [
{
"column": "id",
Expand Down
21 changes: 15 additions & 6 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestDTCommit(t *testing.T) {
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')")
utils.Exec(t, conn, "insert into twopc_user(id, name) values(8,'bar')")
utils.Exec(t, conn, `set @@time_zone="+10:30"`)
utils.Exec(t, conn, "insert into twopc_user(id, name) values(9,'baz')")
utils.Exec(t, conn, "insert into twopc_user(id, name) values(10,'apa')")
utils.Exec(t, conn, "commit")
Expand Down Expand Up @@ -89,12 +90,16 @@ func TestDTCommit(t *testing.T) {
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"set @@time_zone = '+10:30'\")]",
},
"ks.twopc_user:-40": {
`insert:[INT64(10) VARCHAR("apa")]`,
Expand Down Expand Up @@ -132,8 +137,10 @@ func TestDTCommit(t *testing.T) {
"delete:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
"insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"insert:[VARCHAR(\"dtid-2\") INT64(2) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-2\") INT64(2) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]",
},
"ks.twopc_user:40-80": {"update:[INT64(8) VARCHAR(\"newfoo\")]"},
"ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"},
Expand Down Expand Up @@ -163,8 +170,10 @@ func TestDTCommit(t *testing.T) {
"delete:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"set @@time_zone = '+10:30'\")]",
"delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]",
},
"ks.twopc_user:-40": {"delete:[INT64(10) VARCHAR(\"apa\")]"},
"ks.twopc_user:80-": {"delete:[INT64(9) VARCHAR(\"baz\")]"},
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
// WriteTestCommunicationFile writes the content to the file with the given name.
// We use these files to coordinate with the vttablets running in the debug mode.
func WriteTestCommunicationFile(t *testing.T, fileName string, content string) {
// Delete the file just to make sure it doesn't exist before we write to it.
DeleteFile(fileName)
err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644)
require.NoError(t, err)
}
Expand Down
Loading
Loading