From d76d521b29fbf7bdff398ece764baa793b0e73cc Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 20 Dec 2024 16:36:31 +0530 Subject: [PATCH 1/6] feat: make two pc mode dynamic Signed-off-by: Manan Gupta --- go/vt/vtexplain/vtexplain_vtgate.go | 6 ++- go/vt/vtgate/dynamicconfig/config.go | 22 +++++++++ go/vt/vtgate/executor.go | 14 +++--- go/vt/vtgate/executor_framework_test.go | 8 ++-- go/vt/vtgate/executor_select_test.go | 4 +- go/vt/vtgate/executor_stream_test.go | 2 +- go/vt/vtgate/legacy_scatter_conn_test.go | 8 ++-- go/vt/vtgate/scatter_conn.go | 4 +- go/vt/vtgate/staticconfig.go | 37 +++++++++++++++ go/vt/vtgate/tx_conn.go | 9 ++-- go/vt/vtgate/tx_conn_test.go | 24 +++++----- go/vt/vtgate/viperconfig.go | 42 +++++++++++++---- go/vt/vtgate/vtgate.go | 59 ++++++++++++++++-------- 13 files changed, 174 insertions(+), 65 deletions(-) create mode 100644 go/vt/vtgate/staticconfig.go diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index f9ae8be3820..d45073cd006 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -74,7 +74,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) - vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) + vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0, vtgate.NewDynamicViperConfig()) vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) return nil @@ -88,7 +88,9 @@ func (vte *VTExplain) newFakeResolver(ctx context.Context, opts *Options, serv s if opts.ExecutionMode == ModeTwoPC { txMode = vtgatepb.TransactionMode_TWOPC } - tc := vtgate.NewTxConn(gw, txMode) + tc := vtgate.NewTxConn(gw, &vtgate.StaticConfig{ + TxMode: txMode, + }) sc := vtgate.NewScatterConn("", tc, gw) srvResolver := srvtopo.NewResolver(serv, gw, cell) return vtgate.NewResolver(srvResolver, serv, cell, sc) diff --git a/go/vt/vtgate/dynamicconfig/config.go b/go/vt/vtgate/dynamicconfig/config.go index 5bb1d991eae..014160029cd 100644 --- a/go/vt/vtgate/dynamicconfig/config.go +++ b/go/vt/vtgate/dynamicconfig/config.go @@ -1,6 +1,28 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package dynamicconfig +import vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + type DDL interface { OnlineEnabled() bool DirectEnabled() bool } + +type TxMode interface { + TransactionMode() vtgatepb.TransactionMode +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 0bb47361f55..c85c651415c 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/pflag" vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/vtgate/dynamicconfig" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/cache/theine" @@ -136,7 +137,8 @@ type Executor struct { warmingReadsPercent int warmingReadsChannel chan bool - vConfig econtext.VCursorConfig + vConfig econtext.VCursorConfig + ddlConfig dynamicconfig.DDL } var executorOnce sync.Once @@ -168,6 +170,7 @@ func NewExecutor( noScatter bool, pv plancontext.PlannerVersion, warmingReadsPercent int, + ddlConfig dynamicconfig.DDL, ) *Executor { e := &Executor{ env: env, @@ -183,6 +186,7 @@ func NewExecutor( plans: plans, warmingReadsPercent: warmingReadsPercent, warmingReadsChannel: make(chan bool, warmingReadsConcurrency), + ddlConfig: ddlConfig, } // setting the vcursor config. e.initVConfig(warnOnShardedOnly, pv) @@ -491,7 +495,7 @@ func (e *Executor) addNeededBindVars(vcursor *econtext.VCursorImpl, bindVarNeeds case sysvars.TransactionMode.Name: txMode := session.TransactionMode if txMode == vtgatepb.TransactionMode_UNSPECIFIED { - txMode = getTxMode() + txMode = transactionMode.Get() } bindVars[key] = sqltypes.StringBindVariable(txMode.String()) case sysvars.Workload.Name: @@ -1163,11 +1167,7 @@ func (e *Executor) buildStatement( reservedVars *sqlparser.ReservedVars, bindVarNeeds *sqlparser.BindVarNeeds, ) (*engine.Plan, error) { - cfg := &dynamicViperConfig{ - onlineDDL: enableOnlineDDL, - directDDL: enableDirectDDL, - } - plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, cfg) + plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, e.ddlConfig) if err != nil { return nil, err } diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 2ee3425209f..43987217039 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -183,7 +183,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta // one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness. plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} @@ -232,7 +232,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex plans := DefaultPlanCache() env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion}) require.NoError(t, err) - executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -269,7 +269,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -294,7 +294,7 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context, replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 86aafaefba4..c1e80440ebe 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1644,7 +1644,7 @@ func TestSelectListArg(t *testing.T) { func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) ex.SetQueryLogger(queryLogger) return ex } @@ -3326,7 +3326,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index a8500dd59c4..8bb10aae8fb 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -68,7 +68,7 @@ func TestStreamSQLSharded(t *testing.T) { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index 0d49e7b7bd9..00f38a97f9f 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -522,7 +522,7 @@ func TestScatterConnSingleDB(t *testing.T) { assert.Contains(t, errors[0].Error(), want) // TransactionMode_SINGLE in txconn - sc.txConn.mode = vtgatepb.TransactionMode_SINGLE + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_SINGLE} session = econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true}) _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errors) @@ -531,7 +531,7 @@ func TestScatterConnSingleDB(t *testing.T) { assert.Contains(t, errors[0].Error(), want) // TransactionMode_MULTI in txconn. Should not fail. - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} session = econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true}) _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errors) @@ -622,6 +622,8 @@ func newTestScatterConn(ctx context.Context, hc discovery.HealthCheck, serv srvt // in '-cells_to_watch' command line parameter, which is // empty by default. So it's unused in this test, set to nil. gw := NewTabletGateway(ctx, hc, serv, cell) - tc := NewTxConn(gw, vtgatepb.TransactionMode_MULTI) + tc := NewTxConn(gw, &StaticConfig{ + TxMode: vtgatepb.TransactionMode_MULTI, + }) return NewScatterConn("", tc, gw) } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 6e2cf9ad8ba..7231d2d9c43 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -666,7 +666,7 @@ func (stc *ScatterConn) multiGoTransaction( startTime, statsKey := stc.startAction(name, rs.Target) defer stc.endAction(startTime, allErrors, statsKey, &err, session) - info, shardSession, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.mode) + info, shardSession, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.txMode.TransactionMode()) if err != nil { return } @@ -683,7 +683,7 @@ func (stc *ScatterConn) multiGoTransaction( shardSession.RowsAffected = info.rowsAffected } if info.actionNeeded != nothing && (info.transactionID != 0 || info.reservedID != 0) { - appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.mode) + appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.txMode.TransactionMode()) if appendErr != nil { err = appendErr } diff --git a/go/vt/vtgate/staticconfig.go b/go/vt/vtgate/staticconfig.go new file mode 100644 index 00000000000..91ea20a2f36 --- /dev/null +++ b/go/vt/vtgate/staticconfig.go @@ -0,0 +1,37 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + +type StaticConfig struct { + OnlineDDLEnabled bool + DirectDDLEnabled bool + TxMode vtgatepb.TransactionMode +} + +func (s *StaticConfig) OnlineEnabled() bool { + return s.OnlineDDLEnabled +} + +func (s *StaticConfig) DirectEnabled() bool { + return s.DirectDDLEnabled +} + +func (s *StaticConfig) TransactionMode() vtgatepb.TransactionMode { + return s.TxMode +} diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index cadb1392eca..dbd76b04c7a 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -33,6 +33,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/dynamicconfig" econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" "vitess.io/vitess/go/vt/vttablet/queryservice" ) @@ -44,14 +45,14 @@ const nonAtomicCommitWarnMaxShards = 16 // TxConn is used for executing transactional requests. type TxConn struct { tabletGateway *TabletGateway - mode vtgatepb.TransactionMode + txMode dynamicconfig.TxMode } // NewTxConn builds a new TxConn. -func NewTxConn(gw *TabletGateway, txMode vtgatepb.TransactionMode) *TxConn { +func NewTxConn(gw *TabletGateway, txMode dynamicconfig.TxMode) *TxConn { return &TxConn{ tabletGateway: gw, - mode: txMode, + txMode: txMode, } } @@ -114,7 +115,7 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er case vtgatepb.TransactionMode_TWOPC: twopc = true case vtgatepb.TransactionMode_UNSPECIFIED: - twopc = txc.mode == vtgatepb.TransactionMode_TWOPC + twopc = txc.txMode.TransactionMode() == vtgatepb.TransactionMode_TWOPC } defer recordCommitTime(session, twopc, time.Now()) diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index 333094569c8..a0322e09591 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -72,7 +72,7 @@ func TestTxConnCommitFailure(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"] // Sequence the executes to ensure commit order @@ -173,7 +173,7 @@ func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 18) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"] // Sequence the executes to ensure commit order @@ -227,7 +227,7 @@ func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 17) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"] // Sequence the executes to ensure commit order @@ -281,7 +281,7 @@ func TestTxConnCommitSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure commit order session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true}) @@ -334,7 +334,7 @@ func TestTxConnReservedCommitSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure commit order session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) @@ -419,7 +419,7 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndCommit(t *testing.T) { keyspace := "TestTxConn" sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, keyspace) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure shard session order session := econtext.NewSafeSession(&vtgatepb.Session{InReservedConn: true}) @@ -514,7 +514,7 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndRollback(t *testing.T) { keyspace := "TestTxConn" sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, keyspace) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure shard session order session := econtext.NewSafeSession(&vtgatepb.Session{InReservedConn: true}) @@ -608,7 +608,7 @@ func TestTxConnCommitOrderFailure1(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{Sql: "query1"}} @@ -641,7 +641,7 @@ func TestTxConnCommitOrderFailure2(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", @@ -675,7 +675,7 @@ func TestTxConnCommitOrderFailure3(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", @@ -717,7 +717,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", @@ -815,7 +815,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", diff --git a/go/vt/vtgate/viperconfig.go b/go/vt/vtgate/viperconfig.go index ec77ff62d4f..c306a839be8 100644 --- a/go/vt/vtgate/viperconfig.go +++ b/go/vt/vtgate/viperconfig.go @@ -1,16 +1,42 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package vtgate -import "vitess.io/vitess/go/viperutil" +import ( + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" +) + +// DynamicViperConfig is a dynamic config that uses viper. +type DynamicViperConfig struct { +} + +// NewDynamicViperConfig creates a new dynamic viper config +func NewDynamicViperConfig() *DynamicViperConfig { + return &DynamicViperConfig{} +} -type dynamicViperConfig struct { - onlineDDL viperutil.Value[bool] - directDDL viperutil.Value[bool] +func (d *DynamicViperConfig) OnlineEnabled() bool { + return enableOnlineDDL.Get() } -func (d *dynamicViperConfig) OnlineEnabled() bool { - return d.onlineDDL.Get() +func (d *DynamicViperConfig) DirectEnabled() bool { + return enableDirectDDL.Get() } -func (d *dynamicViperConfig) DirectEnabled() bool { - return d.directDDL.Get() +func (d *DynamicViperConfig) TransactionMode() vtgatepb.TransactionMode { + return transactionMode.Get() } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 8bab05479dd..ee9ff947333 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -29,6 +29,7 @@ import ( "time" "github.com/spf13/pflag" + "github.com/spf13/viper" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/sqltypes" @@ -60,7 +61,6 @@ import ( ) var ( - transactionMode = "MULTI" normalizeQueries = true streamBufferSize = 32 * 1024 @@ -114,6 +114,33 @@ var ( }, ) + transactionMode = viperutil.Configure( + "transaction_mode", + viperutil.Options[vtgatepb.TransactionMode]{ + FlagName: "transaction_mode", + Default: vtgatepb.TransactionMode_MULTI, + Dynamic: true, + GetFunc: func(v *viper.Viper) func(key string) vtgatepb.TransactionMode { + return func(key string) vtgatepb.TransactionMode { + txMode := v.GetString(key) + switch strings.ToLower(txMode) { + case "single": + return vtgatepb.TransactionMode_SINGLE + case "multi": + return vtgatepb.TransactionMode_MULTI + case "twopc": + return vtgatepb.TransactionMode_TWOPC + default: + fmt.Printf("Invalid option: %v\n", txMode) + fmt.Println("Usage: -transaction_mode {SINGLE | MULTI | TWOPC}") + os.Exit(1) + return -1 + } + } + }, + }, + ) + // schema tracking flags enableSchemaChangeSignal = true enableViews bool @@ -138,7 +165,7 @@ var ( ) func registerFlags(fs *pflag.FlagSet) { - fs.StringVar(&transactionMode, "transaction_mode", transactionMode, "SINGLE: disallow multi-db transactions, MULTI: allow multi-db transactions with best effort commit, TWOPC: allow multi-db transactions with 2pc commit") + fs.String("transaction_mode", "MULTI", "SINGLE: disallow multi-db transactions, MULTI: allow multi-db transactions with best effort commit, TWOPC: allow multi-db transactions with 2pc commit") fs.BoolVar(&normalizeQueries, "normalize_queries", normalizeQueries, "Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars.") fs.BoolVar(&terseErrors, "vtgate-config-terse-errors", terseErrors, "prevent bind vars from escaping in returned errors") fs.IntVar(&truncateErrorLen, "truncate-error-len", truncateErrorLen, "truncate errors sent to client if they are longer than this value (0 means do not truncate)") @@ -173,7 +200,11 @@ func registerFlags(fs *pflag.FlagSet) { fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed") fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries") - viperutil.BindFlags(fs, enableOnlineDDL, enableDirectDDL) + viperutil.BindFlags(fs, + enableOnlineDDL, + enableDirectDDL, + transactionMode, + ) } func init() { @@ -182,22 +213,7 @@ func init() { } func getTxMode() vtgatepb.TransactionMode { - switch strings.ToLower(transactionMode) { - case "single": - log.Infof("Transaction mode: '%s'", transactionMode) - return vtgatepb.TransactionMode_SINGLE - case "multi": - log.Infof("Transaction mode: '%s'", transactionMode) - return vtgatepb.TransactionMode_MULTI - case "twopc": - log.Infof("Transaction mode: '%s'", transactionMode) - return vtgatepb.TransactionMode_TWOPC - default: - fmt.Printf("Invalid option: %v\n", transactionMode) - fmt.Println("Usage: -transaction_mode {SINGLE | MULTI | TWOPC}") - os.Exit(1) - return -1 - } + return transactionMode.Get() } var ( @@ -287,6 +303,8 @@ func Init( log.Fatalf("tabletGateway.WaitForTablets failed: %v", err) } + dynamicConfig := NewDynamicViperConfig() + // If we want to filter keyspaces replace the srvtopo.Server with a // filtering server if discovery.FilteringKeyspaces() { @@ -301,7 +319,7 @@ func Init( if _, err := schema.ParseDDLStrategy(defaultDDLStrategy); err != nil { log.Fatalf("Invalid value for -ddl_strategy: %v", err.Error()) } - tc := NewTxConn(gw, getTxMode()) + tc := NewTxConn(gw, dynamicConfig) // ScatterConn depends on TxConn to perform forced rollbacks. sc := NewScatterConn("VttabletCall", tc, gw) // TxResolver depends on TxConn to complete distributed transaction. @@ -352,6 +370,7 @@ func Init( noScatter, pv, warmingReadsPercent, + dynamicConfig, ) if err := executor.defaultQueryLogger(); err != nil { From 3efe5b572702bce1a7d8971b6c8a39abec6c9344 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 20 Dec 2024 19:48:22 +0530 Subject: [PATCH 2/6] test: specify transaction mode as a config parameter Signed-off-by: Manan Gupta --- go/test/endtoend/cluster/vtgate_process.go | 31 +++++++++++++++++++ .../endtoend/transaction/twopc/main_test.go | 5 ++- go/vt/vtgate/vtgate.go | 4 --- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 1290156a1cd..3e8c2635873 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -57,6 +57,8 @@ type VtgateProcess struct { Directory string VerifyURL string VSchemaURL string + ConfigFile string + Config VTGateConfiguration SysVarSetEnabled bool PlannerVersion plancontext.PlannerVersion // Extra Args to be set before starting the vtgate process @@ -66,6 +68,20 @@ type VtgateProcess struct { exit chan error } +type VTGateConfiguration struct { + TransactionMode string `json:"transaction_mode,omitempty"` +} + +// ToJSONString will marshal this configuration as JSON +func (config *VTGateConfiguration) ToJSONString() string { + b, _ := json.MarshalIndent(config, "", "\t") + return string(b) +} + +func (vtgate *VtgateProcess) RewriteConfiguration() error { + return os.WriteFile(vtgate.ConfigFile, []byte(vtgate.Config.ToJSONString()), 0644) +} + const defaultVtGatePlannerVersion = planbuilder.Gen4 // Setup starts Vtgate process with required arguements @@ -74,6 +90,7 @@ func (vtgate *VtgateProcess) Setup() (err error) { "--topo_implementation", vtgate.CommonArg.TopoImplementation, "--topo_global_server_address", vtgate.CommonArg.TopoGlobalAddress, "--topo_global_root", vtgate.CommonArg.TopoGlobalRoot, + "--config-file", vtgate.ConfigFile, "--log_dir", vtgate.LogDir, "--log_queries_to_file", vtgate.FileToLogQueries, "--port", fmt.Sprintf("%d", vtgate.Port), @@ -98,6 +115,19 @@ func (vtgate *VtgateProcess) Setup() (err error) { break } } + configFile, err := os.Create(vtgate.ConfigFile) + if err != nil { + log.Errorf("cannot create config file for vtgate: %v", err) + return err + } + _, err = configFile.WriteString(vtgate.Config.ToJSONString()) + if err != nil { + return err + } + err = configFile.Close() + if err != nil { + return err + } if !msvflag { version, err := mysqlctl.GetVersionString() if err != nil { @@ -287,6 +317,7 @@ func VtgateProcessInstance( Name: "vtgate", Binary: "vtgate", FileToLogQueries: path.Join(tmpDirectory, "/vtgate_querylog.txt"), + ConfigFile: path.Join(tmpDirectory, fmt.Sprintf("vtgate-config-%d.json", port)), Directory: os.Getenv("VTDATAROOT"), ServiceMap: "grpc-tabletmanager,grpc-throttler,grpc-queryservice,grpc-updatestream,grpc-vtctl,grpc-vtgateservice", LogDir: tmpDirectory, diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 58fe45547a5..37337a62167 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -77,7 +77,6 @@ func TestMain(m *testing.M) { // Set extra args for twopc clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, - "--transaction_mode", "TWOPC", "--grpc_use_effective_callerid", ) clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, @@ -103,6 +102,10 @@ func TestMain(m *testing.M) { if err := clusterInstance.StartVtgate(); err != nil { return 1 } + clusterInstance.VtgateProcess.Config.TransactionMode = "TWOPC" + if err := clusterInstance.VtgateProcess.RewriteConfiguration(); err != nil { + return 1 + } vtParams = clusterInstance.GetVTParams(keyspaceName) vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index ee9ff947333..a1dcd3219f6 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -212,10 +212,6 @@ func init() { servenv.OnParseFor("vtcombo", registerFlags) } -func getTxMode() vtgatepb.TransactionMode { - return transactionMode.Get() -} - var ( // vschemaCounters needs to be initialized before planner to // catch the initial load stats. From ea782804cd6e0f589fa6805aa6df07ece7771c1e Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 20 Dec 2024 21:01:18 +0530 Subject: [PATCH 3/6] feat: add test for verifying dynamic config works Signed-off-by: Manan Gupta --- go/test/endtoend/cluster/vtgate_process.go | 58 +++++++++++++++++++ .../endtoend/transaction/twopc/main_test.go | 3 + .../endtoend/transaction/twopc/twopc_test.go | 32 ++++++++++ 3 files changed, 93 insertions(+) diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 3e8c2635873..4253fbb5860 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "syscall" + "testing" "time" "vitess.io/vitess/go/vt/log" @@ -82,6 +83,63 @@ func (vtgate *VtgateProcess) RewriteConfiguration() error { return os.WriteFile(vtgate.ConfigFile, []byte(vtgate.Config.ToJSONString()), 0644) } +// WaitForConfig waits for the expectedConfig to be present in the vtgate configuration. +func (vtgate *VtgateProcess) WaitForConfig(expectedConfig string) error { + timeout := time.After(30 * time.Second) + var response string + for { + select { + case <-timeout: + return fmt.Errorf("timed out waiting for api to work. Last response - %s", response) + default: + _, response, _ = vtgate.MakeAPICall("/debug/config") + if strings.Contains(response, expectedConfig) { + return nil + } + time.Sleep(1 * time.Second) + } + } +} + +// MakeAPICall makes an API call on the given endpoint of VTOrc +func (vtgate *VtgateProcess) MakeAPICall(endpoint string) (status int, response string, err error) { + url := fmt.Sprintf("http://localhost:%d/%s", vtgate.Port, endpoint) + resp, err := http.Get(url) + if err != nil { + if resp != nil { + status = resp.StatusCode + } + return status, "", err + } + defer func() { + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + }() + + respByte, _ := io.ReadAll(resp.Body) + return resp.StatusCode, string(respByte), err +} + +// MakeAPICallRetry is used to make an API call and retries until success +func (vtgate *VtgateProcess) MakeAPICallRetry(t *testing.T, url string) { + t.Helper() + timeout := time.After(10 * time.Second) + for { + select { + case <-timeout: + t.Fatal("timed out waiting for api to work") + return + default: + status, _, err := vtgate.MakeAPICall(url) + if err == nil && status == 200 { + return + } + time.Sleep(1 * time.Second) + } + } +} + const defaultVtGatePlannerVersion = planbuilder.Gen4 // Setup starts Vtgate process with required arguements diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 37337a62167..3607beea72a 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -106,6 +106,9 @@ func TestMain(m *testing.M) { if err := clusterInstance.VtgateProcess.RewriteConfiguration(); err != nil { return 1 } + if err := clusterInstance.VtgateProcess.WaitForConfig(`"transaction_mode":"TWOPC"`); err != nil { + return 1 + } vtParams = clusterInstance.GetVTParams(keyspaceName) vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index a760cfb24b3..b7f7c11fba9 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -44,6 +44,38 @@ import ( "vitess.io/vitess/go/vt/vttablet/grpctmclient" ) +// TestDynamicConfig tests that transaction mode is dynamically configurable. +func TestDynamicConfig(t *testing.T) { + conn, closer := start(t) + defer closer() + defer conn.Close() + + // Ensure that initially running a distributed transaction is possible. + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + utils.Exec(t, conn, "commit") + + clusterInstance.VtgateProcess.Config.TransactionMode = "SINGLE" + defer func() { + clusterInstance.VtgateProcess.Config.TransactionMode = "TWOPC" + err := clusterInstance.VtgateProcess.RewriteConfiguration() + require.NoError(t, err) + }() + err := clusterInstance.VtgateProcess.RewriteConfiguration() + require.NoError(t, err) + err = clusterInstance.VtgateProcess.WaitForConfig(`"transaction_mode":"SINGLE"`) + require.NoError(t, err) + + // After the config changes verify running a distributed transaction fails. + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(20, 4)") + _, err = utils.ExecAllowError(t, conn, "insert into twopc_t1(id, col) values(22, 4)") + require.ErrorContains(t, err, "multi-db transaction attempted") + utils.Exec(t, conn, "rollback") +} + // TestDTCommit tests distributed transaction commit for insert, update and delete operations // It verifies the binlog events for the same with transaction state changes and redo statements. func TestDTCommit(t *testing.T) { From 4d5f6e1f5c07a3cacfdb906adc59437cb6cd2a53 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Sun, 22 Dec 2024 18:52:39 +0530 Subject: [PATCH 4/6] test: fix test added due to merge from main Signed-off-by: Manan Gupta --- go/vt/vtgate/executor_select_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 23969db384d..16628729ac6 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -3326,7 +3326,7 @@ func TestStreamOrderByWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start From 34e753ce749c96720053202973f71c38713a5914 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 26 Dec 2024 07:56:52 +0530 Subject: [PATCH 5/6] feat: address review comments Signed-off-by: Manan Gupta --- go/vt/vtgate/staticconfig.go | 3 +++ go/vt/vtgate/viperconfig.go | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/staticconfig.go b/go/vt/vtgate/staticconfig.go index 91ea20a2f36..f78545ebc5b 100644 --- a/go/vt/vtgate/staticconfig.go +++ b/go/vt/vtgate/staticconfig.go @@ -18,6 +18,9 @@ package vtgate import vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" +// StaticConfig is a static configuration for vtgate. +// It is used for tests and vtexplain_vtgate where we don't want the user to +// control certain configs. type StaticConfig struct { OnlineDDLEnabled bool DirectDDLEnabled bool diff --git a/go/vt/vtgate/viperconfig.go b/go/vt/vtgate/viperconfig.go index c306a839be8..68430b7be2c 100644 --- a/go/vt/vtgate/viperconfig.go +++ b/go/vt/vtgate/viperconfig.go @@ -17,26 +17,34 @@ limitations under the License. package vtgate import ( + "vitess.io/vitess/go/viperutil" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) // DynamicViperConfig is a dynamic config that uses viper. type DynamicViperConfig struct { + onlineDDL viperutil.Value[bool] + directDDL viperutil.Value[bool] + txMode viperutil.Value[vtgatepb.TransactionMode] } // NewDynamicViperConfig creates a new dynamic viper config func NewDynamicViperConfig() *DynamicViperConfig { - return &DynamicViperConfig{} + return &DynamicViperConfig{ + onlineDDL: enableOnlineDDL, + directDDL: enableDirectDDL, + txMode: transactionMode, + } } func (d *DynamicViperConfig) OnlineEnabled() bool { - return enableOnlineDDL.Get() + return d.onlineDDL.Get() } func (d *DynamicViperConfig) DirectEnabled() bool { - return enableDirectDDL.Get() + return d.directDDL.Get() } func (d *DynamicViperConfig) TransactionMode() vtgatepb.TransactionMode { - return transactionMode.Get() + return d.txMode.Get() } From 78d2976089629f770912aec2e51eae295ed79849 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 26 Dec 2024 14:42:44 +0530 Subject: [PATCH 6/6] feat: rename the files Signed-off-by: Manan Gupta --- go/vt/vtgate/{staticconfig.go => static_config.go} | 0 go/vt/vtgate/{viperconfig.go => viper_config.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename go/vt/vtgate/{staticconfig.go => static_config.go} (100%) rename go/vt/vtgate/{viperconfig.go => viper_config.go} (100%) diff --git a/go/vt/vtgate/staticconfig.go b/go/vt/vtgate/static_config.go similarity index 100% rename from go/vt/vtgate/staticconfig.go rename to go/vt/vtgate/static_config.go diff --git a/go/vt/vtgate/viperconfig.go b/go/vt/vtgate/viper_config.go similarity index 100% rename from go/vt/vtgate/viperconfig.go rename to go/vt/vtgate/viper_config.go