Skip to content

Commit

Permalink
execute savepoints after the begin method is complete in the tabletse…
Browse files Browse the repository at this point in the history
…rver

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Oct 7, 2024
1 parent 4c705eb commit e8ada40
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 110 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (dte *DTExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, pre
}

func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, _, err := dte.te.txPool.Begin(dte.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
conn, _, _, err := dte.te.txPool.Begin(dte.ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
return nil, errTxThrottled
}

conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, qre.setting)

if err != nil {
return nil, err
Expand All @@ -249,7 +249,7 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*s
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, qre.setting)
if err != nil {
return nil, err
}
Expand Down
106 changes: 74 additions & 32 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti
func (tsv *TabletServer) begin(
ctx context.Context,
target *querypb.Target,
savepointQueries []string,
postBeginQueries []string,
reservedID int64,
settings []string,
options *querypb.ExecuteOptions,
Expand All @@ -535,12 +535,43 @@ func (tsv *TabletServer) begin(
return err
}
}
transactionID, beginSQL, sessionStateChanges, err := tsv.te.Begin(ctx, nil, reservedID, connSetting, options)
transactionID, beginSQL, sessionStateChanges, err := tsv.te.Begin(ctx, reservedID, connSetting, options)
state.TransactionID = transactionID
state.SessionStateChanges = sessionStateChanges
logStats.TransactionID = transactionID
logStats.ReservedID = reservedID

if err != nil {
return err
}

targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
for _, query := range postBeginQueries {
plan, err := tsv.qe.GetPlan(ctx, logStats, query, true)
if err != nil {
return err
}

qre := &QueryExecutor{
ctx: ctx,
query: query,
connID: transactionID,
options: options,
plan: plan,
logStats: logStats,
tsv: tsv,
targetTabletType: targetType,
setting: connSetting,
}
_, err = qre.Execute()
if err != nil {
return err
}
}

// Record the actual statements that were executed in the logStats.
// If nothing was actually executed, don't count the operation in
// the tablet metrics, and clear out the logStats Method so that
Expand All @@ -559,18 +590,6 @@ func (tsv *TabletServer) begin(
return err
},
)

if err != nil {
return
}

for _, savepointQuery := range savepointQueries {
_, err = tsv.execute(ctx, target, savepointQuery, nil, state.TransactionID, reservedID, settings, options)
if err != nil {
return
}
}

return
}

Expand Down Expand Up @@ -997,7 +1016,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
}

// BeginExecute combines Begin and Execute.
func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) {
func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Target, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) {

// Disable hot row protection in case of reserve connection.
if tsv.enableHotRowProtection && reservedID == 0 {
Expand All @@ -1010,7 +1029,7 @@ func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Targe
}
}

state, err := tsv.begin(ctx, target, preQueries, reservedID, nil, options)
state, err := tsv.begin(ctx, target, postBeginQueries, reservedID, nil, options)
if err != nil {
return state, nil, err
}
Expand All @@ -1023,14 +1042,14 @@ func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Targe
func (tsv *TabletServer) BeginStreamExecute(
ctx context.Context,
target *querypb.Target,
preQueries []string,
postBeginQueries []string,
sql string,
bindVariables map[string]*querypb.BindVariable,
reservedID int64,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (queryservice.TransactionState, error) {
state, err := tsv.begin(ctx, target, preQueries, reservedID, nil, options)
state, err := tsv.begin(ctx, target, postBeginQueries, reservedID, nil, options)
if err != nil {
return state, err
}
Expand Down Expand Up @@ -1256,8 +1275,8 @@ func (tsv *TabletServer) VStreamResults(ctx context.Context, target *querypb.Tar
}

// ReserveBeginExecute implements the QueryService interface
func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (state queryservice.ReservedTransactionState, result *sqltypes.Result, err error) {
state, result, err = tsv.beginExecuteWithSettings(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options)
func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, settings []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (state queryservice.ReservedTransactionState, result *sqltypes.Result, err error) {
state, result, err = tsv.beginExecuteWithSettings(ctx, target, settings, postBeginQueries, sql, bindVariables, options)
// If there is an error and the error message is about allowing query in reserved connection only,
// then we do not return an error from here and continue to use the reserved connection path.
// This is specially for get_lock function call from vtgate that needs a reserved connection.
Expand Down Expand Up @@ -1285,12 +1304,35 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, settings)
logStats.TransactionID = connID
logStats.ReservedID = connID
if err != nil {
return err
}
logStats.TransactionID = connID
logStats.ReservedID = connID

for _, query := range postBeginQueries {
plan, err := tsv.qe.GetPlan(ctx, logStats, query, true)
if err != nil {
return err
}

qre := &QueryExecutor{
ctx: ctx,
query: query,
connID: connID,
options: options,
plan: plan,
logStats: logStats,
tsv: tsv,
targetTabletType: targetType,
}
_, err = qre.Execute()
if err != nil {
return err
}
}

return nil
},
)
Expand All @@ -1311,13 +1353,13 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
ctx context.Context,
target *querypb.Target,
settings []string,
savepointQueries []string,
postBeginQueries []string,
sql string,
bindVariables map[string]*querypb.BindVariable,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (state queryservice.ReservedTransactionState, err error) {
txState, err := tsv.begin(ctx, target, savepointQueries, 0, settings, options)
txState, err := tsv.begin(ctx, target, postBeginQueries, 0, settings, options)
if err != nil {
return txToReserveState(txState), err
}
Expand All @@ -1327,9 +1369,9 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
}

// ReserveExecute implements the QueryService interface
func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (state queryservice.ReservedState, result *sqltypes.Result, err error) {
func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Target, settings []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (state queryservice.ReservedState, result *sqltypes.Result, err error) {

result, err = tsv.executeWithSettings(ctx, target, preQueries, sql, bindVariables, transactionID, options)
result, err = tsv.executeWithSettings(ctx, target, settings, sql, bindVariables, transactionID, options)
// If there is an error and the error message is about allowing query in reserved connection only,
// then we do not return an error from here and continue to use the reserved connection path.
// This is specially for get_lock function call from vtgate that needs a reserved connection.
Expand All @@ -1354,7 +1396,7 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries)
state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, settings)
if err != nil {
return err
}
Expand All @@ -1376,14 +1418,14 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
func (tsv *TabletServer) ReserveStreamExecute(
ctx context.Context,
target *querypb.Target,
preQueries []string,
settings []string,
sql string,
bindVariables map[string]*querypb.BindVariable,
transactionID int64,
options *querypb.ExecuteOptions,
callback func(*sqltypes.Result) error,
) (state queryservice.ReservedState, err error) {
return state, tsv.streamExecute(ctx, target, sql, bindVariables, transactionID, 0, preQueries, options, callback)
return state, tsv.streamExecute(ctx, target, sql, bindVariables, transactionID, 0, settings, options, callback)
}

// Release implements the QueryService interface
Expand Down Expand Up @@ -1423,8 +1465,8 @@ func (tsv *TabletServer) executeWithSettings(ctx context.Context, target *queryp
return tsv.execute(ctx, target, sql, bindVariables, transactionID, 0, settings, options)
}

func (tsv *TabletServer) beginExecuteWithSettings(ctx context.Context, target *querypb.Target, settings []string, savepointQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
txState, err := tsv.begin(ctx, target, savepointQueries, 0, settings, options)
func (tsv *TabletServer) beginExecuteWithSettings(ctx context.Context, target *querypb.Target, settings []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
txState, err := tsv.begin(ctx, target, postBeginQueries, 0, settings, options)
if err != nil {
return txToReserveState(txState), nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (te *TxEngine) isTxPoolAvailable(addToWaitGroup func(int)) error {
// statement(s) used to execute the begin (if any).
//
// Subsequent statements can access the connection through the transaction id.
func (te *TxEngine) Begin(ctx context.Context, savepointQueries []string, reservedID int64, setting *smartconnpool.Setting, options *querypb.ExecuteOptions) (int64, string, string, error) {
func (te *TxEngine) Begin(ctx context.Context, reservedID int64, setting *smartconnpool.Setting, options *querypb.ExecuteOptions) (int64, string, string, error) {
span, ctx := trace.NewSpan(ctx, "TxEngine.Begin")
defer span.Finish()

Expand All @@ -285,7 +285,7 @@ func (te *TxEngine) Begin(ctx context.Context, savepointQueries []string, reserv
}

defer te.beginRequests.Done()
conn, beginSQL, sessionStateChanges, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, savepointQueries, setting)
conn, beginSQL, sessionStateChanges, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, setting)
if err != nil {
return 0, "", "", err
}
Expand Down Expand Up @@ -516,7 +516,7 @@ func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, re
// Update the state of the transaction in the redo log.
// Retryable Error: Update the message with error message.
// Non-retryable Error: Along with message, update the state as RedoStateFailed.
conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
Expand Down Expand Up @@ -608,7 +608,7 @@ func (te *TxEngine) stopTransactionWatcher() {
}

// ReserveBegin creates a reserved connection, and in it opens a transaction
func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string, savepointQueries []string) (int64, string, error) {
func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string) (int64, string, error) {
span, ctx := trace.NewSpan(ctx, "TxEngine.ReserveBegin")
defer span.Finish()
err := te.isTxPoolAvailable(te.beginRequests.Add)
Expand All @@ -622,7 +622,7 @@ func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOp
return 0, "", err
}
defer conn.UnlockUpdateTime()
_, sessionStateChanges, err := te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn, savepointQueries)
_, sessionStateChanges, err := te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn)
if err != nil {
conn.Close()
conn.Release(tx.ConnInitFail)
Expand Down Expand Up @@ -720,7 +720,7 @@ func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnect
env: te.env,
}

_, _, err = te.txPool.begin(ctx, nil, false, sc, nil)
_, _, err = te.txPool.begin(ctx, nil, false, sc)
return sc, err
}

Expand Down
Loading

0 comments on commit e8ada40

Please sign in to comment.