Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reject atomic distributed transaction on savepoints and modified system settings #16835

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2780,6 +2780,76 @@ func TestExecutorPrepareExecute(t *testing.T) {
require.Error(t, err)
}

// TestExecutorRejectTwoPC test all the unsupported cases for multi-shard atomic commit.
func TestExecutorRejectTwoPC(t *testing.T) {
executor, sbc1, sbc2, _, ctx := createExecutorEnv(t)
tcases := []struct {
sqls []string
testRes []*sqltypes.Result

expErr string
}{
{
sqls: []string{
`set time_zone = "+08:00"`,
`insert into user_extra(user_id) values (1)`,
`insert into user_extra(user_id) values (2)`,
`insert into user_extra(user_id) values (3)`,
},
expErr: "VT12001: unsupported: atomic distributed transaction commit with system settings",
}, {
sqls: []string{
`update t1 set unq_col = 1 where id = 1`,
`update t1 set unq_col = 1 where id = 3`,
},
testRes: []*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|unq_col|unchanged", "int64|int64|int64"),
"1|2|0"),
},
expErr: "VT12001: unsupported: atomic distributed transaction commit with consistent lookup vindex",
}, {
sqls: []string{
`savepoint x`,
`insert into user_extra(user_id) values (1)`,
`insert into user_extra(user_id) values (3)`,
},
testRes: []*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|unq_col|unchanged", "int64|int64|int64"),
"1|2|0"),
},
expErr: "VT12001: unsupported: atomic distributed transaction commit with savepoint",
},
}

for _, tcase := range tcases {
t.Run(fmt.Sprintf("%v", tcase.sqls), func(t *testing.T) {
sbc1.SetResults(tcase.testRes)
sbc2.SetResults(tcase.testRes)

// create a new session
session := NewSafeSession(&vtgatepb.Session{
TargetString: KsTestSharded,
TransactionMode: vtgatepb.TransactionMode_TWOPC,
EnableSystemSettings: true,
})

// start transaction
_, err := executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, "begin", nil)
require.NoError(t, err)

// execute queries
for _, sql := range tcase.sqls {
_, err = executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, sql, nil)
require.NoError(t, err)
}

// commit 2pc
_, err = executor.Execute(ctx, nil, "TestExecutorRejectTwoPC", session, "commit", nil)
require.ErrorContains(t, err, tcase.expErr)
})
}
}

func TestExecutorTruncateErrors(t *testing.T) {
executor, _, _, _, ctx := createExecutorEnv(t)

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/legacy_scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,6 @@ func newTestScatterConn(ctx context.Context, hc discovery.HealthCheck, serv srvt
// in '-cells_to_watch' command line parameter, which is
// empty by default. So it's unused in this test, set to nil.
gw := NewTabletGateway(ctx, hc, serv, cell)
tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC)
tc := NewTxConn(gw, vtgatepb.TransactionMode_MULTI)
return NewScatterConn("", tc, gw)
}
23 changes: 18 additions & 5 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,16 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error

// commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice
func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err error) {
if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 {
_ = txc.Rollback(ctx, session)
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "pre or post actions not allowed for 2PC commits")
}

// If the number of participants is one or less, then it's a normal commit.
if len(session.ShardSessions) <= 1 {
return txc.commitNormal(ctx, session)
}

if err := txc.checkValidCondition(session); err != nil {
_ = txc.Rollback(ctx, session)
return err
}

mmShard := session.ShardSessions[0]
rmShards := session.ShardSessions[1:]
dtid := dtids.New(mmShard)
Expand Down Expand Up @@ -276,6 +276,19 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err
return nil
}

func (txc *TxConn) checkValidCondition(session *SafeSession) error {
if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 {
return vterrors.VT12001("atomic distributed transaction commit with consistent lookup vindex")
}
if len(session.GetSavepoints()) != 0 {
return vterrors.VT12001("atomic distributed transaction commit with savepoint")
}
if session.GetInReservedConn() {
return vterrors.VT12001("atomic distributed transaction commit with system settings")
}
return nil
}

func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) {
switch txPhase {
case Commit2pcCreateTransaction:
Expand Down
Loading