From e8ada4095133ed3d6eb3079b4e1f93c76eeb8682 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 7 Oct 2024 20:32:19 +0530 Subject: [PATCH] execute savepoints after the begin method is complete in the tabletserver Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/dt_executor.go | 2 +- go/vt/vttablet/tabletserver/query_executor.go | 4 +- go/vt/vttablet/tabletserver/tabletserver.go | 106 ++++++++++++------ go/vt/vttablet/tabletserver/tx_engine.go | 12 +- go/vt/vttablet/tabletserver/tx_engine_test.go | 28 ++--- go/vt/vttablet/tabletserver/tx_pool.go | 19 +--- go/vt/vttablet/tabletserver/tx_pool_test.go | 67 +++++------ 7 files changed, 128 insertions(+), 110 deletions(-) diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 966c43a64bf..823751df638 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -313,7 +313,7 @@ func (dte *DTExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, pre } func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error { - conn, _, _, err := dte.te.txPool.Begin(dte.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := dte.te.txPool.Begin(dte.ctx, &querypb.ExecuteOptions{}, false, 0, nil) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 556ad5c09ae..d06953b3241 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -235,7 +235,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt return nil, errTxThrottled } - conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) + conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, qre.setting) if err != nil { return nil, err @@ -249,7 +249,7 @@ func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*s if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) { return nil, errTxThrottled } - conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) + conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, qre.setting) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 5ad7166d1e7..ad65f61cbfc 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -513,7 +513,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti func (tsv *TabletServer) begin( ctx context.Context, target *querypb.Target, - savepointQueries []string, + postBeginQueries []string, reservedID int64, settings []string, options *querypb.ExecuteOptions, @@ -535,12 +535,43 @@ func (tsv *TabletServer) begin( return err } } - transactionID, beginSQL, sessionStateChanges, err := tsv.te.Begin(ctx, nil, reservedID, connSetting, options) + transactionID, beginSQL, sessionStateChanges, err := tsv.te.Begin(ctx, reservedID, connSetting, options) state.TransactionID = transactionID state.SessionStateChanges = sessionStateChanges logStats.TransactionID = transactionID logStats.ReservedID = reservedID + if err != nil { + return err + } + + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + for _, query := range postBeginQueries { + plan, err := tsv.qe.GetPlan(ctx, logStats, query, true) + if err != nil { + return err + } + + qre := &QueryExecutor{ + ctx: ctx, + query: query, + connID: transactionID, + options: options, + plan: plan, + logStats: logStats, + tsv: tsv, + targetTabletType: targetType, + setting: connSetting, + } + _, err = qre.Execute() + if err != nil { + return err + } + } + // Record the actual statements that were executed in the logStats. // If nothing was actually executed, don't count the operation in // the tablet metrics, and clear out the logStats Method so that @@ -559,18 +590,6 @@ func (tsv *TabletServer) begin( return err }, ) - - if err != nil { - return - } - - for _, savepointQuery := range savepointQueries { - _, err = tsv.execute(ctx, target, savepointQuery, nil, state.TransactionID, reservedID, settings, options) - if err != nil { - return - } - } - return } @@ -997,7 +1016,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ } // BeginExecute combines Begin and Execute. -func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) { +func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Target, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) { // Disable hot row protection in case of reserve connection. if tsv.enableHotRowProtection && reservedID == 0 { @@ -1010,7 +1029,7 @@ func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Targe } } - state, err := tsv.begin(ctx, target, preQueries, reservedID, nil, options) + state, err := tsv.begin(ctx, target, postBeginQueries, reservedID, nil, options) if err != nil { return state, nil, err } @@ -1023,14 +1042,14 @@ func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Targe func (tsv *TabletServer) BeginStreamExecute( ctx context.Context, target *querypb.Target, - preQueries []string, + postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error, ) (queryservice.TransactionState, error) { - state, err := tsv.begin(ctx, target, preQueries, reservedID, nil, options) + state, err := tsv.begin(ctx, target, postBeginQueries, reservedID, nil, options) if err != nil { return state, err } @@ -1256,8 +1275,8 @@ func (tsv *TabletServer) VStreamResults(ctx context.Context, target *querypb.Tar } // ReserveBeginExecute implements the QueryService interface -func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (state queryservice.ReservedTransactionState, result *sqltypes.Result, err error) { - state, result, err = tsv.beginExecuteWithSettings(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options) +func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, settings []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (state queryservice.ReservedTransactionState, result *sqltypes.Result, err error) { + state, result, err = tsv.beginExecuteWithSettings(ctx, target, settings, postBeginQueries, sql, bindVariables, options) // If there is an error and the error message is about allowing query in reserved connection only, // then we do not return an error from here and continue to use the reserved connection path. // This is specially for get_lock function call from vtgate that needs a reserved connection. @@ -1285,12 +1304,35 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp return err } defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) - connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) + connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, settings) + logStats.TransactionID = connID + logStats.ReservedID = connID if err != nil { return err } - logStats.TransactionID = connID - logStats.ReservedID = connID + + for _, query := range postBeginQueries { + plan, err := tsv.qe.GetPlan(ctx, logStats, query, true) + if err != nil { + return err + } + + qre := &QueryExecutor{ + ctx: ctx, + query: query, + connID: connID, + options: options, + plan: plan, + logStats: logStats, + tsv: tsv, + targetTabletType: targetType, + } + _, err = qre.Execute() + if err != nil { + return err + } + } + return nil }, ) @@ -1311,13 +1353,13 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( ctx context.Context, target *querypb.Target, settings []string, - savepointQueries []string, + postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error, ) (state queryservice.ReservedTransactionState, err error) { - txState, err := tsv.begin(ctx, target, savepointQueries, 0, settings, options) + txState, err := tsv.begin(ctx, target, postBeginQueries, 0, settings, options) if err != nil { return txToReserveState(txState), err } @@ -1327,9 +1369,9 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( } // ReserveExecute implements the QueryService interface -func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (state queryservice.ReservedState, result *sqltypes.Result, err error) { +func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Target, settings []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (state queryservice.ReservedState, result *sqltypes.Result, err error) { - result, err = tsv.executeWithSettings(ctx, target, preQueries, sql, bindVariables, transactionID, options) + result, err = tsv.executeWithSettings(ctx, target, settings, sql, bindVariables, transactionID, options) // If there is an error and the error message is about allowing query in reserved connection only, // then we do not return an error from here and continue to use the reserved connection path. // This is specially for get_lock function call from vtgate that needs a reserved connection. @@ -1354,7 +1396,7 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar return err } defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) - state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) + state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, settings) if err != nil { return err } @@ -1376,14 +1418,14 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar func (tsv *TabletServer) ReserveStreamExecute( ctx context.Context, target *querypb.Target, - preQueries []string, + settings []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error, ) (state queryservice.ReservedState, err error) { - return state, tsv.streamExecute(ctx, target, sql, bindVariables, transactionID, 0, preQueries, options, callback) + return state, tsv.streamExecute(ctx, target, sql, bindVariables, transactionID, 0, settings, options, callback) } // Release implements the QueryService interface @@ -1423,8 +1465,8 @@ func (tsv *TabletServer) executeWithSettings(ctx context.Context, target *queryp return tsv.execute(ctx, target, sql, bindVariables, transactionID, 0, settings, options) } -func (tsv *TabletServer) beginExecuteWithSettings(ctx context.Context, target *querypb.Target, settings []string, savepointQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) { - txState, err := tsv.begin(ctx, target, savepointQueries, 0, settings, options) +func (tsv *TabletServer) beginExecuteWithSettings(ctx context.Context, target *querypb.Target, settings []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) { + txState, err := tsv.begin(ctx, target, postBeginQueries, 0, settings, options) if err != nil { return txToReserveState(txState), nil, err } diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 42bec29dfa3..d581fb79ae4 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -270,7 +270,7 @@ func (te *TxEngine) isTxPoolAvailable(addToWaitGroup func(int)) error { // statement(s) used to execute the begin (if any). // // Subsequent statements can access the connection through the transaction id. -func (te *TxEngine) Begin(ctx context.Context, savepointQueries []string, reservedID int64, setting *smartconnpool.Setting, options *querypb.ExecuteOptions) (int64, string, string, error) { +func (te *TxEngine) Begin(ctx context.Context, reservedID int64, setting *smartconnpool.Setting, options *querypb.ExecuteOptions) (int64, string, string, error) { span, ctx := trace.NewSpan(ctx, "TxEngine.Begin") defer span.Finish() @@ -285,7 +285,7 @@ func (te *TxEngine) Begin(ctx context.Context, savepointQueries []string, reserv } defer te.beginRequests.Done() - conn, beginSQL, sessionStateChanges, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, savepointQueries, setting) + conn, beginSQL, sessionStateChanges, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, setting) if err != nil { return 0, "", "", err } @@ -516,7 +516,7 @@ func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, re // Update the state of the transaction in the redo log. // Retryable Error: Update the message with error message. // Non-retryable Error: Along with message, update the state as RedoStateFailed. - conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) return @@ -608,7 +608,7 @@ func (te *TxEngine) stopTransactionWatcher() { } // ReserveBegin creates a reserved connection, and in it opens a transaction -func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string, savepointQueries []string) (int64, string, error) { +func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string) (int64, string, error) { span, ctx := trace.NewSpan(ctx, "TxEngine.ReserveBegin") defer span.Finish() err := te.isTxPoolAvailable(te.beginRequests.Add) @@ -622,7 +622,7 @@ func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOp return 0, "", err } defer conn.UnlockUpdateTime() - _, sessionStateChanges, err := te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn, savepointQueries) + _, sessionStateChanges, err := te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn) if err != nil { conn.Close() conn.Release(tx.ConnInitFail) @@ -720,7 +720,7 @@ func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnect env: te.env, } - _, _, err = te.txPool.begin(ctx, nil, false, sc, nil) + _, _, err = te.txPool.begin(ctx, nil, false, sc) return sc, err } diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index a9958525587..be2531f1a41 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -62,11 +62,11 @@ func TestTxEngineClose(t *testing.T) { // Normal close with timeout wait. te.AcceptReadWrite() - c, beginSQL, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + c, beginSQL, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) require.Equal(t, "begin", beginSQL) c.Unlock() - c, beginSQL, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + c, beginSQL, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) require.Equal(t, "begin", beginSQL) c.Unlock() @@ -78,7 +78,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close. te.AcceptReadOnly() - c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) if err != nil { t.Fatal(err) } @@ -90,7 +90,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period. te.shutdownGracePeriod = 25 * time.Millisecond te.AcceptReadWrite() - c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) c.Unlock() start = time.Now() @@ -101,7 +101,7 @@ func TestTxEngineClose(t *testing.T) { // Normal close with short grace period, but pool gets empty early. te.shutdownGracePeriod = 25 * time.Millisecond te.AcceptReadWrite() - c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) c.Unlock() go func() { @@ -117,7 +117,7 @@ func TestTxEngineClose(t *testing.T) { // Immediate close, but connection is in use. te.AcceptReadOnly() - c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) go func() { time.Sleep(100 * time.Millisecond) @@ -138,7 +138,7 @@ func TestTxEngineClose(t *testing.T) { te.AcceptReadWrite() _, err = te.Reserve(ctx, &querypb.ExecuteOptions{}, 0, nil) require.NoError(t, err) - _, _, err = te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil, nil) + _, _, err = te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil) require.NoError(t, err) start = time.Now() te.Close() @@ -159,11 +159,11 @@ func TestTxEngineBegin(t *testing.T) { for _, exec := range []func() (int64, string, error){ func() (int64, string, error) { - tx, _, schemaStateChanges, err := te.Begin(ctx, nil, 0, nil, &querypb.ExecuteOptions{}) + tx, _, schemaStateChanges, err := te.Begin(ctx, 0, nil, &querypb.ExecuteOptions{}) return tx, schemaStateChanges, err }, func() (int64, string, error) { - return te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil, nil) + return te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil) }, } { te.AcceptReadOnly() @@ -204,7 +204,7 @@ func TestTxEngineRenewFails(t *testing.T) { te := NewTxEngine(tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest"), nil) te.AcceptReadOnly() options := &querypb.ExecuteOptions{} - connID, _, err := te.ReserveBegin(ctx, options, nil, nil) + connID, _, err := te.ReserveBegin(ctx, options, nil) require.NoError(t, err) conn, err := te.txPool.GetAndLock(connID, "for test") @@ -559,7 +559,7 @@ func startTx(te *TxEngine, writeTransaction bool) error { } else { options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY } - _, _, _, err := te.Begin(context.Background(), nil, 0, nil, options) + _, _, _, err := te.Begin(context.Background(), 0, nil, options) return err } @@ -577,7 +577,7 @@ func TestTxEngineFailReserve(t *testing.T) { _, err := te.Reserve(ctx, options, 0, nil) assert.EqualError(t, err, "tx engine can't accept new connections in state NotServing") - _, _, err = te.ReserveBegin(ctx, options, nil, nil) + _, _, err = te.ReserveBegin(ctx, options, nil) assert.EqualError(t, err, "tx engine can't accept new connections in state NotServing") te.AcceptReadOnly() @@ -586,14 +586,14 @@ func TestTxEngineFailReserve(t *testing.T) { _, err = te.Reserve(ctx, options, 0, []string{"dummy_query"}) assert.EqualError(t, err, "unknown error: failed executing dummy_query (errno 1105) (sqlstate HY000) during query: dummy_query") - _, _, err = te.ReserveBegin(ctx, options, []string{"dummy_query"}, nil) + _, _, err = te.ReserveBegin(ctx, options, []string{"dummy_query"}) assert.EqualError(t, err, "unknown error: failed executing dummy_query (errno 1105) (sqlstate HY000) during query: dummy_query") nonExistingID := int64(42) _, err = te.Reserve(ctx, options, nonExistingID, nil) assert.EqualError(t, err, "transaction 42: not found (potential transaction timeout)") - txID, _, _, err := te.Begin(ctx, nil, 0, nil, options) + txID, _, _, err := te.Begin(ctx, 0, nil, options) require.NoError(t, err) conn, err := te.txPool.GetAndLock(txID, "for test") require.NoError(t, err) diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 1adf9088d2c..6d1f1dec3c2 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -230,7 +230,7 @@ func (tp *TxPool) Rollback(ctx context.Context, txConn *StatefulConnection) erro // the statements (if any) executed to initiate the transaction. In autocommit // mode the statement will be "". // The connection returned is locked for the callee and its responsibility is to unlock the connection. -func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, reservedID int64, savepointQueries []string, setting *smartconnpool.Setting) (*StatefulConnection, string, string, error) { +func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, reservedID int64, setting *smartconnpool.Setting) (*StatefulConnection, string, string, error) { span, ctx := trace.NewSpan(ctx, "TxPool.Begin") defer span.Finish() @@ -262,7 +262,7 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re if err != nil { return nil, "", "", err } - sql, sessionStateChanges, err := tp.begin(ctx, options, readOnly, conn, savepointQueries) + sql, sessionStateChanges, err := tp.begin(ctx, options, readOnly, conn) if err != nil { conn.Close() conn.Release(tx.ConnInitFail) @@ -271,24 +271,14 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re return conn, sql, sessionStateChanges, nil } -func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, conn *StatefulConnection, savepointQueries []string) (string, string, error) { +func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, conn *StatefulConnection) (string, string, error) { immediateCaller := callerid.ImmediateCallerIDFromContext(ctx) effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx) - beginQueries, autocommit, sessionStateChanges, err := createTransaction(ctx, options, conn, readOnly, savepointQueries) + beginQueries, autocommit, sessionStateChanges, err := createTransaction(ctx, options, conn, readOnly) if err != nil { return "", "", err } - conn.txProps = tp.NewTxProps(immediateCaller, effectiveCaller, autocommit) - // for _, savepoint := range savepointQueries { - // if _, err = conn.Exec(ctx, savepoint, 1, false); err != nil { - // return "", "", err - // } - // // Record the query detail for the savepoint. - // conn.txProps.RecordQueryDetail(savepoint, nil) - // beginQueries += ";" + savepoint - // } - return beginQueries, sessionStateChanges, nil } @@ -314,7 +304,6 @@ func createTransaction( options *querypb.ExecuteOptions, conn *StatefulConnection, readOnly bool, - savepointQueries []string, ) (beginQueries string, autocommitTransaction bool, sessionStateChanges string, err error) { switch options.GetTransactionIsolation() { case querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY: diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index e80f1edb17f..c03cac92878 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -48,7 +48,7 @@ func TestTxPoolExecuteCommit(t *testing.T) { sql := "select 'this is a query'" // begin a transaction and then return the connection - conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) id := conn.ReservedID() @@ -83,7 +83,7 @@ func TestTxPoolExecuteRollback(t *testing.T) { db, txPool, _, closer := setup(t) defer closer() - conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) defer conn.Release(tx.TxRollback) @@ -104,7 +104,7 @@ func TestTxPoolExecuteRollbackOnClosedConn(t *testing.T) { db, txPool, _, closer := setup(t) defer closer() - conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) defer conn.Release(tx.TxRollback) @@ -125,9 +125,9 @@ func TestTxPoolRollbackNonBusy(t *testing.T) { defer closer() // start two transactions, and mark one of them as unused - conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) - conn2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) conn2.Unlock() // this marks conn2 as NonBusy @@ -154,7 +154,7 @@ func TestTxPoolTransactionIsolation(t *testing.T) { db, txPool, _, closer := setup(t) defer closer() - c2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}, false, 0, nil, nil) + c2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}, false, 0, nil) require.NoError(t, err) c2.Release(tx.TxClose) @@ -172,7 +172,7 @@ func TestTxPoolAutocommit(t *testing.T) { // to mysql. // This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal // because is not in the list of expected queries (i.e db.AddQuery hasn't been called). - conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}, false, 0, nil, nil) + conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}, false, 0, nil) require.NoError(t, err) // run a query to see it in the query log @@ -204,7 +204,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) { err := db.WaitForClose(2 * time.Second) require.NoError(t, err) - txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err, "Begin should have succeeded after the retry in DBConn.Exec()") txConn.Release(tx.TxCommit) } @@ -225,7 +225,7 @@ func primeTxPoolWithConnection(t *testing.T, ctx context.Context) (*fakesqldb.DB // reused by subsequent transactions. db.AddQuery("begin", &sqltypes.Result{}) db.AddQuery("rollback", &sqltypes.Result{}) - txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) txConn.Release(tx.TxCommit) @@ -248,7 +248,7 @@ func TestTxPoolBeginWithError(t *testing.T) { } ctxWithCallerID := callerid.NewContext(ctx, ef, im) - _, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil) + _, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil) require.Error(t, err) require.Contains(t, err.Error(), "error: rejected") require.Equal(t, vtrpcpb.Code_UNKNOWN, vterrors.Code(err), "wrong error code for Begin error") @@ -270,19 +270,6 @@ func TestTxPoolBeginWithError(t *testing.T) { }, limiter.Actions()) } -func TestTxPoolBeginWithPreQueryError(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - db, txPool, _, closer := setup(t) - defer closer() - db.AddRejectedQuery("pre_query", errRejected) - _, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, []string{"pre_query"}, nil) - require.Error(t, err) - require.Contains(t, err.Error(), "error: rejected") - require.Equal(t, vtrpcpb.Code_UNKNOWN, vterrors.Code(err), "wrong error code for Begin error") -} - func TestTxPoolCancelledContextError(t *testing.T) { // given db, txPool, _, closer := setup(t) @@ -291,7 +278,7 @@ func TestTxPoolCancelledContextError(t *testing.T) { cancel() // when - _, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + _, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) // then require.Error(t, err) @@ -312,12 +299,12 @@ func TestTxPoolWaitTimeoutError(t *testing.T) { defer closer() // lock the only connection in the pool. - conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) defer conn.Unlock() // try locking one more connection. - _, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + _, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) // then require.Error(t, err) @@ -337,7 +324,7 @@ func TestTxPoolRollbackFailIsPassedThrough(t *testing.T) { defer closer() db.AddRejectedQuery("rollback", errRejected) - conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) _, err = conn1.Exec(ctx, sql, 1, true) @@ -357,7 +344,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { db, txPool, _, _ := setup(t) defer db.Close() - conn1, _, _, _ := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn1, _, _, _ := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) id := conn1.ReservedID() conn1.Unlock() txPool.Close() @@ -380,7 +367,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { params := dbconfigs.New(db.ConnParams()) txPool.Open(params, params, params) - conn1, _, _, _ = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn1, _, _, _ = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) id = conn1.ReservedID() _, err := txPool.Commit(ctx, conn1) require.NoError(t, err) @@ -396,7 +383,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) { txPool.Open(params, params, params) defer txPool.Close() - conn1, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn1, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err, "unable to start transaction: %v", err) conn1.Unlock() id = conn1.ReservedID() @@ -412,7 +399,7 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) { startingStray := txPool.env.Stats().InternalErrors.Counts()["StrayTransactions"] // Start stray transaction. - conn, _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) conn.Unlock() @@ -443,7 +430,7 @@ func TestTxTimeoutKillsTransactions(t *testing.T) { ctxWithCallerID := callerid.NewContext(ctx, ef, im) // Start transaction. - conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) conn.Unlock() @@ -491,7 +478,7 @@ func TestTxTimeoutDoesNotKillShortLivedTransactions(t *testing.T) { ctxWithCallerID := callerid.NewContext(ctx, ef, im) // Start transaction. - conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) conn.Unlock() @@ -526,7 +513,7 @@ func TestTxTimeoutKillsOlapTransactions(t *testing.T) { // Start transaction. conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ Workload: querypb.ExecuteOptions_OLAP, - }, false, 0, nil, nil) + }, false, 0, nil) require.NoError(t, err) conn.Unlock() @@ -561,11 +548,11 @@ func TestTxTimeoutNotEnforcedForZeroLengthTimeouts(t *testing.T) { ctxWithCallerID := callerid.NewContext(ctx, ef, im) // Start transactions. - conn0, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil, nil) + conn0, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil) require.NoError(t, err) conn1, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ Workload: querypb.ExecuteOptions_OLAP, - }, false, 0, nil, nil) + }, false, 0, nil) require.NoError(t, err) conn0.Unlock() conn1.Unlock() @@ -606,7 +593,7 @@ func TestTxTimeoutReservedConn(t *testing.T) { // Start OLAP transaction and return it to pool right away. conn0, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ Workload: querypb.ExecuteOptions_OLAP, - }, false, 0, nil, nil) + }, false, 0, nil) require.NoError(t, err) // Taint the connection. conn0.Taint(ctxWithCallerID, nil) @@ -648,14 +635,14 @@ func TestTxTimeoutReusedReservedConn(t *testing.T) { // Start OLAP transaction and return it to pool right away. conn0, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{ Workload: querypb.ExecuteOptions_OLAP, - }, false, 0, nil, nil) + }, false, 0, nil) require.NoError(t, err) // Taint the connection. conn0.Taint(ctxWithCallerID, nil) conn0.Unlock() // Reuse underlying connection in an OLTP transaction. - conn1, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, conn0.ReservedID(), nil, nil) + conn1, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, conn0.ReservedID(), nil) require.NoError(t, err) require.Equal(t, conn1.ReservedID(), conn0.ReservedID()) conn1.Unlock() @@ -786,7 +773,7 @@ func TestTxPoolBeginStatements(t *testing.T) { TransactionIsolation: tc.txIsolationLevel, TransactionAccessMode: tc.txAccessModes, } - conn, beginSQL, _, err := txPool.Begin(ctx, options, tc.readOnly, 0, nil, nil) + conn, beginSQL, _, err := txPool.Begin(ctx, options, tc.readOnly, 0, nil) if tc.expErr != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expErr)