Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: retrieve unresolved transactions #16356

Merged
merged 5 commits into from
Jul 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rpc: added unresolved transactions api to vttablet
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
harshit-gangal committed Jul 9, 2024
commit 119d4cf01094424fe2248b6caaecdfb385f77592
6 changes: 6 additions & 0 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
@@ -553,6 +553,12 @@ func (itc *internalTabletConn) ReadTransaction(ctx context.Context, target *quer
return metadata, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// UnresolvedTransactions is part of queryservice.QueryService
func (itc *internalTabletConn) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
transactions, err = itc.tablet.qsc.QueryService().UnresolvedTransactions(ctx, target)
return transactions, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// BeginExecute is part of queryservice.QueryService
func (itc *internalTabletConn) BeginExecute(
ctx context.Context,
15 changes: 15 additions & 0 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
@@ -233,6 +233,21 @@ func (q *query) ReadTransaction(ctx context.Context, request *querypb.ReadTransa
return &querypb.ReadTransactionResponse{Metadata: result}, nil
}

// UnresolvedTransactions is part of the queryservice.QueryServer interface
func (q *query) UnresolvedTransactions(ctx context.Context, request *querypb.UnresolvedTransactionsRequest) (response *querypb.UnresolvedTransactionsResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.EffectiveCallerId,
request.ImmediateCallerId,
)
transactions, err := q.server.UnresolvedTransactions(ctx, request.Target)
if err != nil {
return nil, vterrors.ToGRPC(err)
}

return &querypb.UnresolvedTransactionsResponse{Transactions: transactions}, nil
}

// BeginExecute is part of the queryservice.QueryServer interface
func (q *query) BeginExecute(ctx context.Context, request *querypb.BeginExecuteRequest) (response *querypb.BeginExecuteResponse, err error) {
defer q.server.HandlePanic(&err)
20 changes: 20 additions & 0 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
@@ -438,6 +438,26 @@ func (conn *gRPCQueryClient) ReadTransaction(ctx context.Context, target *queryp
return response.Metadata, nil
}

// UnresolvedTransactions returns all unresolved distributed transactions.
func (conn *gRPCQueryClient) UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return nil, tabletconn.ConnClosed
}

req := &querypb.UnresolvedTransactionsRequest{
Target: target,
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
}
response, err := conn.c.UnresolvedTransactions(ctx, req)
if err != nil {
return nil, tabletconn.ErrorFromGRPC(err)
}
return response.Transactions, nil
}

// BeginExecute starts a transaction and runs an Execute.
func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (state queryservice.TransactionState, result *sqltypes.Result, err error) {
conn.mu.RLock()
3 changes: 3 additions & 0 deletions go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
@@ -76,6 +76,9 @@ type QueryService interface {
// ReadTransaction returns the metadata for the specified dtid.
ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error)

// UnresolvedTransactions returns the list of unresolved distributed transactions.
UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error)

// Execute for query execution
Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, error)
// StreamExecute for query execution with streaming
9 changes: 9 additions & 0 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
@@ -176,6 +176,15 @@ func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.T
return metadata, err
}

func (ws *wrappedService) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
err = ws.wrapper(ctx, target, ws.impl, "UnresolvedTransactions", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
transactions, innerErr = conn.UnresolvedTransactions(ctx, target)
return canRetry(ctx, innerErr), innerErr
})
return transactions, err
}

func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (qr *sqltypes.Result, err error) {
inDedicatedConn := transactionID != 0 || reservedID != 0
err = ws.wrapper(ctx, target, ws.impl, "Execute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
6 changes: 6 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
@@ -427,6 +427,12 @@ func (sbc *SandboxConn) ReadTransaction(ctx context.Context, target *querypb.Tar
return nil, nil
}

// UnresolvedTransactions is part of the QueryService interface.
func (sbc *SandboxConn) UnresolvedTransactions(context.Context, *querypb.Target) ([]*querypb.TransactionMetadata, error) {
// TODO implement me
panic("implement me")
}

// BeginExecute is part of the QueryService interface.
func (sbc *SandboxConn) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) {
sbc.panicIfNeeded()
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
@@ -359,6 +359,18 @@ func (f *FakeQueryService) ReadTransaction(ctx context.Context, target *querypb.
return Metadata, nil
}

// UnresolvedTransactions is part of the queryservice.QueryService interface
func (f *FakeQueryService) UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error) {
if f.HasError {
return nil, f.TabletError
}
if f.Panics {
panic(fmt.Errorf("test-triggered panic"))
}
f.checkTargetCallerID(ctx, "UnresolvedTransactions", target)
return []*querypb.TransactionMetadata{Metadata}, nil
}

// ExecuteQuery is a fake test query.
const ExecuteQuery = "executeQuery"

30 changes: 30 additions & 0 deletions go/vt/vttablet/tabletconntest/tabletconntest.go
Original file line number Diff line number Diff line change
@@ -399,6 +399,33 @@ func testReadTransactionPanics(t *testing.T, conn queryservice.QueryService, f *
})
}

func testUnresolvedTransactions(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactions")
ctx := context.Background()
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
transactions, err := conn.UnresolvedTransactions(ctx, TestTarget)
require.NoError(t, err)
require.True(t, proto.Equal(transactions[0], Metadata))
}

func testUnresolvedTransactionsError(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactionsError")
f.HasError = true
testErrorHelper(t, f, "UnresolvedTransactions", func(ctx context.Context) error {
_, err := conn.UnresolvedTransactions(ctx, TestTarget)
return err
})
f.HasError = false
}

func testUnresolvedTransactionsPanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactionsPanics")
testPanicHelper(t, f, "UnresolvedTransactions", func(ctx context.Context) error {
_, err := conn.UnresolvedTransactions(ctx, TestTarget)
return err
})
}

func testExecute(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testExecute")
f.ExpectedTransactionID = ExecuteTransactionID
@@ -936,6 +963,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollback,
testConcludeTransaction,
testReadTransaction,
testUnresolvedTransactions,
testExecute,
testBeginExecute,
testStreamExecute,
@@ -956,6 +984,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollbackError,
testConcludeTransactionError,
testReadTransactionError,
testUnresolvedTransactionsError,
testExecuteError,
testBeginExecuteErrorInBegin,
testBeginExecuteErrorInExecute,
@@ -979,6 +1008,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollbackPanics,
testConcludeTransactionPanics,
testReadTransactionPanics,
testUnresolvedTransactionsPanics,
testExecutePanics,
testBeginExecutePanics,
testStreamExecutePanics,
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
@@ -289,6 +289,7 @@ func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error {
return nil
}

// UnresolvedTransactions returns the list of unresolved distributed transactions.
func (dte *DTExecutor) UnresolvedTransactions() ([]*querypb.TransactionMetadata, error) {
return dte.te.twoPC.UnresolvedTransactions(dte.ctx, time.Now().Add(-dte.te.abandonAge))
}
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
@@ -746,15 +746,15 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta
return metadata, err
}

// UnresolvedTransaction returns the unresolved distributed transaction record.
func (tsv *TabletServer) UnresolvedTransaction(ctx context.Context, target *querypb.Target) (tmList []*querypb.TransactionMetadata, err error) {
// UnresolvedTransactions returns the unresolved distributed transaction record.
func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
err = tsv.execRequest(
ctx, tsv.loadQueryTimeout(),
"UnresolvedTransaction", "unresolved_transaction", nil,
"UnresolvedTransactions", "unresolved_transaction", nil,
target, nil, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
tmList, err = txe.UnresolvedTransactions()
transactions, err = txe.UnresolvedTransactions()
return err
},
)