From f4debb9e51ca38e708db1cb2f340312e1114f45e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 28 Feb 2024 12:00:35 +0100 Subject: [PATCH] Limit concurrent creation of healthcheck gRPC connections (#15053) Signed-off-by: Tim Vaillancourt --- changelog/20.0/20.0.0/summary.md | 5 ++++ go/flags/endtoend/vtcombo.txt | 1 + go/flags/endtoend/vtctld.txt | 1 + go/flags/endtoend/vtgate.txt | 1 + go/vt/discovery/healthcheck.go | 10 ++++++- go/vt/discovery/tablet_health_check.go | 37 +++++++++++++++++++++----- go/vt/grpcclient/client.go | 6 +++++ 7 files changed, 54 insertions(+), 7 deletions(-) diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md index cd3b9718503..863de8aefec 100644 --- a/changelog/20.0/20.0.0/summary.md +++ b/changelog/20.0/20.0.0/summary.md @@ -12,6 +12,7 @@ - [Delete with Subquery Support](#delete-subquery) - **[Flag changes](#flag-changes)** - [`pprof-http` default change](#pprof-http-default) + - [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag) - **[Minor Changes](#minor-changes)** - **[New Stats](#new-stats)** - [VTTablet Query Cache Hits and Misses](#vttablet-query-cache-hits-and-misses) @@ -72,6 +73,10 @@ The `--pprof-http` flag, which was introduced in v19 with a default of `true`, h This makes HTTP `pprof` endpoints now an *opt-in* feature, rather than opt-out. To continue enabling these endpoints, explicitly set `--pprof-http` when starting up Vitess components. +#### New `--healthcheck-dial-concurrency` flag + +The new `--healthcheck-dial-concurrency` flag defines the maximum number of healthcheck connections that can open concurrently. This limit is to avoid hitting Go runtime panics on deployments watching enough tablets [to hit the runtime's maximum thread limit of `10000`](https://pkg.go.dev/runtime/debug#SetMaxThreads) due to blocking network syscalls. This flag applies to `vtcombo`, `vtctld` and `vtgate` only and a value less than the runtime max thread limit _(`10000`)_ is recommended. + ## Minor Changes ### New Stats diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 26e145e6930..a508b78dc80 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -167,6 +167,7 @@ Flags: --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. --health_check_interval duration Interval between health checks (default 20s) + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) --heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks. diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index da60dfeb3b1..e0f552d2b00 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -84,6 +84,7 @@ Flags: --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) --grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s) --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) -h, --help help for vtctld --jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done --keep_logs duration keep logs for this long (using ctime) (zero to keep forever) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index bd9849cfd71..c9431fb43db 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -96,6 +96,7 @@ Flags: --grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s) --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) -h, --help help for vtgate diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 6b30a794893..f37c9ad1d8b 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -46,6 +46,7 @@ import ( "github.com/google/safehtml/template" "github.com/google/safehtml/template/uncheckedconversions" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" @@ -87,6 +88,9 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true + // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. + healthCheckDialConcurrency int64 = 1024 + // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -168,6 +172,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") + fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -287,6 +292,8 @@ type HealthCheckImpl struct { subscribers map[chan *TabletHealth]struct{} // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} + // healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once. + healthCheckDialSem *semaphore.Weighted } // NewHealthCheck creates a new HealthCheck object. @@ -321,6 +328,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, + healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -828,7 +836,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } - return thc.Connection(), nil + return thc.Connection(hc), nil } // getAliasByCell should only be called while holding hc.mu diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index 24496155e74..fc3ab242210 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -19,6 +19,7 @@ package discovery import ( "context" "fmt" + "net" "strings" "sync" "sync/atomic" @@ -33,12 +34,16 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" ) +// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options. +var withDialerContextOnce sync.Once + // tabletHealthCheck maintains the health status of a tablet. A map of this // structure is maintained in HealthCheck. type tabletHealthCheck struct { @@ -122,8 +127,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) { } // stream streams healthcheck responses to callback. -func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error { - conn := thc.Connection() +func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error { + conn := thc.Connection(hc) if conn == nil { // This signals the caller to retry return nil @@ -136,14 +141,34 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S return err } -func (thc *tabletHealthCheck) Connection() queryservice.QueryService { +func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService { thc.connMu.Lock() defer thc.connMu.Unlock() - return thc.connectionLocked() + return thc.connectionLocked(hc) +} + +func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) { + return func(ctx context.Context, addr string) (net.Conn, error) { + // Limit the number of healthcheck connections opened in parallel to avoid high OS-thread + // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, + // etc). Without this limit it is possible for vtgates watching >10k tablets to hit + // the panic: 'runtime: program exceeds 10000-thread limit'. + if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer hc.healthCheckDialSem.Release(1) + var dialer net.Dialer + return dialer.DialContext(ctx, "tcp", addr) + } } -func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService { +func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService { if thc.Conn == nil { + withDialerContextOnce.Do(func() { + grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { + return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil + }) + }) conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true)) if err != nil { thc.LastError = err @@ -272,7 +297,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { }() // Read stream health responses. - err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error { + err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay // Don't block on send to avoid deadlocks. diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index b2ef0d4fb28..7524298514e 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -21,6 +21,7 @@ package grpcclient import ( "context" "crypto/tls" + "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -39,6 +40,7 @@ import ( ) var ( + grpcDialOptionsMu sync.Mutex keepaliveTime = 10 * time.Second keepaliveTimeout = 10 * time.Second initialConnWindowSize int @@ -86,6 +88,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error) // RegisterGRPCDialOptions registers an implementation of AuthServer. func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) { + grpcDialOptionsMu.Lock() + defer grpcDialOptionsMu.Unlock() grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc) } @@ -134,12 +138,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ... newopts = append(newopts, opts...) var err error + grpcDialOptionsMu.Lock() for _, grpcDialOptionInitializer := range grpcDialOptions { newopts, err = grpcDialOptionInitializer(newopts) if err != nil { log.Fatalf("There was an error initializing client grpc.DialOption: %v", err) } } + grpcDialOptionsMu.Unlock() newopts = append(newopts, interceptors()...)