diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 1290156a1cd..4253fbb5860 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "syscall" + "testing" "time" "vitess.io/vitess/go/vt/log" @@ -57,6 +58,8 @@ type VtgateProcess struct { Directory string VerifyURL string VSchemaURL string + ConfigFile string + Config VTGateConfiguration SysVarSetEnabled bool PlannerVersion plancontext.PlannerVersion // Extra Args to be set before starting the vtgate process @@ -66,6 +69,77 @@ type VtgateProcess struct { exit chan error } +type VTGateConfiguration struct { + TransactionMode string `json:"transaction_mode,omitempty"` +} + +// ToJSONString will marshal this configuration as JSON +func (config *VTGateConfiguration) ToJSONString() string { + b, _ := json.MarshalIndent(config, "", "\t") + return string(b) +} + +func (vtgate *VtgateProcess) RewriteConfiguration() error { + return os.WriteFile(vtgate.ConfigFile, []byte(vtgate.Config.ToJSONString()), 0644) +} + +// WaitForConfig waits for the expectedConfig to be present in the vtgate configuration. +func (vtgate *VtgateProcess) WaitForConfig(expectedConfig string) error { + timeout := time.After(30 * time.Second) + var response string + for { + select { + case <-timeout: + return fmt.Errorf("timed out waiting for api to work. Last response - %s", response) + default: + _, response, _ = vtgate.MakeAPICall("/debug/config") + if strings.Contains(response, expectedConfig) { + return nil + } + time.Sleep(1 * time.Second) + } + } +} + +// MakeAPICall makes an API call on the given endpoint of VTOrc +func (vtgate *VtgateProcess) MakeAPICall(endpoint string) (status int, response string, err error) { + url := fmt.Sprintf("http://localhost:%d/%s", vtgate.Port, endpoint) + resp, err := http.Get(url) + if err != nil { + if resp != nil { + status = resp.StatusCode + } + return status, "", err + } + defer func() { + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + }() + + respByte, _ := io.ReadAll(resp.Body) + return resp.StatusCode, string(respByte), err +} + +// MakeAPICallRetry is used to make an API call and retries until success +func (vtgate *VtgateProcess) MakeAPICallRetry(t *testing.T, url string) { + t.Helper() + timeout := time.After(10 * time.Second) + for { + select { + case <-timeout: + t.Fatal("timed out waiting for api to work") + return + default: + status, _, err := vtgate.MakeAPICall(url) + if err == nil && status == 200 { + return + } + time.Sleep(1 * time.Second) + } + } +} + const defaultVtGatePlannerVersion = planbuilder.Gen4 // Setup starts Vtgate process with required arguements @@ -74,6 +148,7 @@ func (vtgate *VtgateProcess) Setup() (err error) { "--topo_implementation", vtgate.CommonArg.TopoImplementation, "--topo_global_server_address", vtgate.CommonArg.TopoGlobalAddress, "--topo_global_root", vtgate.CommonArg.TopoGlobalRoot, + "--config-file", vtgate.ConfigFile, "--log_dir", vtgate.LogDir, "--log_queries_to_file", vtgate.FileToLogQueries, "--port", fmt.Sprintf("%d", vtgate.Port), @@ -98,6 +173,19 @@ func (vtgate *VtgateProcess) Setup() (err error) { break } } + configFile, err := os.Create(vtgate.ConfigFile) + if err != nil { + log.Errorf("cannot create config file for vtgate: %v", err) + return err + } + _, err = configFile.WriteString(vtgate.Config.ToJSONString()) + if err != nil { + return err + } + err = configFile.Close() + if err != nil { + return err + } if !msvflag { version, err := mysqlctl.GetVersionString() if err != nil { @@ -287,6 +375,7 @@ func VtgateProcessInstance( Name: "vtgate", Binary: "vtgate", FileToLogQueries: path.Join(tmpDirectory, "/vtgate_querylog.txt"), + ConfigFile: path.Join(tmpDirectory, fmt.Sprintf("vtgate-config-%d.json", port)), Directory: os.Getenv("VTDATAROOT"), ServiceMap: "grpc-tabletmanager,grpc-throttler,grpc-queryservice,grpc-updatestream,grpc-vtctl,grpc-vtgateservice", LogDir: tmpDirectory, diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 58fe45547a5..3607beea72a 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -77,7 +77,6 @@ func TestMain(m *testing.M) { // Set extra args for twopc clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, - "--transaction_mode", "TWOPC", "--grpc_use_effective_callerid", ) clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, @@ -103,6 +102,13 @@ func TestMain(m *testing.M) { if err := clusterInstance.StartVtgate(); err != nil { return 1 } + clusterInstance.VtgateProcess.Config.TransactionMode = "TWOPC" + if err := clusterInstance.VtgateProcess.RewriteConfiguration(); err != nil { + return 1 + } + if err := clusterInstance.VtgateProcess.WaitForConfig(`"transaction_mode":"TWOPC"`); err != nil { + return 1 + } vtParams = clusterInstance.GetVTParams(keyspaceName) vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index a760cfb24b3..b7f7c11fba9 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -44,6 +44,38 @@ import ( "vitess.io/vitess/go/vt/vttablet/grpctmclient" ) +// TestDynamicConfig tests that transaction mode is dynamically configurable. +func TestDynamicConfig(t *testing.T) { + conn, closer := start(t) + defer closer() + defer conn.Close() + + // Ensure that initially running a distributed transaction is possible. + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + utils.Exec(t, conn, "commit") + + clusterInstance.VtgateProcess.Config.TransactionMode = "SINGLE" + defer func() { + clusterInstance.VtgateProcess.Config.TransactionMode = "TWOPC" + err := clusterInstance.VtgateProcess.RewriteConfiguration() + require.NoError(t, err) + }() + err := clusterInstance.VtgateProcess.RewriteConfiguration() + require.NoError(t, err) + err = clusterInstance.VtgateProcess.WaitForConfig(`"transaction_mode":"SINGLE"`) + require.NoError(t, err) + + // After the config changes verify running a distributed transaction fails. + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(20, 4)") + _, err = utils.ExecAllowError(t, conn, "insert into twopc_t1(id, col) values(22, 4)") + require.ErrorContains(t, err, "multi-db transaction attempted") + utils.Exec(t, conn, "rollback") +} + // TestDTCommit tests distributed transaction commit for insert, update and delete operations // It verifies the binlog events for the same with transaction state changes and redo statements. func TestDTCommit(t *testing.T) { diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index f9ae8be3820..d45073cd006 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -74,7 +74,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) - vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0) + vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0, vtgate.NewDynamicViperConfig()) vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) return nil @@ -88,7 +88,9 @@ func (vte *VTExplain) newFakeResolver(ctx context.Context, opts *Options, serv s if opts.ExecutionMode == ModeTwoPC { txMode = vtgatepb.TransactionMode_TWOPC } - tc := vtgate.NewTxConn(gw, txMode) + tc := vtgate.NewTxConn(gw, &vtgate.StaticConfig{ + TxMode: txMode, + }) sc := vtgate.NewScatterConn("", tc, gw) srvResolver := srvtopo.NewResolver(serv, gw, cell) return vtgate.NewResolver(srvResolver, serv, cell, sc) diff --git a/go/vt/vtgate/dynamicconfig/config.go b/go/vt/vtgate/dynamicconfig/config.go index 5bb1d991eae..014160029cd 100644 --- a/go/vt/vtgate/dynamicconfig/config.go +++ b/go/vt/vtgate/dynamicconfig/config.go @@ -1,6 +1,28 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package dynamicconfig +import vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + type DDL interface { OnlineEnabled() bool DirectEnabled() bool } + +type TxMode interface { + TransactionMode() vtgatepb.TransactionMode +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index d3d2ba8e8fd..0281e28700f 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/pflag" vschemapb "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/vtgate/dynamicconfig" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/cache/theine" @@ -136,7 +137,8 @@ type Executor struct { warmingReadsPercent int warmingReadsChannel chan bool - vConfig econtext.VCursorConfig + vConfig econtext.VCursorConfig + ddlConfig dynamicconfig.DDL } var executorOnce sync.Once @@ -168,6 +170,7 @@ func NewExecutor( noScatter bool, pv plancontext.PlannerVersion, warmingReadsPercent int, + ddlConfig dynamicconfig.DDL, ) *Executor { e := &Executor{ env: env, @@ -183,6 +186,7 @@ func NewExecutor( plans: plans, warmingReadsPercent: warmingReadsPercent, warmingReadsChannel: make(chan bool, warmingReadsConcurrency), + ddlConfig: ddlConfig, } // setting the vcursor config. e.initVConfig(warnOnShardedOnly, pv) @@ -484,7 +488,7 @@ func (e *Executor) addNeededBindVars(vcursor *econtext.VCursorImpl, bindVarNeeds case sysvars.TransactionMode.Name: txMode := session.TransactionMode if txMode == vtgatepb.TransactionMode_UNSPECIFIED { - txMode = getTxMode() + txMode = transactionMode.Get() } bindVars[key] = sqltypes.StringBindVariable(txMode.String()) case sysvars.Workload.Name: @@ -1156,11 +1160,7 @@ func (e *Executor) buildStatement( reservedVars *sqlparser.ReservedVars, bindVarNeeds *sqlparser.BindVarNeeds, ) (*engine.Plan, error) { - cfg := &dynamicViperConfig{ - onlineDDL: enableOnlineDDL, - directDDL: enableDirectDDL, - } - plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, cfg) + plan, err := planbuilder.BuildFromStmt(ctx, query, stmt, reservedVars, vcursor, bindVarNeeds, e.ddlConfig) if err != nil { return nil, err } diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 2ee3425209f..43987217039 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -183,7 +183,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta // one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness. plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} @@ -232,7 +232,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex plans := DefaultPlanCache() env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion}) require.NoError(t, err) - executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -269,7 +269,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -294,7 +294,7 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context, replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 411f19bb30d..16628729ac6 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1644,7 +1644,7 @@ func TestSelectListArg(t *testing.T) { func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) ex.SetQueryLogger(queryLogger) return ex } @@ -3326,7 +3326,7 @@ func TestStreamOrderByWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start @@ -3369,7 +3369,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index a8500dd59c4..8bb10aae8fb 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -68,7 +68,7 @@ func TestStreamSQLSharded(t *testing.T) { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index e31c5ae8161..fecd6c2a8b1 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -522,7 +522,7 @@ func TestScatterConnSingleDB(t *testing.T) { assert.Contains(t, errors[0].Error(), want) // TransactionMode_SINGLE in txconn - sc.txConn.mode = vtgatepb.TransactionMode_SINGLE + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_SINGLE} session = econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true}) _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false) require.Empty(t, errors) @@ -531,7 +531,7 @@ func TestScatterConnSingleDB(t *testing.T) { assert.Contains(t, errors[0].Error(), want) // TransactionMode_MULTI in txconn. Should not fail. - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} session = econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true}) _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false) require.Empty(t, errors) @@ -622,6 +622,8 @@ func newTestScatterConn(ctx context.Context, hc discovery.HealthCheck, serv srvt // in '-cells_to_watch' command line parameter, which is // empty by default. So it's unused in this test, set to nil. gw := NewTabletGateway(ctx, hc, serv, cell) - tc := NewTxConn(gw, vtgatepb.TransactionMode_MULTI) + tc := NewTxConn(gw, &StaticConfig{ + TxMode: vtgatepb.TransactionMode_MULTI, + }) return NewScatterConn("", tc, gw) } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index edba48a9151..85f236a9a18 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -685,7 +685,7 @@ func (stc *ScatterConn) multiGoTransaction( startTime, statsKey := stc.startAction(name, rs.Target) defer stc.endAction(startTime, allErrors, statsKey, &err, session) - info, shardSession, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.mode) + info, shardSession, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.txMode.TransactionMode()) if err != nil { return } @@ -702,7 +702,7 @@ func (stc *ScatterConn) multiGoTransaction( shardSession.RowsAffected = info.rowsAffected } if info.actionNeeded != nothing && (info.transactionID != 0 || info.reservedID != 0) { - appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.mode) + appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.txMode.TransactionMode()) if appendErr != nil { err = appendErr } diff --git a/go/vt/vtgate/static_config.go b/go/vt/vtgate/static_config.go new file mode 100644 index 00000000000..f78545ebc5b --- /dev/null +++ b/go/vt/vtgate/static_config.go @@ -0,0 +1,40 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + +// StaticConfig is a static configuration for vtgate. +// It is used for tests and vtexplain_vtgate where we don't want the user to +// control certain configs. +type StaticConfig struct { + OnlineDDLEnabled bool + DirectDDLEnabled bool + TxMode vtgatepb.TransactionMode +} + +func (s *StaticConfig) OnlineEnabled() bool { + return s.OnlineDDLEnabled +} + +func (s *StaticConfig) DirectEnabled() bool { + return s.DirectDDLEnabled +} + +func (s *StaticConfig) TransactionMode() vtgatepb.TransactionMode { + return s.TxMode +} diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index cadb1392eca..dbd76b04c7a 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -33,6 +33,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/dynamicconfig" econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" "vitess.io/vitess/go/vt/vttablet/queryservice" ) @@ -44,14 +45,14 @@ const nonAtomicCommitWarnMaxShards = 16 // TxConn is used for executing transactional requests. type TxConn struct { tabletGateway *TabletGateway - mode vtgatepb.TransactionMode + txMode dynamicconfig.TxMode } // NewTxConn builds a new TxConn. -func NewTxConn(gw *TabletGateway, txMode vtgatepb.TransactionMode) *TxConn { +func NewTxConn(gw *TabletGateway, txMode dynamicconfig.TxMode) *TxConn { return &TxConn{ tabletGateway: gw, - mode: txMode, + txMode: txMode, } } @@ -114,7 +115,7 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er case vtgatepb.TransactionMode_TWOPC: twopc = true case vtgatepb.TransactionMode_UNSPECIFIED: - twopc = txc.mode == vtgatepb.TransactionMode_TWOPC + twopc = txc.txMode.TransactionMode() == vtgatepb.TransactionMode_TWOPC } defer recordCommitTime(session, twopc, time.Now()) diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index d96f0b8fccf..6d31aa4e543 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -72,7 +72,7 @@ func TestTxConnCommitFailure(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"] // Sequence the executes to ensure commit order @@ -173,7 +173,7 @@ func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 18) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"] // Sequence the executes to ensure commit order @@ -227,7 +227,7 @@ func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 17) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"] // Sequence the executes to ensure commit order @@ -281,7 +281,7 @@ func TestTxConnCommitSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure commit order session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true}) @@ -334,7 +334,7 @@ func TestTxConnReservedCommitSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure commit order session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) @@ -419,7 +419,7 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndCommit(t *testing.T) { keyspace := "TestTxConn" sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, keyspace) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure shard session order session := econtext.NewSafeSession(&vtgatepb.Session{InReservedConn: true}) @@ -514,7 +514,7 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndRollback(t *testing.T) { keyspace := "TestTxConn" sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, keyspace) - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} // Sequence the executes to ensure shard session order session := econtext.NewSafeSession(&vtgatepb.Session{InReservedConn: true}) @@ -608,7 +608,7 @@ func TestTxConnCommitOrderFailure1(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{Sql: "query1"}} @@ -641,7 +641,7 @@ func TestTxConnCommitOrderFailure2(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", @@ -675,7 +675,7 @@ func TestTxConnCommitOrderFailure3(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", @@ -717,7 +717,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", @@ -815,7 +815,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - sc.txConn.mode = vtgatepb.TransactionMode_MULTI + sc.txConn.txMode = &StaticConfig{TxMode: vtgatepb.TransactionMode_MULTI} queries := []*querypb.BoundQuery{{ Sql: "query1", diff --git a/go/vt/vtgate/viper_config.go b/go/vt/vtgate/viper_config.go new file mode 100644 index 00000000000..68430b7be2c --- /dev/null +++ b/go/vt/vtgate/viper_config.go @@ -0,0 +1,50 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "vitess.io/vitess/go/viperutil" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" +) + +// DynamicViperConfig is a dynamic config that uses viper. +type DynamicViperConfig struct { + onlineDDL viperutil.Value[bool] + directDDL viperutil.Value[bool] + txMode viperutil.Value[vtgatepb.TransactionMode] +} + +// NewDynamicViperConfig creates a new dynamic viper config +func NewDynamicViperConfig() *DynamicViperConfig { + return &DynamicViperConfig{ + onlineDDL: enableOnlineDDL, + directDDL: enableDirectDDL, + txMode: transactionMode, + } +} + +func (d *DynamicViperConfig) OnlineEnabled() bool { + return d.onlineDDL.Get() +} + +func (d *DynamicViperConfig) DirectEnabled() bool { + return d.directDDL.Get() +} + +func (d *DynamicViperConfig) TransactionMode() vtgatepb.TransactionMode { + return d.txMode.Get() +} diff --git a/go/vt/vtgate/viperconfig.go b/go/vt/vtgate/viperconfig.go deleted file mode 100644 index ec77ff62d4f..00000000000 --- a/go/vt/vtgate/viperconfig.go +++ /dev/null @@ -1,16 +0,0 @@ -package vtgate - -import "vitess.io/vitess/go/viperutil" - -type dynamicViperConfig struct { - onlineDDL viperutil.Value[bool] - directDDL viperutil.Value[bool] -} - -func (d *dynamicViperConfig) OnlineEnabled() bool { - return d.onlineDDL.Get() -} - -func (d *dynamicViperConfig) DirectEnabled() bool { - return d.directDDL.Get() -} diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 8bab05479dd..a1dcd3219f6 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -29,6 +29,7 @@ import ( "time" "github.com/spf13/pflag" + "github.com/spf13/viper" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/sqltypes" @@ -60,7 +61,6 @@ import ( ) var ( - transactionMode = "MULTI" normalizeQueries = true streamBufferSize = 32 * 1024 @@ -114,6 +114,33 @@ var ( }, ) + transactionMode = viperutil.Configure( + "transaction_mode", + viperutil.Options[vtgatepb.TransactionMode]{ + FlagName: "transaction_mode", + Default: vtgatepb.TransactionMode_MULTI, + Dynamic: true, + GetFunc: func(v *viper.Viper) func(key string) vtgatepb.TransactionMode { + return func(key string) vtgatepb.TransactionMode { + txMode := v.GetString(key) + switch strings.ToLower(txMode) { + case "single": + return vtgatepb.TransactionMode_SINGLE + case "multi": + return vtgatepb.TransactionMode_MULTI + case "twopc": + return vtgatepb.TransactionMode_TWOPC + default: + fmt.Printf("Invalid option: %v\n", txMode) + fmt.Println("Usage: -transaction_mode {SINGLE | MULTI | TWOPC}") + os.Exit(1) + return -1 + } + } + }, + }, + ) + // schema tracking flags enableSchemaChangeSignal = true enableViews bool @@ -138,7 +165,7 @@ var ( ) func registerFlags(fs *pflag.FlagSet) { - fs.StringVar(&transactionMode, "transaction_mode", transactionMode, "SINGLE: disallow multi-db transactions, MULTI: allow multi-db transactions with best effort commit, TWOPC: allow multi-db transactions with 2pc commit") + fs.String("transaction_mode", "MULTI", "SINGLE: disallow multi-db transactions, MULTI: allow multi-db transactions with best effort commit, TWOPC: allow multi-db transactions with 2pc commit") fs.BoolVar(&normalizeQueries, "normalize_queries", normalizeQueries, "Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars.") fs.BoolVar(&terseErrors, "vtgate-config-terse-errors", terseErrors, "prevent bind vars from escaping in returned errors") fs.IntVar(&truncateErrorLen, "truncate-error-len", truncateErrorLen, "truncate errors sent to client if they are longer than this value (0 means do not truncate)") @@ -173,7 +200,11 @@ func registerFlags(fs *pflag.FlagSet) { fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed") fs.DurationVar(&warmingReadsQueryTimeout, "warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries") - viperutil.BindFlags(fs, enableOnlineDDL, enableDirectDDL) + viperutil.BindFlags(fs, + enableOnlineDDL, + enableDirectDDL, + transactionMode, + ) } func init() { @@ -181,25 +212,6 @@ func init() { servenv.OnParseFor("vtcombo", registerFlags) } -func getTxMode() vtgatepb.TransactionMode { - switch strings.ToLower(transactionMode) { - case "single": - log.Infof("Transaction mode: '%s'", transactionMode) - return vtgatepb.TransactionMode_SINGLE - case "multi": - log.Infof("Transaction mode: '%s'", transactionMode) - return vtgatepb.TransactionMode_MULTI - case "twopc": - log.Infof("Transaction mode: '%s'", transactionMode) - return vtgatepb.TransactionMode_TWOPC - default: - fmt.Printf("Invalid option: %v\n", transactionMode) - fmt.Println("Usage: -transaction_mode {SINGLE | MULTI | TWOPC}") - os.Exit(1) - return -1 - } -} - var ( // vschemaCounters needs to be initialized before planner to // catch the initial load stats. @@ -287,6 +299,8 @@ func Init( log.Fatalf("tabletGateway.WaitForTablets failed: %v", err) } + dynamicConfig := NewDynamicViperConfig() + // If we want to filter keyspaces replace the srvtopo.Server with a // filtering server if discovery.FilteringKeyspaces() { @@ -301,7 +315,7 @@ func Init( if _, err := schema.ParseDDLStrategy(defaultDDLStrategy); err != nil { log.Fatalf("Invalid value for -ddl_strategy: %v", err.Error()) } - tc := NewTxConn(gw, getTxMode()) + tc := NewTxConn(gw, dynamicConfig) // ScatterConn depends on TxConn to perform forced rollbacks. sc := NewScatterConn("VttabletCall", tc, gw) // TxResolver depends on TxConn to complete distributed transaction. @@ -352,6 +366,7 @@ func Init( noScatter, pv, warmingReadsPercent, + dynamicConfig, ) if err := executor.defaultQueryLogger(); err != nil {