Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify distributed transaction commit flow #16468

Merged
merged 7 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test: warning and transaction status testing
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Jul 25, 2024
commit bec8da64969f5996b328abac3505bd49bc30f1dc
115 changes: 77 additions & 38 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/callerid"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// TestDTCommit tests distributed transaction commit for insert, update and delete operations
Expand Down Expand Up @@ -580,6 +582,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After MM commit")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during metadata manager commit; transaction will be committed/roll based on the state on recovery",
false, "COMMIT", "ks:40-80,ks:-40")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -656,6 +662,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After RM prepared")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery",
true /* transaction concluded */, "", "")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -714,6 +724,10 @@ func TestDTResolveDuringRMPrepare(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail During RM prepare")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction prepare phase; prepare transaction rollback attempted; conclude on recovery",
true, "", "")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -776,6 +790,10 @@ func TestDTResolveDuringRMCommit(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail During RM commit")

testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during resource manager commit; transaction will be committed on recovery",
false, "COMMIT", "ks:40-80,ks:-40")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
dtMap := make(map[string]string)
Expand Down Expand Up @@ -851,18 +869,9 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) {
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After TR created")

t.Run("ReadTransactionState", func(t *testing.T) {
errStr := err.Error()
indx := strings.Index(errStr, "Fail")
require.Greater(t, indx, 0)
dtid := errStr[0 : indx-2]
res, err := conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil)
require.NoError(t, err)
resStr := fmt.Sprintf("%v", res.Rows)
require.Contains(t, resStr, `[[VARCHAR("ks:80-`)
require.Contains(t, resStr, `VARCHAR("PREPARE") DATETIME("`)
require.Contains(t, resStr, `+0000 UTC") VARCHAR("ks:40-80")]]`)
})
testWarningAndTransactionStatus(t, conn,
"distributed transaction ID failed during transaction record creation; rollback attempted; conclude on recovery",
false, "PREPARE", "ks:40-80")

// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
tableMap := make(map[string][]*querypb.Field)
Expand All @@ -883,36 +892,66 @@ func TestDTResolveAfterTransactionRecord(t *testing.T) {
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
}

// TestDTWarningAfterMMCommit tests that failure after MM commit returns a warning.
func TestDTWarningAfterMMCommit(t *testing.T) {
defer cleanup(t)
type warn struct {
level string
code uint16
msg string
}

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
defer vtgateConn.Close()
func toWarn(row sqltypes.Row) warn {
code, _ := row[1].ToUint16()
return warn{
level: row[0].ToString(),
code: code,
msg: row[2].ToString(),
}
}

conn := vtgateConn.Session("", nil)
qCtx, cancel := context.WithCancel(context.Background())
defer cancel()
// Insert into multiple shards
_, err = conn.Execute(qCtx, "begin", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(9,'baz')", nil)
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil)
type txStatus struct {
dtid string
state string
rTime string
participants string
}

func toTxStatus(row sqltypes.Row) txStatus {
return txStatus{
dtid: row[0].ToString(),
state: row[1].ToString(),
rTime: row[2].ToString(),
participants: row[3].ToString(),
}
}

func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSession, warnMsg string,
txConcluded bool, txState string, txParticipants string) {
t.Helper()

qr, err := conn.Execute(context.Background(), "show warnings", nil)
require.NoError(t, err)
require.Len(t, qr.Rows, 1)

// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil)
_, err = conn.Execute(newCtx, "commit", nil)
require.ErrorContains(t, err, "Fail After MM commit")
// validate warning output
w := toWarn(qr.Rows[0])
assert.Equal(t, "Warning", w.level)
assert.EqualValues(t, 302, w.code)
assert.Contains(t, w.msg, warnMsg)

// extract transaction ID
indx := strings.Index(w.msg, " ")
require.Greater(t, indx, 0)
dtid := w.msg[:indx]

qr, err := conn.Execute(qCtx, "show warnings", nil)
qr, err = conn.Execute(context.Background(), fmt.Sprintf(`show transaction status for '%v'`, dtid), nil)
require.NoError(t, err)
require.Contains(t, fmt.Sprintf("%v", qr.Rows), `[[VARCHAR("Warning") UINT16(302) VARCHAR("ks:80-:`)
require.Contains(t, fmt.Sprintf("%v", qr.Rows), `distributed transaction ID failed during metadata manager commit`)

// validate transaction status
if txConcluded {
require.Empty(t, qr.Rows)
} else {
tx := toTxStatus(qr.Rows[0])
assert.Equal(t, dtid, tx.dtid)
assert.Equal(t, txState, tx.state)
assert.Equal(t, txParticipants, tx.participants)
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/transaction_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *TransactionStatus) TryExecute(ctx context.Context, vcursor VCursor, bin
if wantfields {
res.Fields = t.getFields()
}
if transactionState != nil {
if transactionState != nil && transactionState.Dtid != "" {
var participantString []string
for _, participant := range transactionState.Participants {
participantString = append(participantString, fmt.Sprintf("%s:%s", participant.Keyspace, participant.Shard))
Expand Down
Loading