Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Enable VPlayerBatching in unit tests #17339

Merged
merged 5 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) {

expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab1(id,val) values (1,_binary'a')",
"insert into tab1(id,val) values (2,_binary'b')",
"insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')",
"/update _vt.vreplication set pos=",
"commit",
}, pos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) {
resetBinlogClient()

vttablet.InitVReplicationConfigDefaults()
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the flags were disabled for all of the unit tests and I did not notice it previously.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want this PR to be able to stand on its own, I could change this line to vttablet.DefaultVReplicationConfig.ExperimentalFlags = 7 rather than removing it.


// Engines cannot be initialized in testenv because it introduces circular dependencies.
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0])
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down Expand Up @@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down
21 changes: 12 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,19 @@ func (vc *vdbClient) Begin() error {
if vc.InTransaction {
return nil
}
if err := vc.DBClient.Begin(); err != nil {
return err
if vc.maxBatchSize == 0 {
// We're not batching so we BEGIN the transaction here.
if err := vc.DBClient.Begin(); err != nil {
return err
}
} else {
// If we're batching, we batch the contents of the
// transaction, which starts with the BEGIN and ends with
// the COMMIT, so we do not send a BEGIN down the wire
// ahead of time.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = 6 // begin and semicolon
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}

// If we're batching, we only batch the contents of the
// transaction, which starts with the begin and ends with
// the commit.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = 6 // begin and semicolon

vc.queries = append(vc.queries, "begin")
vc.InTransaction = true
vc.startTime = time.Now()
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
return vr.dbClient.Commit()
}
batchMode := false
if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
// We only do batching in the running/replicating phase.
if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
batchMode = true
}
if batchMode {
Expand Down
31 changes: 21 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {

// It does not work when filter is enabled
output := qh.Expect(
"begin",
"rollback",
fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
)
Expand Down Expand Up @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')",
output: qh.Expect(
"begin",
"insert into dst4(id1,val) values (1,_binary'aaa')",
"insert into dst4(id1,val) values (3,_binary'ccc')",
"insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand All @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')",
output: qh.Expect(
"begin",
"insert into dst5(id1,val) values (1,_binary'abc')",
"insert into dst5(id1,val) values (4,_binary'abc')",
"insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand Down Expand Up @@ -1495,17 +1492,15 @@ func TestPlayerRowMove(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"begin",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "1", "1"},
{"2", "5", "2"},
})
validateQueryCountStat(t, "replicate", 3)
validateQueryCountStat(t, "replicate", 1)

execStatements(t, []string{
"update src set val1=1, val2=4 where id=3",
Expand All @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) {
{"1", "5", "2"},
{"2", "2", "1"},
})
validateQueryCountStat(t, "replicate", 5)
validateQueryCountStat(t, "replicate", 3)
}

func TestPlayerTypes(t *testing.T) {
Expand Down Expand Up @@ -2179,6 +2174,14 @@ func TestPlayerSplitTransaction(t *testing.T) {
func TestPlayerLockErrors(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down Expand Up @@ -2258,6 +2261,14 @@ func TestPlayerLockErrors(t *testing.T) {
func TestPlayerCancelOnLock(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,14 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
}
vr.stats.State.Store(state.String())
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id)
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
// If we're batching and in a transaction, then include the state update
// in the current transaction batch.
if vr.dbClient.InTransaction && vr.dbClient.maxBatchSize > 0 {
vr.dbClient.queries = append(vr.dbClient.queries, query)
} else { // Otherwise, send it down the wire
mattlord marked this conversation as resolved.
Show resolved Hide resolved
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
}
if state == vr.state {
return nil
Expand Down
Loading