From e9b3e7fe5e291e69c792a10c72ae35f81f45466e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 26 Feb 2024 17:04:47 +0200 Subject: [PATCH 01/19] Tablet throttler: replica checks stimulate PRIMARY to request heartbeats Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler.go | 50 ++++++++++++++++++- .../tabletserver/throttle/throttlerapp/app.go | 5 +- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 8f6db91936d..02ee5f1ab6d 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -578,6 +578,31 @@ func (throttler *Throttler) requestHeartbeats() { go stats.GetOrNewCounter("ThrottlerHeartbeatRequests", "heartbeat requests").Add(1) } +// stimulatePrimaryThrottler sends a check request to the primary tablet in the shard, to stimulate +// it to request for heartbeats. +func (throttler *Throttler) stimulatePrimaryThrottler(ctx context.Context, tmClient tmclient.TabletManagerClient) error { + tabletAliases, err := throttler.ts.FindAllTabletAliasesInShard(ctx, throttler.keyspace, throttler.shard) + if err != nil { + return err + } + for _, tabletAlias := range tabletAliases { + tablet, err := throttler.ts.GetTablet(ctx, tabletAlias) + if err != nil { + return err + } + if tablet.Type != topodatapb.TabletType_PRIMARY { + continue + } + req := &tabletmanagerdatapb.CheckThrottlerRequest{AppName: throttlerapp.ThrottlerStimulatorName.String()} + _, err = tmClient.CheckThrottler(ctx, tablet.Tablet, req) + if err != nil { + log.Errorf("stimulatePrimaryThrottler: %+v", err) + } + return err + } + return nil +} + func (throttler *Throttler) generateSelfMySQLThrottleMetricFunc(ctx context.Context, probe *mysql.Probe) func() *mysql.MySQLThrottleMetric { f := func() *mysql.MySQLThrottleMetric { return throttler.readSelfThrottleMetric(ctx, probe) @@ -664,10 +689,12 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { mysqlAggregateTicker := addTicker(throttler.mysqlAggregateInterval) throttledAppsTicker := addTicker(throttler.throttledAppsSnapshotInterval) recentCheckTicker := addTicker(time.Second) + primaryStimulatorRateLimiter := timer.NewRateLimiter(time.Minute) wg.Add(1) go func() { defer func() { + primaryStimulatorRateLimiter.Stop() throttler.aggregatedMetrics.Flush() throttler.recentApps.Flush() throttler.nonLowPriorityAppRequestsThrottled.Flush() @@ -727,6 +754,18 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { if !throttler.isDormant() { throttler.collectMySQLMetrics(ctx, tmClient) } + // + if throttler.recentCheckValue.Load() >= throttler.recentCheckTickerValue.Load() { + if !throttler.isLeader.Load() { + // This is a replica, and has just recently been checked. + // We want to proactively "stimulate" the primary throttler to renew the heartbeat lease. + primaryStimulatorRateLimiter.Do( + func() error { + return throttler.stimulatePrimaryThrottler(ctx, tmClient) + }) + } + } + } case <-mysqlDormantCollectTicker.C: if throttler.IsOpen() { @@ -1174,7 +1213,16 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor // continuous and do not generate a substantial load. return okMetricCheckResult } - if !flags.SkipRequestHeartbeats && !throttlerapp.VitessName.Equals(appName) { + shouldRequestHeartbeats := !flags.SkipRequestHeartbeats + if throttlerapp.VitessName.Equals(appName) { + // Override: "vitess" app never requests heartbeats. + shouldRequestHeartbeats = false + } + if throttlerapp.ThrottlerStimulatorName.Equals(appName) { + // Ovreride: throttler-stimulator app always requests heartbeats. + shouldRequestHeartbeats = true + } + if shouldRequestHeartbeats { throttler.requestHeartbeats() // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index 4f1f5857837..7594df6c1b2 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -42,8 +42,9 @@ func (n Name) Concatenate(other Name) Name { const ( // DefaultName is the app name used by vitess when app doesn't indicate its name - DefaultName Name = "default" - VitessName Name = "vitess" + DefaultName Name = "default" + VitessName Name = "vitess" + ThrottlerStimulatorName Name = "throttler-stimulator" TableGCName Name = "tablegc" OnlineDDLName Name = "online-ddl" From 3a92d148e23c94076ab4ed2d5daa60d9025b3088 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 07:44:15 +0200 Subject: [PATCH 02/19] Use atomic.Int64 Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/timer/rate_limiter.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/go/timer/rate_limiter.go b/go/timer/rate_limiter.go index 25bc2b32f61..3f4f6360006 100644 --- a/go/timer/rate_limiter.go +++ b/go/timer/rate_limiter.go @@ -28,7 +28,7 @@ import ( // For example, we can create a RateLimiter of 1second. Then, we can ask it, over time, to run many // tasks. It will only ever run a single task in any 1 second time frame. The rest are ignored. type RateLimiter struct { - tickerValue int64 + tickerValue atomic.Int64 lastDoValue int64 mu sync.Mutex @@ -37,7 +37,8 @@ type RateLimiter struct { // NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks. func NewRateLimiter(d time.Duration) *RateLimiter { - r := &RateLimiter{tickerValue: 1} + r := &RateLimiter{} + r.tickerValue.Add(1) // start at 1, so that the first Do() call is not rate limited. ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel go func() { @@ -48,7 +49,7 @@ func NewRateLimiter(d time.Duration) *RateLimiter { case <-ctx.Done(): return case <-ticker.C: - atomic.StoreInt64(&r.tickerValue, r.tickerValue+1) + r.tickerValue.Add(1) } } }() @@ -61,13 +62,13 @@ func (r *RateLimiter) Do(f func() error) (err error) { r.mu.Lock() defer r.mu.Unlock() - if r.lastDoValue >= atomic.LoadInt64(&r.tickerValue) { + if r.lastDoValue >= r.tickerValue.Load() { return nil // rate limited. Skipped. } if f != nil { err = f() } - r.lastDoValue = atomic.LoadInt64(&r.tickerValue) + r.lastDoValue = r.tickerValue.Load() return err } From 034c5cf64b0abb9a803feacdf8e851f89ba66455 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 07:44:30 +0200 Subject: [PATCH 03/19] typo Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/stats/counter_map.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/stats/counter_map.go b/go/stats/counter_map.go index 5ee7d19181e..a9af4495c60 100644 --- a/go/stats/counter_map.go +++ b/go/stats/counter_map.go @@ -25,7 +25,7 @@ var ( countersMu sync.RWMutex ) -// GetOrNewCounter returns a Counter with given name; the functiona either creates the counter +// GetOrNewCounter returns a Counter with given name; the function either creates the counter // if it does not exist, or returns a pre-existing one. The function is thread safe. func GetOrNewCounter(name string, help string) *Counter { // first, attempt read lock only From 3893657ae7826b17dc2c0f0cd41687eb1309694a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 08:15:48 +0200 Subject: [PATCH 04/19] Mark() and Diff() Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/timer/rate_limiter.go | 13 +++++++++++++ go/timer/rate_limiter_test.go | 15 +++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/go/timer/rate_limiter.go b/go/timer/rate_limiter.go index 3f4f6360006..31cbc338ce5 100644 --- a/go/timer/rate_limiter.go +++ b/go/timer/rate_limiter.go @@ -72,6 +72,19 @@ func (r *RateLimiter) Do(f func() error) (err error) { return err } +// Mark is a convenience method to invoke Do() with no function. +func (r *RateLimiter) Mark() { + _ = r.Do(nil) +} + +// Diff returns the logical clock diff between the ticker and the last Do() call. +func (r *RateLimiter) Diff() int64 { + r.mu.Lock() + defer r.mu.Unlock() + + return r.tickerValue.Load() - r.lastDoValue +} + // Stop terminates rate limiter's operation and will not allow any more Do() executions. func (r *RateLimiter) Stop() { r.cancel() diff --git a/go/timer/rate_limiter_test.go b/go/timer/rate_limiter_test.go index 84122233996..c8a1216662c 100644 --- a/go/timer/rate_limiter_test.go +++ b/go/timer/rate_limiter_test.go @@ -75,3 +75,18 @@ func TestRateLimiterStop(t *testing.T) { } assert.Equal(t, valSnapshot, val) } + +func TestRateLimiterDiff(t *testing.T) { + d := 2 * time.Second + r := NewRateLimiter(d) + require.NotNil(t, r) + defer r.Stop() + + // This assumes the last couple lines of code run faster than 2 seconds, which should be the case. + // But if you see flakiness due to slow runners, we can revisit the logic. + assert.Equal(t, int64(1), r.Diff()) + time.Sleep(d + time.Second) + assert.Greater(t, r.Diff(), int64(1)) + r.Mark() + assert.LessOrEqual(t, r.Diff(), int64(1)) +} From b78ab5bd23ad47ee064ec7efff25f51be08a0912 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 09:25:52 +0200 Subject: [PATCH 05/19] do not mark last cehck time for 'vitess' app Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/check.go | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index 9dfbade8af6..c288d80d177 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -148,20 +148,21 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp } checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags) - check.throttler.lastCheckTimeNano.Store(time.Now().UnixNano()) + if !throttlerapp.VitessName.Equals(appName) { + check.throttler.lastCheckTimeNano.Store(time.Now().UnixNano()) - go func(statusCode int) { - stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1) - stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) + go func(statusCode int) { + stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1) + stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) - if statusCode != http.StatusOK { - stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1) - stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) - } - - check.throttler.markRecentApp(appName, remoteAddr) - }(checkResult.StatusCode) + if statusCode != http.StatusOK { + stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1) + stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) + } + check.throttler.markRecentApp(appName, remoteAddr) + }(checkResult.StatusCode) + } return checkResult } From 430cc58e51981e1ec620443aaaef02575aa37d71 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:06:20 +0200 Subject: [PATCH 06/19] init lastDoValue to minimal val rather than increasing tickerValue Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/timer/rate_limiter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/timer/rate_limiter.go b/go/timer/rate_limiter.go index 31cbc338ce5..da44829636b 100644 --- a/go/timer/rate_limiter.go +++ b/go/timer/rate_limiter.go @@ -38,7 +38,7 @@ type RateLimiter struct { // NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks. func NewRateLimiter(d time.Duration) *RateLimiter { r := &RateLimiter{} - r.tickerValue.Add(1) // start at 1, so that the first Do() call is not rate limited. + r.lastDoValue = math.MinInt32 // Far enough to make a difference, but not too far to overflow. ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel go func() { From cd102bbf798d5d1b56afcc08c3400c0b3ac730fd Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:11:20 +0200 Subject: [PATCH 07/19] DoEmpty Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/timer/rate_limiter.go | 4 ++-- go/timer/rate_limiter_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/timer/rate_limiter.go b/go/timer/rate_limiter.go index da44829636b..d42a4d7e14c 100644 --- a/go/timer/rate_limiter.go +++ b/go/timer/rate_limiter.go @@ -72,8 +72,8 @@ func (r *RateLimiter) Do(f func() error) (err error) { return err } -// Mark is a convenience method to invoke Do() with no function. -func (r *RateLimiter) Mark() { +// DoEmpty is a convenience method to invoke Do() with no function. +func (r *RateLimiter) DoEmpty() { _ = r.Do(nil) } diff --git a/go/timer/rate_limiter_test.go b/go/timer/rate_limiter_test.go index c8a1216662c..bdda9b56eb2 100644 --- a/go/timer/rate_limiter_test.go +++ b/go/timer/rate_limiter_test.go @@ -87,6 +87,6 @@ func TestRateLimiterDiff(t *testing.T) { assert.Equal(t, int64(1), r.Diff()) time.Sleep(d + time.Second) assert.Greater(t, r.Diff(), int64(1)) - r.Mark() + r.DoEmpty() assert.LessOrEqual(t, r.Diff(), int64(1)) } From 612ad1e03ec80ae188910a2a2f8e35e55d3020f7 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:12:34 +0200 Subject: [PATCH 08/19] Throttler: consolidate lastCheckTimeNano, recentCheckValue; remove recentCheckTickerValue. Use recentCheckRateLimiter Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/check.go | 2 - .../tabletserver/throttle/throttler.go | 49 ++++----- .../tabletserver/throttle/throttler_test.go | 103 +++++++++++++++--- 3 files changed, 111 insertions(+), 43 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index c288d80d177..06f42890b38 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -149,8 +149,6 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags) if !throttlerapp.VitessName.Equals(appName) { - check.throttler.lastCheckTimeNano.Store(time.Now().UnixNano()) - go func(statusCode int) { stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1) stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 02ee5f1ab6d..1a084c1bc7e 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -159,6 +159,7 @@ type Throttler struct { mysqlRefreshInterval time.Duration mysqlAggregateInterval time.Duration throttledAppsSnapshotInterval time.Duration + dormantPeriod time.Duration configSettings *config.ConfigurationSettings env tabletenv.Env @@ -169,11 +170,7 @@ type Throttler struct { heartbeatWriter heartbeat.HeartbeatWriter overrideTmClient tmclient.TabletManagerClient - // recentCheckTickerValue is an ever increasing number, incrementing once per second. - recentCheckTickerValue atomic.Int64 - // recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself). - // when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check. - recentCheckValue atomic.Int64 + recentCheckRateLimiter *timer.RateLimiter throttleTabletTypesMap map[topodatapb.TabletType]bool @@ -194,8 +191,6 @@ type Throttler struct { recentApps *cache.Cache metricsHealth *cache.Cache - lastCheckTimeNano atomic.Int64 - initMutex sync.Mutex enableMutex sync.Mutex cancelOpenContext context.CancelFunc @@ -263,6 +258,7 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv throttler.mysqlRefreshInterval = mysqlRefreshInterval throttler.mysqlAggregateInterval = mysqlAggregateInterval throttler.throttledAppsSnapshotInterval = throttledAppsSnapshotInterval + throttler.dormantPeriod = dormantPeriod throttler.StoreMetricsThreshold(defaultThrottleLagThreshold.Seconds()) //default throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric { @@ -574,6 +570,9 @@ func (throttler *Throttler) Close() { // requestHeartbeats sends a heartbeat lease request to the heartbeat writer. // This action is recorded in stats. func (throttler *Throttler) requestHeartbeats() { + if !throttler.isLeader.Load() { + return + } go throttler.heartbeatWriter.RequestHeartbeats() go stats.GetOrNewCounter("ThrottlerHeartbeatRequests", "heartbeat requests").Add(1) } @@ -669,8 +668,7 @@ func (throttler *Throttler) ThrottledApps() (result []base.AppThrottle) { // isDormant returns true when the last check was more than dormantPeriod ago func (throttler *Throttler) isDormant() bool { - lastCheckTime := time.Unix(0, throttler.lastCheckTimeNano.Load()) - return time.Since(lastCheckTime) > dormantPeriod + return throttler.recentCheckRateLimiter.Diff() > int64(throttler.dormantPeriod/time.Second) } // Operate is the main entry point for the throttler operation and logic. It will @@ -688,12 +686,13 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { mysqlRefreshTicker := addTicker(throttler.mysqlRefreshInterval) mysqlAggregateTicker := addTicker(throttler.mysqlAggregateInterval) throttledAppsTicker := addTicker(throttler.throttledAppsSnapshotInterval) - recentCheckTicker := addTicker(time.Second) - primaryStimulatorRateLimiter := timer.NewRateLimiter(time.Minute) + primaryStimulatorRateLimiter := timer.NewRateLimiter(throttler.dormantPeriod) + throttler.recentCheckRateLimiter = timer.NewRateLimiter(time.Second) wg.Add(1) go func() { defer func() { + throttler.recentCheckRateLimiter.Stop() primaryStimulatorRateLimiter.Stop() throttler.aggregatedMetrics.Flush() throttler.recentApps.Flush() @@ -755,10 +754,17 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { throttler.collectMySQLMetrics(ctx, tmClient) } // - if throttler.recentCheckValue.Load() >= throttler.recentCheckTickerValue.Load() { + if throttler.recentCheckRateLimiter.Diff() <= 1 { if !throttler.isLeader.Load() { // This is a replica, and has just recently been checked. // We want to proactively "stimulate" the primary throttler to renew the heartbeat lease. + // The intent is to "wake up" an on-demand heartbeat lease. We don't need to poke the + // primary for every single time this replica was checked, so we rate limit. The idea is that + // once heartbeats update, more checks will be successful, this replica will be "recently checked" + // more than not, and the primary throttler will pick that up, extending the on-demand lease + // even further. + // Another outcome is that the primary will go out of "dormant" mode, and start collecting + // replica metrics more frequently. primaryStimulatorRateLimiter.Do( func() error { return throttler.stimulatePrimaryThrottler(ctx, tmClient) @@ -795,9 +801,6 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { } case throttlerConfig := <-throttler.throttlerConfigChan: throttler.applyThrottlerConfig(ctx, throttlerConfig) - case <-recentCheckTicker.C: - // Increment recentCheckTickerValue by one. - throttler.recentCheckTickerValue.Add(1) } } }() @@ -1213,29 +1216,25 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor // continuous and do not generate a substantial load. return okMetricCheckResult } + + checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + shouldRequestHeartbeats := !flags.SkipRequestHeartbeats if throttlerapp.VitessName.Equals(appName) { // Override: "vitess" app never requests heartbeats. shouldRequestHeartbeats = false } if throttlerapp.ThrottlerStimulatorName.Equals(appName) { - // Ovreride: throttler-stimulator app always requests heartbeats. + // Ovreride: "throttler-stimulator" app always requests heartbeats. shouldRequestHeartbeats = true } + if shouldRequestHeartbeats { throttler.requestHeartbeats() + throttler.recentCheckRateLimiter.DoEmpty() // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back // to the PRIMARY so that it knows it must renew the heartbeat lease. - throttler.recentCheckValue.Store(1 + throttler.recentCheckTickerValue.Load()) - } - checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) - - if throttler.recentCheckValue.Load() >= throttler.recentCheckTickerValue.Load() { - // This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`. - // This could be online-ddl, or vreplication or whoever else. - // If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check, - // so that the PRIMARY knows it must renew the heartbeat lease. checkResult.RecentlyChecked = true go stats.GetOrNewCounter("ThrottlerRecentlyChecked", "recently checked").Add(1) } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 25de8ca96f5..72a0b918773 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -34,6 +34,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tmclient" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -56,7 +57,7 @@ func (c *fakeTMClient) CheckThrottler(ctx context.Context, tablet *topodatapb.Ta StatusCode: http.StatusOK, Value: 0, Threshold: 1, - RecentlyChecked: true, + RecentlyChecked: false, } return resp, nil } @@ -80,8 +81,8 @@ func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.Table func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) { aliases := []*topodatapb.TabletAlias{ - {Cell: "zone1", Uid: 100}, - {Cell: "zone2", Uid: 101}, + {Cell: "fakezone1", Uid: 100}, + {Cell: "fakezone2", Uid: 101}, } return aliases, nil } @@ -92,9 +93,15 @@ func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace str } type FakeHeartbeatWriter struct { + requests atomic.Int64 } -func (w FakeHeartbeatWriter) RequestHeartbeats() { +func (w *FakeHeartbeatWriter) RequestHeartbeats() { + w.requests.Add(1) +} + +func (w *FakeHeartbeatWriter) Requests() int64 { + return w.requests.Load() } func newTestThrottler() *Throttler { @@ -113,7 +120,7 @@ func newTestThrottler() *Throttler { throttler := &Throttler{ mysqlClusterProbesChan: make(chan *mysql.ClusterProbes), mysqlClusterThresholds: cache.New(cache.NoExpiration, 0), - heartbeatWriter: FakeHeartbeatWriter{}, + heartbeatWriter: &FakeHeartbeatWriter{}, ts: &FakeTopoServer{}, mysqlInventory: mysql.NewInventory(), pool: connpool.NewPool(env, "ThrottlerPool", tabletenv.ConnPoolConfig{}), @@ -137,13 +144,14 @@ func newTestThrottler() *Throttler { throttler.initThrottleTabletTypes() throttler.check = NewThrottlerCheck(throttler) - // High contention & racy itnervals: + // High contention & racy intervals: throttler.leaderCheckInterval = 10 * time.Millisecond throttler.mysqlCollectInterval = 10 * time.Millisecond throttler.mysqlDormantCollectInterval = 10 * time.Millisecond throttler.mysqlRefreshInterval = 10 * time.Millisecond throttler.mysqlAggregateInterval = 10 * time.Millisecond throttler.throttledAppsSnapshotInterval = 10 * time.Millisecond + throttler.dormantPeriod = 5 * time.Second throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric { return &mysql.MySQLThrottleMetric{ @@ -160,7 +168,7 @@ func newTestThrottler() *Throttler { func TestIsAppThrottled(t *testing.T) { throttler := Throttler{ throttledApps: cache.New(cache.NoExpiration, 0), - heartbeatWriter: FakeHeartbeatWriter{}, + heartbeatWriter: &FakeHeartbeatWriter{}, } assert.False(t, throttler.IsAppThrottled("app1")) assert.False(t, throttler.IsAppThrottled("app2")) @@ -190,7 +198,7 @@ func TestIsAppExempted(t *testing.T) { throttler := Throttler{ throttledApps: cache.New(cache.NoExpiration, 0), - heartbeatWriter: FakeHeartbeatWriter{}, + heartbeatWriter: &FakeHeartbeatWriter{}, } assert.False(t, throttler.IsAppExempted("app1")) assert.False(t, throttler.IsAppExempted("app2")) @@ -315,10 +323,10 @@ func TestRefreshMySQLInventory(t *testing.T) { }) } -// runThrottler opens and enables the throttler, therby making it run the Operate() function, for a given amount of time. +// runThrottler opens and enables the throttler, thereby making it run the Operate() function, for a given amount of time. // Optionally, running a given function halfway while the throttler is still open and running. -func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f func(*testing.T)) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func runThrottler(t *testing.T, ctx context.Context, throttler *Throttler, timeout time.Duration, f func(*testing.T, context.Context)) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() assert.False(t, throttler.IsOpen()) @@ -339,9 +347,17 @@ func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f f wg2 := throttler.Enable() assert.Nil(t, wg2) + sleepTime := 3 * time.Second + if timeout/2 < sleepTime { + sleepTime = timeout / 2 + } if f != nil { - time.Sleep(timeout / 2) - f(t) + select { + case <-ctx.Done(): + return + case <-time.After(sleepTime): + f(t, ctx) + } } <-ctx.Done() @@ -355,7 +371,7 @@ func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f f // This is relevant to `go test -race` func TestRace(t *testing.T) { throttler := newTestThrottler() - runThrottler(t, throttler, 5*time.Second, nil) + runThrottler(t, context.Background(), throttler, 5*time.Second, nil) } // TestProbes enables a throttler for a few seocnds, and afterwards expects to find probes and metrics. @@ -365,7 +381,9 @@ func TestProbesWhileOperating(t *testing.T) { t.Run("aggregated", func(t *testing.T) { assert.Equal(t, 0, throttler.aggregatedMetrics.ItemCount()) }) - runThrottler(t, throttler, 5*time.Second, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + runThrottler(t, ctx, throttler, 5*time.Second, func(t *testing.T, ctx context.Context) { t.Run("aggregated", func(t *testing.T) { assert.Equal(t, 2, throttler.aggregatedMetrics.ItemCount()) // flushed upon Disable() aggr := throttler.aggregatedMetricsSnapshot() @@ -382,6 +400,7 @@ func TestProbesWhileOperating(t *testing.T) { assert.Failf(t, "unknown clusterName", "%v", clusterName) } } + cancel() }) }) } @@ -389,7 +408,7 @@ func TestProbesWhileOperating(t *testing.T) { // TestProbesPostDisable runs the throttler for some time, and then investigates the internal throttler maps and values. func TestProbesPostDisable(t *testing.T) { throttler := newTestThrottler() - runThrottler(t, throttler, 2*time.Second, nil) + runThrottler(t, context.Background(), throttler, 2*time.Second, nil) probes := throttler.mysqlInventory.ClustersProbes assert.NotEmpty(t, probes) @@ -431,3 +450,55 @@ func TestProbesPostDisable(t *testing.T) { assert.Empty(t, aggr) }) } + +func TestDormant(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + throttler := newTestThrottler() + + heartbeatWriter, ok := throttler.heartbeatWriter.(*FakeHeartbeatWriter) + assert.True(t, ok) + assert.Zero(t, heartbeatWriter.Requests()) // once upon Enable() + + runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) { + assert.True(t, throttler.isDormant()) + assert.EqualValues(t, 1, heartbeatWriter.Requests()) // once upon Enable() + flags := &CheckFlags{} + throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf) + go func() { + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.True(t, throttler.isDormant()) + assert.EqualValues(t, 1, heartbeatWriter.Requests()) // "vitess" name does not cause heartbeat requests + } + throttler.CheckByType(ctx, throttlerapp.ThrottlerStimulatorName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.False(t, throttler.isDormant()) + assert.Greater(t, heartbeatWriter.Requests(), int64(1)) + } + throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.False(t, throttler.isDormant()) + assert.Greater(t, heartbeatWriter.Requests(), int64(2)) + } + + // Dormant period + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(throttler.dormantPeriod): + assert.True(t, throttler.isDormant()) + } + cancel() + }() + }) +} From 1ec9c30cea973725e1855e295dacd2316e63a510 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:38:15 +0200 Subject: [PATCH 09/19] comment Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 72a0b918773..d731c20c8d9 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -383,7 +383,7 @@ func TestProbesWhileOperating(t *testing.T) { }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - runThrottler(t, ctx, throttler, 5*time.Second, func(t *testing.T, ctx context.Context) { + runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) { t.Run("aggregated", func(t *testing.T) { assert.Equal(t, 2, throttler.aggregatedMetrics.ItemCount()) // flushed upon Disable() aggr := throttler.aggregatedMetricsSnapshot() @@ -400,7 +400,7 @@ func TestProbesWhileOperating(t *testing.T) { assert.Failf(t, "unknown clusterName", "%v", clusterName) } } - cancel() + cancel() // end test early }) }) } @@ -498,7 +498,7 @@ func TestDormant(t *testing.T) { case <-time.After(throttler.dormantPeriod): assert.True(t, throttler.isDormant()) } - cancel() + cancel() // end test early }() }) } From 65f2e52eeed30fbf1a45346680af54e2e0ab85a4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 3 Mar 2024 13:47:18 +0200 Subject: [PATCH 10/19] pre-calculate logical diff value Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler.go | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 1a084c1bc7e..1e56bd9cc37 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -82,19 +82,20 @@ import ( ) const ( - leaderCheckInterval = 5 * time.Second - mysqlCollectInterval = 250 * time.Millisecond - mysqlDormantCollectInterval = 5 * time.Second - mysqlRefreshInterval = 10 * time.Second - mysqlAggregateInterval = 125 * time.Millisecond - throttledAppsSnapshotInterval = 5 * time.Second + leaderCheckInterval = 5 * time.Second + mysqlCollectInterval = 250 * time.Millisecond // PRIMARY polls replicas + mysqlDormantCollectInterval = 5 * time.Second // PRIMARY polls replicas when dormant (no recent checks) + mysqlRefreshInterval = 10 * time.Second // Refreshing tablet inventory + mysqlAggregateInterval = 125 * time.Millisecond + throttledAppsSnapshotInterval = 5 * time.Second + recentCheckRateLimiterInterval = 1 * time.Second // Ticker assisting in determining dormancy aggregatedMetricsExpiration = 5 * time.Second recentAppsExpiration = time.Hour * 24 nonDeprioritizedAppMapExpiration = time.Second - dormantPeriod = time.Minute + dormantPeriod = time.Minute // How long since last check to be considered dormant DefaultAppThrottleDuration = time.Hour DefaultThrottleRatio = 1.0 @@ -171,6 +172,7 @@ type Throttler struct { overrideTmClient tmclient.TabletManagerClient recentCheckRateLimiter *timer.RateLimiter + recentCheckDormantDiff int64 throttleTabletTypesMap map[topodatapb.TabletType]bool @@ -259,6 +261,7 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv throttler.mysqlAggregateInterval = mysqlAggregateInterval throttler.throttledAppsSnapshotInterval = throttledAppsSnapshotInterval throttler.dormantPeriod = dormantPeriod + throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval) throttler.StoreMetricsThreshold(defaultThrottleLagThreshold.Seconds()) //default throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric { @@ -666,9 +669,11 @@ func (throttler *Throttler) ThrottledApps() (result []base.AppThrottle) { return result } -// isDormant returns true when the last check was more than dormantPeriod ago +// isDormant returns true when the last check was more than dormantPeriod ago. +// Instead of measuring actual time, we use the fact recentCheckRateLimiter ticks every second, and take +// a logical diff, counting the number of ticks since the last check. This is a good enough approximation. func (throttler *Throttler) isDormant() bool { - return throttler.recentCheckRateLimiter.Diff() > int64(throttler.dormantPeriod/time.Second) + return throttler.recentCheckRateLimiter.Diff() > throttler.recentCheckDormantDiff } // Operate is the main entry point for the throttler operation and logic. It will @@ -687,7 +692,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { mysqlAggregateTicker := addTicker(throttler.mysqlAggregateInterval) throttledAppsTicker := addTicker(throttler.throttledAppsSnapshotInterval) primaryStimulatorRateLimiter := timer.NewRateLimiter(throttler.dormantPeriod) - throttler.recentCheckRateLimiter = timer.NewRateLimiter(time.Second) + throttler.recentCheckRateLimiter = timer.NewRateLimiter(recentCheckRateLimiterInterval) wg.Add(1) go func() { @@ -754,7 +759,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { throttler.collectMySQLMetrics(ctx, tmClient) } // - if throttler.recentCheckRateLimiter.Diff() <= 1 { + if throttler.recentCheckRateLimiter.Diff() <= 1 { // recently checked if !throttler.isLeader.Load() { // This is a replica, and has just recently been checked. // We want to proactively "stimulate" the primary throttler to renew the heartbeat lease. From be90fc0ba7d455635bec64d0d706ce5c36bb4e39 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 3 Mar 2024 13:47:57 +0200 Subject: [PATCH 11/19] test replica throttler's stimulation of PRIMARY throttler Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler_test.go | 75 ++++++++++++++++++- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index d731c20c8d9..796fb296702 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -47,6 +47,7 @@ const ( type fakeTMClient struct { tmclient.TabletManagerClient + appNames []string } func (c *fakeTMClient) Close() { @@ -59,6 +60,7 @@ func (c *fakeTMClient) CheckThrottler(ctx context.Context, tablet *topodatapb.Ta Threshold: 1, RecentlyChecked: false, } + c.appNames = append(c.appNames, request.AppName) return resp, nil } @@ -66,6 +68,10 @@ type FakeTopoServer struct { } func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) { + tabletType := topodatapb.TabletType_PRIMARY + if alias.Uid != 100 { + tabletType = topodatapb.TabletType_REPLICA + } tablet := &topo.TabletInfo{ Tablet: &topodatapb.Tablet{ Alias: alias, @@ -73,7 +79,7 @@ func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.Table MysqlHostname: "127.0.0.1", MysqlPort: 3306, PortMap: map[string]int32{"vt": 5000}, - Type: topodatapb.TabletType_REPLICA, + Type: tabletType, }, } return tablet, nil @@ -83,6 +89,7 @@ func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keysp aliases := []*topodatapb.TabletAlias{ {Cell: "fakezone1", Uid: 100}, {Cell: "fakezone2", Uid: 101}, + {Cell: "fakezone3", Uid: 103}, } return aliases, nil } @@ -152,6 +159,7 @@ func newTestThrottler() *Throttler { throttler.mysqlAggregateInterval = 10 * time.Millisecond throttler.throttledAppsSnapshotInterval = 10 * time.Millisecond throttler.dormantPeriod = 5 * time.Second + throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval) throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric { return &mysql.MySQLThrottleMetric{ @@ -374,10 +382,14 @@ func TestRace(t *testing.T) { runThrottler(t, context.Background(), throttler, 5*time.Second, nil) } -// TestProbes enables a throttler for a few seocnds, and afterwards expects to find probes and metrics. +// TestProbes enables a throttler for a few seconds, and afterwards expects to find probes and metrics. func TestProbesWhileOperating(t *testing.T) { throttler := newTestThrottler() + tmClient, ok := throttler.overrideTmClient.(*fakeTMClient) + require.True(t, ok) + assert.Empty(t, tmClient.appNames) + t.Run("aggregated", func(t *testing.T) { assert.Equal(t, 0, throttler.aggregatedMetrics.ItemCount()) }) @@ -400,6 +412,20 @@ func TestProbesWhileOperating(t *testing.T) { assert.Failf(t, "unknown clusterName", "%v", clusterName) } } + assert.NotEmpty(t, tmClient.appNames) + // The throttler here emulates a PRIMARY tablet, and therefore should probe the replicas using + // the "vitess" app name. + uniqueNames := map[string]int{} + for _, appName := range tmClient.appNames { + uniqueNames[appName]++ + } + // PRIMARY throttler probes replicas with empty app name, which is then + // interpreted as "vitess" name. + _, ok := uniqueNames[""] + assert.Truef(t, ok, "%+v", uniqueNames) + // And that's the only app we expect to see. + assert.Equalf(t, 1, len(uniqueNames), "%+v", uniqueNames) + cancel() // end test early }) }) @@ -502,3 +528,48 @@ func TestDormant(t *testing.T) { }() }) } + +func TestReplica(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + throttler := newTestThrottler() + throttler.dormantPeriod = time.Minute + throttler.tabletTypeFunc = func() topodatapb.TabletType { return topodatapb.TabletType_REPLICA } + + tmClient, ok := throttler.overrideTmClient.(*fakeTMClient) + require.True(t, ok) + assert.Empty(t, tmClient.appNames) + + runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) { + assert.Empty(t, tmClient.appNames) + flags := &CheckFlags{} + throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf) + go func() { + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.Empty(t, tmClient.appNames) + } + throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.NotEmpty(t, tmClient.appNames) + assert.Containsf(t, tmClient.appNames, throttlerapp.ThrottlerStimulatorName.String(), "%+v", tmClient.appNames) + assert.Equalf(t, 1, len(tmClient.appNames), "%+v", tmClient.appNames) + } + throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + // Due to stimulation rate limiting, we shouldn't see a 2nd CheckThrottler request. + assert.Equalf(t, 1, len(tmClient.appNames), "%+v", tmClient.appNames) + } + cancel() // end test early + }() + }) +} From c3cae9c4560582b1129e8ce8ccc93243cde53be4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 3 Mar 2024 14:49:59 +0200 Subject: [PATCH 12/19] better comment Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 1e56bd9cc37..1204fd1b4a0 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -88,7 +88,7 @@ const ( mysqlRefreshInterval = 10 * time.Second // Refreshing tablet inventory mysqlAggregateInterval = 125 * time.Millisecond throttledAppsSnapshotInterval = 5 * time.Second - recentCheckRateLimiterInterval = 1 * time.Second // Ticker assisting in determining dormancy + recentCheckRateLimiterInterval = 1 * time.Second // Ticker assisting in determining when the throttler was last checked aggregatedMetricsExpiration = 5 * time.Second recentAppsExpiration = time.Hour * 24 From 5f69eac06c67a9e3e9ea64a8dc11bacc1b75a611 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 6 Mar 2024 11:33:39 +0200 Subject: [PATCH 13/19] mroe debug output in endtoend test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../throttler_topo/throttler_test.go | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 9824f28ae2b..46929411011 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -89,6 +89,7 @@ var ( throttledAppsAPIPath = "throttler/throttled-apps" checkAPIPath = "throttler/check" checkSelfAPIPath = "throttler/check-self" + statusAPIPath = "throttler/status" getResponseBody = func(resp *http.Response) string { body, _ := io.ReadAll(resp.Body) return string(body) @@ -180,6 +181,16 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s", tablet.HTTPPort, checkSelfAPIPath, testAppName)) } +func throttleStatus(t *testing.T, tablet *cluster.Vttablet) string { + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, statusAPIPath)) + require.NoError(t, err) + defer resp.Body.Close() + + b, err := io.ReadAll(resp.Body) + require.NoError(t, err) + return string(b) +} + func warmUpHeartbeat(t *testing.T) (respStatus int) { // because we run with -heartbeat_on_demand_duration=5s, the heartbeat is "cold" right now. // Let's warm it up. @@ -314,17 +325,32 @@ func TestInitialThrottler(t *testing.T) { }) t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { time.Sleep(1 * time.Second) + cluster.ValidateReplicationIsHealthy(t, replicaTablet) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) + if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) { + rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) + assert.NoError(t, err) + t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) + t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) + t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) + } }) + t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { time.Sleep(1 * time.Second) + cluster.ValidateReplicationIsHealthy(t, replicaTablet) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) + if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) { + rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) + assert.NoError(t, err) + t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) + t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) + t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) + } }) t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops From b58bac6dabeba9fdc2b09a7924357c06dfdaf655 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 6 Mar 2024 11:34:44 +0200 Subject: [PATCH 14/19] Always collect self metrcis, even if dormant. Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/check.go | 4 ++-- .../vttablet/tabletserver/throttle/throttler.go | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index 06f42890b38..85952a496d1 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -148,6 +148,7 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp } checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags) + check.throttler.markRecentApp(appName, remoteAddr) if !throttlerapp.VitessName.Equals(appName) { go func(statusCode int) { stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1) @@ -157,8 +158,6 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1) stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) } - - check.throttler.markRecentApp(appName, remoteAddr) }(checkResult.StatusCode) } return checkResult @@ -226,6 +225,7 @@ func (check *ThrottlerCheck) SelfChecks(ctx context.Context) { for metricName, metricResult := range check.AggregatedMetrics(ctx) { metricName := metricName metricResult := metricResult + go check.localCheck(ctx, metricName) go check.reportAggregated(metricName, metricResult) } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 1204fd1b4a0..3a92774c022 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -755,8 +755,14 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { case <-mysqlCollectTicker.C: if throttler.IsOpen() { // frequent + // Always collect self metrics: + throttler.collectMySQLMetrics(ctx, tmClient, func(clusterName string) bool { + return clusterName == selfStoreName + }) if !throttler.isDormant() { - throttler.collectMySQLMetrics(ctx, tmClient) + throttler.collectMySQLMetrics(ctx, tmClient, func(clusterName string) bool { + return clusterName != selfStoreName + }) } // if throttler.recentCheckRateLimiter.Diff() <= 1 { // recently checked @@ -782,7 +788,9 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { if throttler.IsOpen() { // infrequent if throttler.isDormant() { - throttler.collectMySQLMetrics(ctx, tmClient) + throttler.collectMySQLMetrics(ctx, tmClient, func(clusterName string) bool { + return clusterName != selfStoreName + }) } } case metric := <-throttler.mysqlThrottleMetricChan: @@ -846,9 +854,12 @@ func (throttler *Throttler) generateTabletProbeFunction(ctx context.Context, clu } } -func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tmclient.TabletManagerClient) error { +func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tmclient.TabletManagerClient, includeCluster func(clusterName string) bool) error { // synchronously, get lists of probes for clusterName, probes := range throttler.mysqlInventory.ClustersProbes { + if !includeCluster(clusterName) { + continue + } clusterName := clusterName // probes is known not to change. It can be *replaced*, but not changed. // so it's safe to iterate it From 87f95fefbf80c3367e24deb87cc44f64a74048d5 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:06:14 +0200 Subject: [PATCH 15/19] remove flakiness Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletmanager/throttler_topo/throttler_test.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 46929411011..fbe95500ff1 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -399,6 +399,10 @@ func TestLag(t *testing.T) { err := clusterInstance.VtctldClientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) assert.NoError(t, err) }) + t.Run("requesting heartbeats while replication stopped", func(t *testing.T) { + _ = warmUpHeartbeat(t) + }) + t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { time.Sleep(2 * throttler.DefaultThreshold) @@ -412,7 +416,14 @@ func TestLag(t *testing.T) { require.NoError(t, err) defer resp.Body.Close() // self (on primary) is unaffected by replication lag - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) + if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) { + rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) + assert.NoError(t, err) + t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) + t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) + t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) + } + }) t.Run("replica self-check should show error", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) From 9a1200e0b765ca8486e25c5de3386b19c9e55a7a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:13:37 +0200 Subject: [PATCH 16/19] fix flakiness: renew heartbeats after waiting for lag Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletmanager/throttler_topo/throttler_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index fbe95500ff1..df63d5a84a1 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -399,13 +399,15 @@ func TestLag(t *testing.T) { err := clusterInstance.VtctldClientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias) assert.NoError(t, err) }) + t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { + time.Sleep(2 * throttler.DefaultThreshold) + }) t.Run("requesting heartbeats while replication stopped", func(t *testing.T) { + // By now on-demand heartbeats have stopped. _ = warmUpHeartbeat(t) }) - t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { - time.Sleep(2 * throttler.DefaultThreshold) - + t.Run("expecting throttler push back", func(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() @@ -417,13 +419,9 @@ func TestLag(t *testing.T) { defer resp.Body.Close() // self (on primary) is unaffected by replication lag if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) { - rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) - assert.NoError(t, err) - t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) } - }) t.Run("replica self-check should show error", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) From 10bba832aca3764103c15461df2864f8e74b9ee9 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 6 Mar 2024 16:51:40 +0200 Subject: [PATCH 17/19] adapt test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/timer/rate_limiter_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/timer/rate_limiter_test.go b/go/timer/rate_limiter_test.go index bdda9b56eb2..83690b98a22 100644 --- a/go/timer/rate_limiter_test.go +++ b/go/timer/rate_limiter_test.go @@ -17,6 +17,7 @@ limitations under the License. package timer import ( + "math" "testing" "time" @@ -84,9 +85,9 @@ func TestRateLimiterDiff(t *testing.T) { // This assumes the last couple lines of code run faster than 2 seconds, which should be the case. // But if you see flakiness due to slow runners, we can revisit the logic. - assert.Equal(t, int64(1), r.Diff()) + assert.Greater(t, r.Diff(), int64(math.MaxInt32)) time.Sleep(d + time.Second) - assert.Greater(t, r.Diff(), int64(1)) + assert.Greater(t, r.Diff(), int64(math.MaxInt32)) r.DoEmpty() assert.LessOrEqual(t, r.Diff(), int64(1)) } From feedb1a6a586f939387a670e78e72e3aa514493d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 7 Mar 2024 08:18:49 +0200 Subject: [PATCH 18/19] fix to racy test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttler_test.go | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 796fb296702..98f94439a3d 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "sync" "sync/atomic" "testing" "time" @@ -48,6 +49,8 @@ const ( type fakeTMClient struct { tmclient.TabletManagerClient appNames []string + + mu sync.Mutex } func (c *fakeTMClient) Close() { @@ -60,10 +63,18 @@ func (c *fakeTMClient) CheckThrottler(ctx context.Context, tablet *topodatapb.Ta Threshold: 1, RecentlyChecked: false, } + c.mu.Lock() + defer c.mu.Unlock() c.appNames = append(c.appNames, request.AppName) return resp, nil } +func (c *fakeTMClient) AppNames() []string { + c.mu.Lock() + defer c.mu.Unlock() + return c.appNames +} + type FakeTopoServer struct { } @@ -388,7 +399,7 @@ func TestProbesWhileOperating(t *testing.T) { tmClient, ok := throttler.overrideTmClient.(*fakeTMClient) require.True(t, ok) - assert.Empty(t, tmClient.appNames) + assert.Empty(t, tmClient.AppNames()) t.Run("aggregated", func(t *testing.T) { assert.Equal(t, 0, throttler.aggregatedMetrics.ItemCount()) @@ -412,11 +423,11 @@ func TestProbesWhileOperating(t *testing.T) { assert.Failf(t, "unknown clusterName", "%v", clusterName) } } - assert.NotEmpty(t, tmClient.appNames) + assert.NotEmpty(t, tmClient.AppNames()) // The throttler here emulates a PRIMARY tablet, and therefore should probe the replicas using // the "vitess" app name. uniqueNames := map[string]int{} - for _, appName := range tmClient.appNames { + for _, appName := range tmClient.AppNames() { uniqueNames[appName]++ } // PRIMARY throttler probes replicas with empty app name, which is then @@ -539,10 +550,10 @@ func TestReplica(t *testing.T) { tmClient, ok := throttler.overrideTmClient.(*fakeTMClient) require.True(t, ok) - assert.Empty(t, tmClient.appNames) + assert.Empty(t, tmClient.AppNames()) runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) { - assert.Empty(t, tmClient.appNames) + assert.Empty(t, tmClient.AppNames()) flags := &CheckFlags{} throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf) go func() { @@ -550,16 +561,17 @@ func TestReplica(t *testing.T) { case <-ctx.Done(): require.FailNow(t, "context expired before testing completed") case <-time.After(time.Second): - assert.Empty(t, tmClient.appNames) + assert.Empty(t, tmClient.AppNames()) } throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) select { case <-ctx.Done(): require.FailNow(t, "context expired before testing completed") case <-time.After(time.Second): - assert.NotEmpty(t, tmClient.appNames) - assert.Containsf(t, tmClient.appNames, throttlerapp.ThrottlerStimulatorName.String(), "%+v", tmClient.appNames) - assert.Equalf(t, 1, len(tmClient.appNames), "%+v", tmClient.appNames) + appNames := tmClient.AppNames() + assert.NotEmpty(t, appNames) + assert.Containsf(t, appNames, throttlerapp.ThrottlerStimulatorName.String(), "%+v", appNames) + assert.Equalf(t, 1, len(appNames), "%+v", appNames) } throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) select { @@ -567,7 +579,8 @@ func TestReplica(t *testing.T) { require.FailNow(t, "context expired before testing completed") case <-time.After(time.Second): // Due to stimulation rate limiting, we shouldn't see a 2nd CheckThrottler request. - assert.Equalf(t, 1, len(tmClient.appNames), "%+v", tmClient.appNames) + appNames := tmClient.AppNames() + assert.Equalf(t, 1, len(appNames), "%+v", appNames) } cancel() // end test early }() From 1f4f73fa67f79ba8d4e8b8934e30d78f456bf754 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 10 Mar 2024 17:46:24 +0200 Subject: [PATCH 19/19] set timeout for primary stimulation connection Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/throttle/throttler.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 3a92774c022..922ece16145 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -583,6 +583,10 @@ func (throttler *Throttler) requestHeartbeats() { // stimulatePrimaryThrottler sends a check request to the primary tablet in the shard, to stimulate // it to request for heartbeats. func (throttler *Throttler) stimulatePrimaryThrottler(ctx context.Context, tmClient tmclient.TabletManagerClient) error { + // Some reasonable timeout, to ensure we release connections even if they're hanging (otherwise grpc-go keeps polling those connections forever) + ctx, cancel := context.WithTimeout(ctx, throttler.dormantPeriod) + defer cancel() + tabletAliases, err := throttler.ts.FindAllTabletAliasesInShard(ctx, throttler.keyspace, throttler.shard) if err != nil { return err