From 657c7c84ee3152735376c3864a1851d135aa995d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Feb 2024 13:16:47 -0500 Subject: [PATCH 01/12] Only use replication tracker when it's enabled Signed-off-by: Matt Lord --- go/vt/vtgate/executor.go | 19 ++++++++++++++----- .../tabletserver/repltracker/poller.go | 4 ++-- .../tabletserver/repltracker/repltracker.go | 5 +++-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 520214d65fd..30828dff897 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -61,6 +61,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" "vitess.io/vitess/go/vt/vthash" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) var ( @@ -937,7 +938,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replIOThreadHealth := "" replSQLThreadHealth := "" replLastError := "" - replLag := int64(-1) + replLag := "-1" // A string to support NULL as a value sql := "show slave status" results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil) if err != nil || results == nil { @@ -948,8 +949,16 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replIOThreadHealth = row["Slave_IO_Running"].ToString() replSQLThreadHealth = row["Slave_SQL_Running"].ToString() replLastError = row["Last_Error"].ToString() - if ts.Stats != nil { - replLag = int64(ts.Stats.ReplicationLagSeconds) + if tabletenv.NewCurrentConfig().ReplicationTracker.Mode == tabletenv.Disable { // Use the value from mysqld + if row["Seconds_Behind_Master"].IsNull() { + replLag = "NULL" // Uppercase to match mysqld's output in SHOW REPLICA STATUS + } else { + replLag = row["Seconds_Behind_Master"].ToString() + } + } else { // Use the value we get from the replication tracker + if ts.Stats != nil { + replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds) + } } } 
replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError) @@ -962,7 +971,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp ts.Tablet.Hostname, fmt.Sprintf("%s:%d", replSourceHost, replSourcePort), replicationHealth, - fmt.Sprintf("%d", replLag), + replLag, throttlerStatus, )) } @@ -1481,7 +1490,7 @@ func getTabletThrottlerStatus(tabletHostPort string) (string, error) { client := http.Client{ Timeout: 100 * time.Millisecond, } - resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort)) + resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort)) if err != nil { return "", err } diff --git a/go/vt/vttablet/tabletserver/repltracker/poller.go b/go/vt/vttablet/tabletserver/repltracker/poller.go index ace01dffb2d..6fc964bef57 100644 --- a/go/vt/vttablet/tabletserver/repltracker/poller.go +++ b/go/vt/vttablet/tabletserver/repltracker/poller.go @@ -21,10 +21,10 @@ import ( "time" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/mysqlctl" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds") diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker.go b/go/vt/vttablet/tabletserver/repltracker/repltracker.go index 6f504b2a445..c98005851d1 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker.go @@ -23,10 +23,11 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat" 
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( From 5076415862cf16f1126f2c2bc76094ce1fe1e7bc Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Feb 2024 14:22:39 -0500 Subject: [PATCH 02/12] Add e2e test Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 24 ++++++++++++++++++- go/vt/vtgate/executor_test.go | 11 +++++---- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index be227927981..1a81805288e 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -29,6 +29,7 @@ import ( "time" "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/topodata" "github.com/stretchr/testify/assert" @@ -59,7 +60,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { time.Sleep(2 * time.Second) verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL) ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) + conn, err := mysql.Connect(ctx, &vtParams) // VTGate require.NoError(t, err) defer conn.Close() @@ -68,6 +69,27 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { expectNumRows := 2 numRows := len(qr.Rows) assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows)) + + // Stop VTOrc so that it doesn't immediately repair/restart replication. + for _, vtorcProcess := range clusterInstance.VTOrcProcesses { + if err := vtorcProcess.TearDown(); err != nil { + log.Errorf("Error in vtorc teardown: %v", err) + } + } + // Stop replication on the non-primary tablets. 
+ _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave") + require.NoError(t, err) + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") + require.NoError(t, err) + time.Sleep(2 * time.Second) // Build up some replication lag + res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false) + require.NoError(t, err) + expectNumRows = 2 + numRows = len(qr.Rows) + assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows)) + rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row + lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL" + assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString()) } func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) { diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 5df4c7887f6..cc4092c7a00 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -42,11 +42,6 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/buffer" @@ -55,6 +50,12 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" + + querypb 
"vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestExecutorResultsExceeded(t *testing.T) { From f1bbe3544dd77eef0007ec71262565123d1c5893 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Feb 2024 15:16:22 -0500 Subject: [PATCH 03/12] Fix tests Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index 1a81805288e..10c55a6a80d 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -29,7 +29,6 @@ import ( "time" "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/topodata" "github.com/stretchr/testify/assert" @@ -70,12 +69,18 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { numRows := len(qr.Rows) assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows)) - // Stop VTOrc so that it doesn't immediately repair/restart replication. + // Stop any VTOrc(s) so that it doesn't immediately repair/restart replication. for _, vtorcProcess := range clusterInstance.VTOrcProcesses { - if err := vtorcProcess.TearDown(); err != nil { - log.Errorf("Error in vtorc teardown: %v", err) - } + err := vtorcProcess.TearDown() + require.NoError(t, err) } + // Restart them afterward as the cluster is re-used. + defer func() { + for _, vtorcProcess := range clusterInstance.VTOrcProcesses { + err := vtorcProcess.Setup() + require.NoError(t, err) + } + }() // Stop replication on the non-primary tablets. 
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave") require.NoError(t, err) @@ -86,7 +91,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { require.NoError(t, err) expectNumRows = 2 numRows = len(qr.Rows) - assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows)) + assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows)) rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL" assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString()) @@ -112,6 +117,11 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) { rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE) require.NoError(t, err) + // Change it back to RDONLY afterward as the cluster is re-used. 
+ defer func() { + err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY) + require.NoError(t, err) + }() // Only returns rows for REPLICA and RDONLY tablets -- so should be 1 of them since we updated 1 to spare qr = utils.Exec(t, conn, "show vitess_replication_status like '%'") From d24a354c65277024a094f0ce0b27c7ab4fa4e644 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Feb 2024 15:24:22 -0500 Subject: [PATCH 04/12] Minor changes after self review Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 2 +- go/vt/vtgate/executor.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index 10c55a6a80d..a16fe6a99c3 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -81,7 +81,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { require.NoError(t, err) } }() - // Stop replication on the non-primary tablets. + // Stop replication on the non-PRIMARY tablets. 
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave") require.NoError(t, err) _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 30828dff897..8877a39bb68 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -951,7 +951,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replLastError = row["Last_Error"].ToString() if tabletenv.NewCurrentConfig().ReplicationTracker.Mode == tabletenv.Disable { // Use the value from mysqld if row["Seconds_Behind_Master"].IsNull() { - replLag = "NULL" // Uppercase to match mysqld's output in SHOW REPLICA STATUS + replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS } else { replLag = row["Seconds_Behind_Master"].ToString() } From 2d487fb50f17b7677aa222a1282e7a2c1e58b10b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Feb 2024 15:54:24 -0500 Subject: [PATCH 05/12] Reset cluster state everywhere in test Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index a16fe6a99c3..98b1167f329 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -86,6 +86,13 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { require.NoError(t, err) _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") require.NoError(t, err) + // Restart replication as the cluster is re-used. 
+ defer func() { + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave") + require.NoError(t, err) + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave") + require.NoError(t, err) + }() time.Sleep(2 * time.Second) // Build up some replication lag res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false) require.NoError(t, err) From 6d07ac271f4e1a8d852a6d5b165c31039a5b8255 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 25 Feb 2024 11:13:14 -0500 Subject: [PATCH 06/12] Address throttler review feedback Signed-off-by: Matt Lord --- go/vt/vtgate/executor.go | 66 ++++++------------- .../tabletserver/throttle/throttlerapp/app.go | 4 +- 2 files changed, 24 insertions(+), 46 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 8877a39bb68..41ed8ae453a 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "io" "net/http" "strings" "sync" @@ -29,6 +28,7 @@ import ( "time" "github.com/spf13/pflag" + "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/cache/theine" @@ -40,11 +40,6 @@ import ( "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" @@ -62,6 +57,15 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateservice" "vitess.io/vitess/go/vt/vthash" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + 
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var ( @@ -910,6 +914,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp rows := [][]sqltypes.Value{} status := e.scatterConn.GetHealthCheckCacheStatus() + tmc := tmclient.NewTabletManagerClient() for _, s := range status { for _, ts := range s.TabletsStats { @@ -928,9 +933,16 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp } tabletHostPort := ts.GetTabletHostPort() - throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort) + + res, err := tmc.CheckThrottler(ctx, ts.Tablet, &tabletmanagerdatapb.CheckThrottlerRequest{ + AppName: throttlerapp.VTGateName, + }) + if err != nil { + log.Warningf("Could not get check the tablet throttler on %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err) + } + throttlerStatus, err := protojson.Marshal(res) if err != nil { - log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err) + log.Warningf("Invalid tablet throttler response from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err) } replSourceHost := "" @@ -972,7 +984,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp fmt.Sprintf("%s:%d", replSourceHost, replSourcePort), replicationHealth, replLag, - throttlerStatus, + string(throttlerStatus), )) } } @@ -1486,42 +1498,6 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument") } 
-func getTabletThrottlerStatus(tabletHostPort string) (string, error) { - client := http.Client{ - Timeout: 100 * time.Millisecond, - } - resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort)) - if err != nil { - return "", err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", err - } - - var elements struct { - StatusCode int - Value float64 - Threshold float64 - Message string - } - err = json.Unmarshal(body, &elements) - if err != nil { - return "", err - } - - httpStatusStr := http.StatusText(elements.StatusCode) - - load := float64(0) - if elements.Threshold > 0 { - load = float64((elements.Value / elements.Threshold) * 100) - } - - status := fmt.Sprintf("{\"state\":\"%s\",\"load\":%.2f,\"message\":\"%s\"}", httpStatusStr, load, elements.Message) - return status, nil -} - // ReleaseLock implements the IExecutor interface func (e *Executor) ReleaseLock(ctx context.Context, session *SafeSession) error { return e.txConn.ReleaseLock(ctx, session) diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index 4f1f5857837..33dd651c007 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -41,7 +41,7 @@ func (n Name) Concatenate(other Name) Name { } const ( - // DefaultName is the app name used by vitess when app doesn't indicate its name + // DefaultName is the app name used by vitess when app doesn't indicate its name. 
DefaultName Name = "default" VitessName Name = "vitess" @@ -62,6 +62,8 @@ const ( BinlogWatcherName Name = "binlog-watcher" MessagerName Name = "messager" SchemaTrackerName Name = "schema-tracker" + + VTGateName = "vtgate" ) var ( From ab510e8417a656a261aa78fe41ecf1762ce1f53c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 25 Feb 2024 12:57:18 -0500 Subject: [PATCH 07/12] Revert "Address throttler review feedback" This reverts commit 6d07ac271f4e1a8d852a6d5b165c31039a5b8255. CheckThrottler is a tmclient RPC and thus can't be called from a non-tablet. Signed-off-by: Matt Lord --- go/vt/vtgate/executor.go | 66 +++++++++++++------ .../tabletserver/throttle/throttlerapp/app.go | 4 +- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 41ed8ae453a..8877a39bb68 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "strings" "sync" @@ -28,7 +29,6 @@ import ( "time" "github.com/spf13/pflag" - "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/cache/theine" @@ -40,6 +40,11 @@ import ( "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" @@ -57,15 +62,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateservice" "vitess.io/vitess/go/vt/vthash" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" - "vitess.io/vitess/go/vt/vttablet/tmclient" - - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb 
"vitess.io/vitess/go/vt/proto/query" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var ( @@ -914,7 +910,6 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp rows := [][]sqltypes.Value{} status := e.scatterConn.GetHealthCheckCacheStatus() - tmc := tmclient.NewTabletManagerClient() for _, s := range status { for _, ts := range s.TabletsStats { @@ -933,16 +928,9 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp } tabletHostPort := ts.GetTabletHostPort() - - res, err := tmc.CheckThrottler(ctx, ts.Tablet, &tabletmanagerdatapb.CheckThrottlerRequest{ - AppName: throttlerapp.VTGateName, - }) - if err != nil { - log.Warningf("Could not get check the tablet throttler on %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err) - } - throttlerStatus, err := protojson.Marshal(res) + throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort) if err != nil { - log.Warningf("Invalid tablet throttler response from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err) + log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err) } replSourceHost := "" @@ -984,7 +972,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp fmt.Sprintf("%s:%d", replSourceHost, replSourcePort), replicationHealth, replLag, - string(throttlerStatus), + throttlerStatus, )) } } @@ -1498,6 +1486,42 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument") } +func getTabletThrottlerStatus(tabletHostPort string) (string, error) { + client := http.Client{ + Timeout: 100 * time.Millisecond, + } + resp, err := 
client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort)) + if err != nil { + return "", err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + + var elements struct { + StatusCode int + Value float64 + Threshold float64 + Message string + } + err = json.Unmarshal(body, &elements) + if err != nil { + return "", err + } + + httpStatusStr := http.StatusText(elements.StatusCode) + + load := float64(0) + if elements.Threshold > 0 { + load = float64((elements.Value / elements.Threshold) * 100) + } + + status := fmt.Sprintf("{\"state\":\"%s\",\"load\":%.2f,\"message\":\"%s\"}", httpStatusStr, load, elements.Message) + return status, nil +} + // ReleaseLock implements the IExecutor interface func (e *Executor) ReleaseLock(ctx context.Context, session *SafeSession) error { return e.txConn.ReleaseLock(ctx, session) diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index 33dd651c007..4f1f5857837 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -41,7 +41,7 @@ func (n Name) Concatenate(other Name) Name { } const ( - // DefaultName is the app name used by vitess when app doesn't indicate its name. 
+ // DefaultName is the app name used by vitess when app doesn't indicate its name DefaultName Name = "default" VitessName Name = "vitess" @@ -62,8 +62,6 @@ const ( BinlogWatcherName Name = "binlog-watcher" MessagerName Name = "messager" SchemaTrackerName Name = "schema-tracker" - - VTGateName = "vtgate" ) var ( From 63e25695b32b602bf0d18f387a422de82f0c410f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 25 Feb 2024 13:02:30 -0500 Subject: [PATCH 08/12] Add vtgate name to throttlerapp Signed-off-by: Matt Lord --- go/vt/vtgate/executor.go | 8 ++++++-- go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 8877a39bb68..ecb08c4c4fe 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -62,6 +62,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateservice" "vitess.io/vitess/go/vt/vthash" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -930,7 +931,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp tabletHostPort := ts.GetTabletHostPort() throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort) if err != nil { - log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err) + log.Warningf("Could not get throttler status from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err) } replSourceHost := "" @@ -1486,11 +1487,14 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument") } +// getTabletThrottlerStatus uses HTTP to get the throttler status +// from a tablet. It uses HTTP because the CheckThrottler RPC is +// a tmclient RPC and you cannot use tmclient outside of a tablet. 
func getTabletThrottlerStatus(tabletHostPort string) (string, error) { client := http.Client{ Timeout: 100 * time.Millisecond, } - resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort)) + resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=%s", tabletHostPort, throttlerapp.VitessName)) if err != nil { return "", err } diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index 4f1f5857837..b743396d9a0 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -62,6 +62,8 @@ const ( BinlogWatcherName Name = "binlog-watcher" MessagerName Name = "messager" SchemaTrackerName Name = "schema-tracker" + + VTGateName = "vtgate" ) var ( From f4819d8cb650df9d76cd6d141e2de33061815d76 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 25 Feb 2024 13:20:19 -0500 Subject: [PATCH 09/12] Replace errant usage of tabletenv in vtgate Signed-off-by: Matt Lord --- go/vt/vtgate/executor.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index ecb08c4c4fe..5d7438f93b4 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -61,7 +61,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" "vitess.io/vitess/go/vt/vthash" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) @@ -950,16 +949,25 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replIOThreadHealth = row["Slave_IO_Running"].ToString() replSQLThreadHealth = row["Slave_SQL_Running"].ToString() replLastError = row["Last_Error"].ToString() - if tabletenv.NewCurrentConfig().ReplicationTracker.Mode == tabletenv.Disable { // Use the value from mysqld + // We cannot check the 
tablet's tabletenv config from here so + // we only use the tablet's stat -- which is managed by the + // ReplicationTracker -- if we can tell that it's enabled, + // meaning that it has a non-zero value. If it's actually + // enabled AND zero (rather than the zeroval), then mysqld + // should also return 0 so in this case the value is correct + // and equivalent either way. The only reason that we would + // want to use the ReplicationTracker based value, when we + // can, is because the polling method allows us to get the + // estimated lag value when replication is not running (based + // on how long we've seen that it's not been running). + if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the replication tracker + replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds) + } else { // Use the value from mysqld if row["Seconds_Behind_Master"].IsNull() { replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS } else { replLag = row["Seconds_Behind_Master"].ToString() } - } else { // Use the value we get from the replication tracker - if ts.Stats != nil { - replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds) - } } } replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError) From 266eccbdfe74a33685b07e3224b21dad97b06066 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 25 Feb 2024 13:30:45 -0500 Subject: [PATCH 10/12] Use check-self on the non-PRIMARY tablets As check?app=vtgate is only valid on a PRIMARY. Also, vtgates don't interface directly with the tablet throttler so it doesn't make sense to use the vtgate app name. 
Signed-off-by: Matt Lord --- go/vt/vtgate/executor.go | 9 ++++----- go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go | 2 -- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 5d7438f93b4..9f14888c846 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -61,7 +61,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" "vitess.io/vitess/go/vt/vthash" - "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -960,7 +959,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp // can, is because the polling method allows us to get the // estimated lag value when replication is not running (based // on how long we've seen that it's not been running). - if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the replication tracker + if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the ReplicationTracker replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds) } else { // Use the value from mysqld if row["Seconds_Behind_Master"].IsNull() { @@ -1496,13 +1495,13 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P } // getTabletThrottlerStatus uses HTTP to get the throttler status -// from a tablet. It uses HTTP because the CheckThrottler RPC is -// a tmclient RPC and you cannot use tmclient outside of a tablet. +// on a tablet. It uses HTTP because the CheckThrottler RPC is a +// tmclient RPC and you cannot use tmclient outside of a tablet. 
func getTabletThrottlerStatus(tabletHostPort string) (string, error) { client := http.Client{ Timeout: 100 * time.Millisecond, } - resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=%s", tabletHostPort, throttlerapp.VitessName)) + resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort)) if err != nil { return "", err } diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index b743396d9a0..4f1f5857837 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -62,8 +62,6 @@ const ( BinlogWatcherName Name = "binlog-watcher" MessagerName Name = "messager" SchemaTrackerName Name = "schema-tracker" - - VTGateName = "vtgate" ) var ( From fc5f2f3a7594c6515b0d89992ee7039079e87e74 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 25 Feb 2024 18:28:19 -0500 Subject: [PATCH 11/12] Nittiest of comment nits Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index 98b1167f329..6ddc1bcad4b 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -86,7 +86,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { require.NoError(t, err) _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") require.NoError(t, err) - // Restart replication as the cluster is re-used. + // Restart replication afterward as the cluster is re-used. 
defer func() { _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave") require.NoError(t, err) From bbcb44ae89a256726e67d200ad2ec2b886437f38 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 26 Feb 2024 12:30:59 -0500 Subject: [PATCH 12/12] Improve e2e test using review comments Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index 6ddc1bcad4b..c48aa6c2131 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -28,15 +28,14 @@ import ( "testing" "time" - "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/vt/proto/topodata" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils" + "vitess.io/vitess/go/vt/proto/topodata" ) func TestVtgateHealthCheck(t *testing.T) { @@ -69,16 +68,14 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { numRows := len(qr.Rows) assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows)) - // Stop any VTOrc(s) so that it doesn't immediately repair/restart replication. + // Disable VTOrc(s) recoveries so that it doesn't immediately repair/restart replication. for _, vtorcProcess := range clusterInstance.VTOrcProcesses { - err := vtorcProcess.TearDown() - require.NoError(t, err) + vtorcutils.DisableGlobalRecoveries(t, vtorcProcess) } - // Restart them afterward as the cluster is re-used. + // Re-enable recoveries afterward as the cluster is re-used. 
defer func() { for _, vtorcProcess := range clusterInstance.VTOrcProcesses { - err := vtorcProcess.Setup() - require.NoError(t, err) + vtorcutils.EnableGlobalRecoveries(t, vtorcProcess) } }() // Stop replication on the non-PRIMARY tablets.