Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

srvtopo: Setup metrics in init() function #15304

Merged
merged 3 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -84,6 +85,8 @@
tabletTypesToWait []topodatapb.TabletType

env *vtenv.Environment

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
Expand Down Expand Up @@ -131,6 +134,7 @@
if err != nil {
log.Fatalf("unable to initialize env: %v", err)
}
srvTopoCounts = stats.NewCountersWithSingleLabel("ResilientSrvTopoServer", "Resilient srvtopo server operations", "type")

Check warning on line 137 in go/cmd/vtcombo/cli/main.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtcombo/cli/main.go#L137

Added line #L137 was not covered by tests
}

func startMysqld(uid uint32) (mysqld *mysqlctl.Mysqld, cnf *mysqlctl.Mycnf, err error) {
Expand Down Expand Up @@ -234,7 +238,7 @@
// to be the "internal" protocol that InitTabletMap registers.
cmd.Flags().Set("tablet_manager_protocol", "internal")
cmd.Flags().Set("tablet_protocol", "internal")
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql)
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql, srvTopoCounts)

Check warning on line 241 in go/cmd/vtcombo/cli/main.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtcombo/cli/main.go#L241

Added line #L241 was not covered by tests
if err != nil {
// ensure we start mysql in the event we fail here
if startMysql {
Expand All @@ -260,7 +264,7 @@
}

wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr, srvTopoCounts)

Check warning on line 267 in go/cmd/vtcombo/cli/main.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtcombo/cli/main.go#L267

Added line #L267 was not covered by tests
if err != nil {
return err
}
Expand Down Expand Up @@ -297,7 +301,7 @@
}

// vtgate configuration and init
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

Check warning on line 304 in go/cmd/vtcombo/cli/main.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtcombo/cli/main.go#L304

Added line #L304 was not covered by tests

tabletTypes := make([]topodatapb.TabletType, 0, 1)
if len(tabletTypesToWait) != 0 {
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtexplain/cli/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"os"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -186,7 +187,8 @@
}
ctx := context.Background()
ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts)
srvTopoCounts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts, srvTopoCounts)

Check warning on line 191 in go/cmd/vtexplain/cli/vtexplain.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtexplain/cli/vtexplain.go#L190-L191

Added lines #L190 - L191 were not covered by tests
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion go/cmd/vtgate/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -73,8 +74,14 @@
PreRunE: servenv.CobraPreRunE,
RunE: run,
}

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
srvTopoCounts = stats.NewCountersWithSingleLabel("ResilientSrvTopoServer", "Resilient srvtopo server operations", "type")

Check warning on line 82 in go/cmd/vtgate/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtgate/cli/cli.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}

// CheckCellFlags will check validation of cell and cells_to_watch flag
// it will help to avoid strange behaviors when vtgate runs but actually does not work
func CheckCellFlags(ctx context.Context, serv srvtopo.Server, cell string, cellsToWatch string) error {
Expand Down Expand Up @@ -139,7 +146,7 @@
ts := topo.Open()
defer ts.Close()

resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

Check warning on line 149 in go/cmd/vtgate/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtgate/cli/cli.go#L149

Added line #L149 was not covered by tests

tabletTypes := make([]topodatapb.TabletType, 0, 1)
for _, tt := range tabletTypesToWait {
Expand Down
13 changes: 10 additions & 3 deletions go/cmd/vttablet/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -102,8 +103,14 @@
PreRunE: servenv.CobraPreRunE,
RunE: run,
}

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
srvTopoCounts = stats.NewCountersWithSingleLabel("TabletSrvTopo", "Resilient srvtopo server operations", "type")

Check warning on line 111 in go/cmd/vttablet/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vttablet/cli/cli.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}

func run(cmd *cobra.Command, args []string) error {
servenv.Init()

Expand All @@ -129,7 +136,7 @@
}

ts := topo.Open()
qsc, err := createTabletServer(context.Background(), env, config, ts, tabletAlias)
qsc, err := createTabletServer(context.Background(), env, config, ts, tabletAlias, srvTopoCounts)

Check warning on line 139 in go/cmd/vttablet/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vttablet/cli/cli.go#L139

Added line #L139 was not covered by tests
if err != nil {
ts.Close()
return err
Expand Down Expand Up @@ -249,7 +256,7 @@
return nil
}

func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias) (*tabletserver.TabletServer, error) {
func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, srvTopoCounts *stats.CountersWithSingleLabel) (*tabletserver.TabletServer, error) {

Check warning on line 259 in go/cmd/vttablet/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vttablet/cli/cli.go#L259

Added line #L259 was not covered by tests
if tableACLConfig != "" {
// To override default simpleacl, other ACL plugins must set themselves to be default ACL factory
tableacl.Register("simpleacl", &simpleacl.Factory{})
Expand All @@ -258,7 +265,7 @@
}

// creates and registers the query service
qsc := tabletserver.NewTabletServer(ctx, env, "", config, ts, tabletAlias)
qsc := tabletserver.NewTabletServer(ctx, env, "", config, ts, tabletAlias, srvTopoCounts)

Check warning on line 268 in go/cmd/vttablet/cli/cli.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vttablet/cli/cli.go#L268

Added line #L268 was not covered by tests
servenv.OnRun(func() {
qsc.Register()
addStatusParts(qsc)
Expand Down
4 changes: 3 additions & 1 deletion go/vt/srvtopo/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/topo/memorytopo"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -59,7 +60,8 @@ func TestFindAllTargets(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second

}()
rs := NewResilientServer(ctx, ts, "TestFindAllKeyspaceShards")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// No keyspace / shards.
ks, err := FindAllTargets(ctx, rs, "cell1", []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
Expand Down
10 changes: 1 addition & 9 deletions go/vt/srvtopo/resilient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ const (
// - return the last known value of the data if there is an error
type ResilientServer struct {
topoServer *topo.Server
counts *stats.CountersWithSingleLabel

*SrvKeyspaceWatcher
*SrvVSchemaWatcher
Expand All @@ -79,20 +78,13 @@ type ResilientServer struct {

// NewResilientServer creates a new ResilientServer
// based on the provided topo.Server.
func NewResilientServer(ctx context.Context, base *topo.Server, counterPrefix string) *ResilientServer {
func NewResilientServer(ctx context.Context, base *topo.Server, counts *stats.CountersWithSingleLabel) *ResilientServer {
dbussink marked this conversation as resolved.
Show resolved Hide resolved
if srvTopoCacheRefresh > srvTopoCacheTTL {
log.Fatalf("srv_topo_cache_refresh must be less than or equal to srv_topo_cache_ttl")
}

metric := ""
if counterPrefix != "" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

counterPrefix was never empty, so we never created counters with no name anyway.

metric = counterPrefix + "Counts"
}
counts := stats.NewCountersWithSingleLabel(metric, "Resilient srvtopo server operations", "type")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dynamic naming here is what is core to the problem and if the same name is (accidentally) used twice, it would panic. Instead we now always pass in the counter and build it in init() functions in the appropriate places beforehand.


return &ResilientServer{
topoServer: base,
counts: counts,
SrvKeyspaceWatcher: NewSrvKeyspaceWatcher(ctx, base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
SrvVSchemaWatcher: NewSrvVSchemaWatcher(ctx, base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
SrvKeyspaceNamesQuery: NewSrvKeyspaceNamesQuery(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
Expand Down
31 changes: 19 additions & 12 deletions go/vt/srvtopo/resilient_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/google/safehtml/template"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"

"github.com/stretchr/testify/assert"
Expand All @@ -53,7 +54,8 @@ func TestGetSrvKeyspace(t *testing.T) {
srvTopoCacheRefresh = 1 * time.Second
}()

rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspace")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Ask for a not-yet-created keyspace
_, err := rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks")
Expand Down Expand Up @@ -175,7 +177,7 @@ func TestGetSrvKeyspace(t *testing.T) {
// Now simulate a topo service error and see that the last value is
// cached for at least half of the expected ttl.
errorTestStart := time.Now()
errorReqsBefore := rs.counts.Counts()[errorCategory]
errorReqsBefore := counts.Counts()[errorCategory]
forceErr := topo.NewError(topo.Timeout, "test topo error")
factory.SetError(forceErr)

Expand Down Expand Up @@ -271,7 +273,7 @@ func TestGetSrvKeyspace(t *testing.T) {

// Check that the expected number of errors were counted during the
// interval
errorReqs := rs.counts.Counts()[errorCategory]
errorReqs := counts.Counts()[errorCategory]
expectedErrors := int64(time.Since(errorTestStart) / srvTopoCacheRefresh)
if errorReqs-errorReqsBefore > expectedErrors {
t.Errorf("expected <= %v error requests got %d", expectedErrors, errorReqs-errorReqsBefore)
Expand Down Expand Up @@ -370,7 +372,8 @@ func TestSrvKeyspaceCachedError(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second
srvTopoCacheRefresh = 1 * time.Second
}()
rs := NewResilientServer(ctx, ts, "TestSrvKeyspaceCachedErrors")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Ask for an unknown keyspace, should get an error.
_, err := rs.GetSrvKeyspace(ctx, "test_cell", "unknown_ks")
Expand Down Expand Up @@ -401,7 +404,8 @@ func TestGetSrvKeyspaceCreated(t *testing.T) {
defer cancel()
ts := memorytopo.NewServer(ctx, "test_cell")
defer ts.Close()
rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceCreated")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Set SrvKeyspace with value.
want := &topodatapb.SrvKeyspace{}
Expand Down Expand Up @@ -435,7 +439,8 @@ func TestWatchSrvVSchema(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "test_cell")
rs := NewResilientServer(ctx, ts, "TestWatchSrvVSchema")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// mu protects watchValue and watchErr.
mu := sync.Mutex{}
Expand Down Expand Up @@ -529,7 +534,8 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second
srvTopoCacheRefresh = 1 * time.Second
}()
rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceNames")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Set SrvKeyspace with value
want := &topodatapb.SrvKeyspace{}
Expand Down Expand Up @@ -614,7 +620,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {

// Check that we only checked the topo service 1 or 2 times during the
// period where we got the cached error.
cachedReqs, ok := rs.counts.Counts()[cachedCategory]
cachedReqs, ok := counts.Counts()[cachedCategory]
if !ok || cachedReqs > 2 {
t.Errorf("expected <= 2 cached requests got %v", cachedReqs)
}
Expand All @@ -640,7 +646,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
t.Errorf("GetSrvKeyspaceNames got %v want %v", names, wantNames)
}

errorReqs, ok := rs.counts.Counts()[errorCategory]
errorReqs, ok := counts.Counts()[errorCategory]
if !ok || errorReqs == 0 {
t.Errorf("expected non-zero error requests got %v", errorReqs)
}
Expand Down Expand Up @@ -684,8 +690,8 @@ func TestSrvKeyspaceWatcher(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second
srvTopoCacheRefresh = 1 * time.Second
}()

rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceWatcher")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

var wmu sync.Mutex
var wseen []watched
Expand Down Expand Up @@ -811,7 +817,8 @@ func TestSrvKeyspaceListener(t *testing.T) {
srvTopoCacheRefresh = 1 * time.Second
}()

rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceListener")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

cancelCtx, cancelFunc := context.WithCancel(context.Background())
var callbackCount atomic.Int32
Expand Down
8 changes: 5 additions & 3 deletions go/vt/srvtopo/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand All @@ -33,10 +34,11 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func initResolver(t *testing.T, ctx context.Context, name string) *Resolver {
func initResolver(t *testing.T, ctx context.Context) *Resolver {
cell := "cell1"
ts := memorytopo.NewServer(ctx, cell)
rs := NewResilientServer(ctx, ts, name)
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Create sharded keyspace and shards.
if err := ts.CreateKeyspace(ctx, "sks", &topodatapb.Keyspace{}); err != nil {
Expand Down Expand Up @@ -97,7 +99,7 @@ func initResolver(t *testing.T, ctx context.Context, name string) *Resolver {
func TestResolveDestinations(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resolver := initResolver(t, ctx, "TestResolveDestinations")
resolver := initResolver(t, ctx)

id1 := &querypb.Value{
Type: sqltypes.VarChar,
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/gorilla/mux"
"github.com/patrickmn/go-cache"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/sets"
Expand Down Expand Up @@ -2384,7 +2385,8 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
}

ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, api.env, ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
srvTopoCounts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
vte, err := vtexplain.Init(ctx, api.env, ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"}, srvTopoCounts)
if err != nil {
return nil, fmt.Errorf("error initilaizing vtexplain: %w", err)
}
Expand Down
Loading
Loading