diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index be227927981..c48aa6c2131 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -28,15 +28,14 @@ import ( "testing" "time" - "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/vt/proto/topodata" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils" + "vitess.io/vitess/go/vt/proto/topodata" ) func TestVtgateHealthCheck(t *testing.T) { @@ -59,7 +58,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { time.Sleep(2 * time.Second) verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL) ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) + conn, err := mysql.Connect(ctx, &vtParams) // VTGate require.NoError(t, err) defer conn.Close() @@ -68,6 +67,38 @@ func TestVtgateReplicationStatusCheck(t *testing.T) { expectNumRows := 2 numRows := len(qr.Rows) assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows)) + + // Disable VTOrc(s) recoveries so that it doesn't immediately repair/restart replication. + for _, vtorcProcess := range clusterInstance.VTOrcProcesses { + vtorcutils.DisableGlobalRecoveries(t, vtorcProcess) + } + // Re-enable recoveries afterward as the cluster is re-used. + defer func() { + for _, vtorcProcess := range clusterInstance.VTOrcProcesses { + vtorcutils.EnableGlobalRecoveries(t, vtorcProcess) + } + }() + // Stop replication on the non-PRIMARY tablets. + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave") + require.NoError(t, err) + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave") + require.NoError(t, err) + // Restart replication afterward as the cluster is re-used. + defer func() { + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave") + require.NoError(t, err) + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave") + require.NoError(t, err) + }() + time.Sleep(2 * time.Second) // Build up some replication lag + res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false) + require.NoError(t, err) + expectNumRows = 2 + numRows = len(qr.Rows) + assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows)) + rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row + lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL" + assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString()) } func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) { @@ -90,6 +121,11 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) { rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly() err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE) require.NoError(t, err) + // Change it back to RDONLY afterward as the cluster is re-used. + defer func() { + err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY) + require.NoError(t, err) + }() // Only returns rows for REPLICA and RDONLY tablets -- so should be 1 of them since we updated 1 to spare qr = utils.Exec(t, conn, "show vitess_replication_status like '%'") diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 07b5b016fcc..eeea70df443 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -1121,3 +1121,19 @@ func PrintVTOrcLogsOnFailure(t *testing.T, clusterInstance *cluster.LocalProcess log.Errorf("%s", string(content)) } } + +// EnableGlobalRecoveries enables global recoveries for the given VTOrc. +func EnableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { + status, resp, err := MakeAPICall(t, vtorc, "/api/enable-global-recoveries") + require.NoError(t, err) + assert.Equal(t, 200, status) + assert.Equal(t, "Global recoveries enabled\n", resp) +} + +// DisableGlobalRecoveries disables global recoveries for the given VTOrc. +func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { + status, resp, err := MakeAPICall(t, vtorc, "/api/disable-global-recoveries") + require.NoError(t, err) + assert.Equal(t, 200, status) + assert.Equal(t, "Global recoveries disabled\n", resp) +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 867685b8fec..d1e555bf990 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -930,7 +930,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp tabletHostPort := ts.GetTabletHostPort() throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort) if err != nil { - log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err) + log.Warningf("Could not get throttler status from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err) } replSourceHost := "" @@ -938,7 +938,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replIOThreadHealth := "" replSQLThreadHealth := "" replLastError := "" - replLag := int64(-1) + replLag := "-1" // A string to support NULL as a value sql := "show slave status" results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil) if err != nil || results == nil { @@ -949,8 +949,25 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp replIOThreadHealth = row["Slave_IO_Running"].ToString() replSQLThreadHealth = row["Slave_SQL_Running"].ToString() replLastError = row["Last_Error"].ToString() - if ts.Stats != nil { - replLag = int64(ts.Stats.ReplicationLagSeconds) + // We cannot check the tablet's tabletenv config from here so + // we only use the tablet's stat -- which is managed by the + // ReplicationTracker -- if we can tell that it's enabled, + // meaning that it has a non-zero value. If it's actually + // enabled AND zero (rather than the zeroval), then mysqld + // should also return 0 so in this case the value is correct + // and equivalent either way. The only reason that we would + // want to use the ReplicationTracker based value, when we + // can, is because the polling method allows us to get the + // estimated lag value when replication is not running (based + // on how long we've seen that it's not been running). + if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the ReplicationTracker + replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds) + } else { // Use the value from mysqld + if row["Seconds_Behind_Master"].IsNull() { + replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS + } else { + replLag = row["Seconds_Behind_Master"].ToString() + } } } replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError) @@ -963,7 +980,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp ts.Tablet.Hostname, fmt.Sprintf("%s:%d", replSourceHost, replSourcePort), replicationHealth, - fmt.Sprintf("%d", replLag), + replLag, throttlerStatus, )) } @@ -1478,11 +1495,14 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument") } +// getTabletThrottlerStatus uses HTTP to get the throttler status +// on a tablet. It uses HTTP because the CheckThrottler RPC is a +// tmclient RPC and you cannot use tmclient outside of a tablet. func getTabletThrottlerStatus(tabletHostPort string) (string, error) { client := http.Client{ Timeout: 100 * time.Millisecond, } - resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort)) + resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort)) if err != nil { return "", err } diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 1670d64b6fc..c4d231cf651 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -42,11 +42,6 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/buffer" @@ -55,6 +50,12 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vtgate/vschemaacl" "vitess.io/vitess/go/vt/vtgate/vtgateservice" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestExecutorResultsExceeded(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/repltracker/poller.go b/go/vt/vttablet/tabletserver/repltracker/poller.go index ace01dffb2d..6fc964bef57 100644 --- a/go/vt/vttablet/tabletserver/repltracker/poller.go +++ b/go/vt/vttablet/tabletserver/repltracker/poller.go @@ -21,10 +21,10 @@ import ( "time" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/mysqlctl" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds") diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker.go b/go/vt/vttablet/tabletserver/repltracker/repltracker.go index 6f504b2a445..c98005851d1 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker.go @@ -23,10 +23,11 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var (