Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] SHOW VITESS_REPLICATION_STATUS: Only use replication tracker when it's enabled (#15348) #15362

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/proto/topodata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
"vitess.io/vitess/go/vt/proto/topodata"
)

func TestVtgateHealthCheck(t *testing.T) {
Expand All @@ -59,7 +58,7 @@
time.Sleep(2 * time.Second)
verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
conn, err := mysql.Connect(ctx, &vtParams) // VTGate
require.NoError(t, err)
defer conn.Close()

Expand All @@ -68,6 +67,38 @@
expectNumRows := 2
numRows := len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows))

// Disable VTOrc(s) recoveries so that it doesn't immediately repair/restart replication.
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.DisableGlobalRecoveries(t, vtorcProcess)

Check failure on line 73 in go/test/endtoend/tabletgateway/vtgate_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

undefined: vtorcutils.DisableGlobalRecoveries
}
// Re-enable recoveries afterward as the cluster is re-used.
defer func() {
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.EnableGlobalRecoveries(t, vtorcProcess)

Check failure on line 78 in go/test/endtoend/tabletgateway/vtgate_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

undefined: vtorcutils.EnableGlobalRecoveries (typecheck)
}
}()
// Stop replication on the non-PRIMARY tablets.
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave")
require.NoError(t, err)
// Restart replication afterward as the cluster is re-used.
defer func() {
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave")
require.NoError(t, err)
}()
time.Sleep(2 * time.Second) // Build up some replication lag
res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false)
require.NoError(t, err)
expectNumRows = 2
numRows = len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows))
rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row
lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL"
assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString())
}

func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
Expand All @@ -90,6 +121,11 @@
rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly()
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
require.NoError(t, err)
// Change it back to RDONLY afterward as the cluster is re-used.
defer func() {
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY)
require.NoError(t, err)
}()

// Only returns rows for REPLICA and RDONLY tablets -- so should be 1 of them since we updated 1 to spare
qr = utils.Exec(t, conn, "show vitess_replication_status like '%'")
Expand Down
32 changes: 26 additions & 6 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,15 +930,15 @@
tabletHostPort := ts.GetTabletHostPort()
throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort)
if err != nil {
log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err)
log.Warningf("Could not get throttler status from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err)
}

replSourceHost := ""
replSourcePort := int64(0)
replIOThreadHealth := ""
replSQLThreadHealth := ""
replLastError := ""
replLag := int64(-1)
replLag := "-1" // A string to support NULL as a value
sql := "show slave status"
results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil)
if err != nil || results == nil {
Expand All @@ -949,8 +949,25 @@
replIOThreadHealth = row["Slave_IO_Running"].ToString()
replSQLThreadHealth = row["Slave_SQL_Running"].ToString()
replLastError = row["Last_Error"].ToString()
if ts.Stats != nil {
replLag = int64(ts.Stats.ReplicationLagSeconds)
// We cannot check the tablet's tabletenv config from here so
// we only use the tablet's stat -- which is managed by the
// ReplicationTracker -- if we can tell that it's enabled,
// meaning that it has a non-zero value. If it's actually
// enabled AND zero (rather than the zeroval), then mysqld
// should also return 0 so in this case the value is correct
// and equivalent either way. The only reason that we would
// want to use the ReplicationTracker based value, when we
// can, is because the polling method allows us to get the
// estimated lag value when replication is not running (based
// on how long we've seen that it's not been running).
if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the ReplicationTracker
replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds)

Check warning on line 964 in go/vt/vtgate/executor.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/executor.go#L964

Added line #L964 was not covered by tests
} else { // Use the value from mysqld
if row["Seconds_Behind_Master"].IsNull() {
replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS
} else {
replLag = row["Seconds_Behind_Master"].ToString()

Check warning on line 969 in go/vt/vtgate/executor.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/executor.go#L969

Added line #L969 was not covered by tests
}
}
}
replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError)
Expand All @@ -963,7 +980,7 @@
ts.Tablet.Hostname,
fmt.Sprintf("%s:%d", replSourceHost, replSourcePort),
replicationHealth,
fmt.Sprintf("%d", replLag),
replLag,
throttlerStatus,
))
}
Expand Down Expand Up @@ -1478,11 +1495,14 @@
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument")
}

// getTabletThrottlerStatus uses HTTP to get the throttler status
// on a tablet. It uses HTTP because the CheckThrottler RPC is a
// tmclient RPC and you cannot use tmclient outside of a tablet.
func getTabletThrottlerStatus(tabletHostPort string) (string, error) {
client := http.Client{
Timeout: 100 * time.Millisecond,
}
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort))
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort))
if err != nil {
return "", err
}
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ import (
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/buffer"
Expand All @@ -55,6 +50,12 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vtgate/vschemaacl"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestExecutorResultsExceeded(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/mysqlctl"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
Expand Down
Loading