Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: starvation fix and consolidation of logic. #15398

Merged
merged 19 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/stats/counter_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
countersMu sync.RWMutex
)

// GetOrNewCounter returns a Counter with given name; the functiona either creates the counter
// GetOrNewCounter returns a Counter with given name; the function either creates the counter
// if it does not exist, or returns a pre-existing one. The function is thread safe.
func GetOrNewCounter(name string, help string) *Counter {
// first, attempt read lock only
Expand Down
24 changes: 19 additions & 5 deletions go/timer/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// For example, we can create a RateLimiter of 1second. Then, we can ask it, over time, to run many
// tasks. It will only ever run a single task in any 1 second time frame. The rest are ignored.
type RateLimiter struct {
tickerValue int64
tickerValue atomic.Int64
lastDoValue int64

mu sync.Mutex
Expand All @@ -37,7 +37,8 @@ type RateLimiter struct {

// NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks.
func NewRateLimiter(d time.Duration) *RateLimiter {
r := &RateLimiter{tickerValue: 1}
r := &RateLimiter{}
r.lastDoValue = math.MinInt32 // Far enough to make a difference, but not too far to overflow.
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
go func() {
Expand All @@ -48,7 +49,7 @@ func NewRateLimiter(d time.Duration) *RateLimiter {
case <-ctx.Done():
return
case <-ticker.C:
atomic.StoreInt64(&r.tickerValue, r.tickerValue+1)
r.tickerValue.Add(1)
}
}
}()
Expand All @@ -61,16 +62,29 @@ func (r *RateLimiter) Do(f func() error) (err error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.lastDoValue >= atomic.LoadInt64(&r.tickerValue) {
if r.lastDoValue >= r.tickerValue.Load() {
return nil // rate limited. Skipped.
}
if f != nil {
err = f()
}
r.lastDoValue = atomic.LoadInt64(&r.tickerValue)
r.lastDoValue = r.tickerValue.Load()
return err
}

// DoEmpty invokes Do() without a task. Nothing is executed, but when the call is not
// rate limited the limiter still records the current tick as its last Do() value.
func (r *RateLimiter) DoEmpty() {
	// A nil task is skipped by Do(), so the returned error is always nil and safe to drop.
	var noTask func() error
	_ = r.Do(noTask)
}

// Diff reports how many ticks separate the ticker's current value from the value
// recorded by the last Do() call that was not rate limited. The limiter's mutex is
// held while reading, so the result is consistent with concurrent Do() calls.
func (r *RateLimiter) Diff() int64 {
	r.mu.Lock()
	defer r.mu.Unlock()

	currentTick := r.tickerValue.Load()
	return currentTick - r.lastDoValue
}

// Stop terminates rate limiter's operation and will not allow any more Do() executions.
func (r *RateLimiter) Stop() {
r.cancel()
Expand Down
15 changes: 15 additions & 0 deletions go/timer/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,18 @@ func TestRateLimiterStop(t *testing.T) {
}
assert.Equal(t, valSnapshot, val)
}

// TestRateLimiterDiff verifies that Diff() is large for a limiter that has never run,
// keeps growing as the ticker advances, and collapses to at most 1 right after a Do().
func TestRateLimiterDiff(t *testing.T) {
	d := 2 * time.Second
	r := NewRateLimiter(d)
	require.NotNil(t, r)
	defer r.Stop()

	// A fresh limiter has never executed Do(): NewRateLimiter seeds lastDoValue with
	// math.MinInt32 while the ticker starts at zero, so the initial diff is very large
	// (not 1). We only rely on it being greater than 1 and on it growing over time.
	initialDiff := r.Diff()
	assert.Greater(t, initialDiff, int64(1))

	// Sleeping for longer than the tick interval guarantees at least one more tick,
	// regardless of how long the lines above took to run.
	time.Sleep(d + time.Second)
	assert.Greater(t, r.Diff(), initialDiff)

	// Do() records the current tick; at most one tick can sneak in before Diff() reads it.
	r.DoEmpty()
	assert.LessOrEqual(t, r.Diff(), int64(1))
}
25 changes: 12 additions & 13 deletions go/vt/vttablet/tabletserver/throttle/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,19 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp
}

checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags)
check.throttler.lastCheckTimeNano.Store(time.Now().UnixNano())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed from check.go. Delegated to, and consolidated in, checkStore() in throttler.go.


go func(statusCode int) {
stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)

if statusCode != http.StatusOK {
stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
}

check.throttler.markRecentApp(appName, remoteAddr)
}(checkResult.StatusCode)
if !throttlerapp.VitessName.Equals(appName) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No metrics for "vitess" app.

go func(statusCode int) {
stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)

if statusCode != http.StatusOK {
stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
}

check.throttler.markRecentApp(appName, remoteAddr)
}(checkResult.StatusCode)
}
return checkResult
}

Expand Down
114 changes: 83 additions & 31 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,20 @@ import (
)

const (
leaderCheckInterval = 5 * time.Second
mysqlCollectInterval = 250 * time.Millisecond
mysqlDormantCollectInterval = 5 * time.Second
mysqlRefreshInterval = 10 * time.Second
mysqlAggregateInterval = 125 * time.Millisecond
throttledAppsSnapshotInterval = 5 * time.Second
leaderCheckInterval = 5 * time.Second
mysqlCollectInterval = 250 * time.Millisecond // PRIMARY polls replicas
mysqlDormantCollectInterval = 5 * time.Second // PRIMARY polls replicas when dormant (no recent checks)
mysqlRefreshInterval = 10 * time.Second // Refreshing tablet inventory
mysqlAggregateInterval = 125 * time.Millisecond
throttledAppsSnapshotInterval = 5 * time.Second
recentCheckRateLimiterInterval = 1 * time.Second // Ticker assisting in determining when the throttler was last checked

aggregatedMetricsExpiration = 5 * time.Second
recentAppsExpiration = time.Hour * 24

nonDeprioritizedAppMapExpiration = time.Second

dormantPeriod = time.Minute
dormantPeriod = time.Minute // How long since last check to be considered dormant
DefaultAppThrottleDuration = time.Hour
DefaultThrottleRatio = 1.0

Expand Down Expand Up @@ -159,6 +160,7 @@ type Throttler struct {
mysqlRefreshInterval time.Duration
mysqlAggregateInterval time.Duration
throttledAppsSnapshotInterval time.Duration
dormantPeriod time.Duration

configSettings *config.ConfigurationSettings
env tabletenv.Env
Expand All @@ -169,11 +171,8 @@ type Throttler struct {
heartbeatWriter heartbeat.HeartbeatWriter
overrideTmClient tmclient.TabletManagerClient

// recentCheckTickerValue is an ever increasing number, incrementing once per second.
recentCheckTickerValue atomic.Int64
// recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself).
// when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check.
recentCheckValue atomic.Int64
recentCheckRateLimiter *timer.RateLimiter
recentCheckDormantDiff int64

throttleTabletTypesMap map[topodatapb.TabletType]bool

Expand All @@ -194,8 +193,6 @@ type Throttler struct {
recentApps *cache.Cache
metricsHealth *cache.Cache

lastCheckTimeNano atomic.Int64

initMutex sync.Mutex
enableMutex sync.Mutex
cancelOpenContext context.CancelFunc
Expand Down Expand Up @@ -263,6 +260,8 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv
throttler.mysqlRefreshInterval = mysqlRefreshInterval
throttler.mysqlAggregateInterval = mysqlAggregateInterval
throttler.throttledAppsSnapshotInterval = throttledAppsSnapshotInterval
throttler.dormantPeriod = dormantPeriod
throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval)

throttler.StoreMetricsThreshold(defaultThrottleLagThreshold.Seconds()) //default
throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric {
Expand Down Expand Up @@ -574,10 +573,38 @@ func (throttler *Throttler) Close() {
// requestHeartbeats asks the heartbeat writer for a heartbeat lease, but only when this
// throttler is the leader; on a non-leader it does nothing.
// Every request issued is counted in the "ThrottlerHeartbeatRequests" stat.
func (throttler *Throttler) requestHeartbeats() {
	if throttler.isLeader.Load() {
		// Both the lease request and the counter increment run in their own goroutines.
		go throttler.heartbeatWriter.RequestHeartbeats()
		go stats.GetOrNewCounter("ThrottlerHeartbeatRequests", "heartbeat requests").Add(1)
	}
}

// stimulatePrimaryThrottler looks up the tablets of this throttler's keyspace/shard and
// issues a CheckThrottler request, under the throttler-stimulator app name, to the first
// tablet found with type PRIMARY. The intent is to stimulate the primary into requesting
// heartbeats.
//
// It returns any error from listing the shard's tablet aliases, from reading a tablet
// record, or from the check call itself (the latter is also logged). It returns nil when
// the check succeeds or when no PRIMARY tablet is found.
func (throttler *Throttler) stimulatePrimaryThrottler(ctx context.Context, tmClient tmclient.TabletManagerClient) error {
	aliases, err := throttler.ts.FindAllTabletAliasesInShard(ctx, throttler.keyspace, throttler.shard)
	if err != nil {
		return err
	}
	for _, alias := range aliases {
		tablet, err := throttler.ts.GetTablet(ctx, alias)
		if err != nil {
			return err
		}
		if tablet.Type == topodatapb.TabletType_PRIMARY {
			// Only the first primary encountered is contacted; the loop ends here either way.
			req := &tabletmanagerdatapb.CheckThrottlerRequest{AppName: throttlerapp.ThrottlerStimulatorName.String()}
			if _, err := tmClient.CheckThrottler(ctx, tablet.Tablet, req); err != nil {
				log.Errorf("stimulatePrimaryThrottler: %+v", err)
				return err
			}
			return nil
		}
	}
	return nil
}

func (throttler *Throttler) generateSelfMySQLThrottleMetricFunc(ctx context.Context, probe *mysql.Probe) func() *mysql.MySQLThrottleMetric {
f := func() *mysql.MySQLThrottleMetric {
return throttler.readSelfThrottleMetric(ctx, probe)
Expand Down Expand Up @@ -642,10 +669,11 @@ func (throttler *Throttler) ThrottledApps() (result []base.AppThrottle) {
return result
}

// isDormant returns true when the last check was more than dormantPeriod ago
// isDormant returns true when the last check was more than dormantPeriod ago.
// Instead of measuring actual time, we use the fact recentCheckRateLimiter ticks every second, and take
// a logical diff, counting the number of ticks since the last check. This is a good enough approximation.
func (throttler *Throttler) isDormant() bool {
lastCheckTime := time.Unix(0, throttler.lastCheckTimeNano.Load())
return time.Since(lastCheckTime) > dormantPeriod
return throttler.recentCheckRateLimiter.Diff() > throttler.recentCheckDormantDiff
}

// Operate is the main entry point for the throttler operation and logic. It will
Expand All @@ -663,11 +691,14 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
mysqlRefreshTicker := addTicker(throttler.mysqlRefreshInterval)
mysqlAggregateTicker := addTicker(throttler.mysqlAggregateInterval)
throttledAppsTicker := addTicker(throttler.throttledAppsSnapshotInterval)
recentCheckTicker := addTicker(time.Second)
primaryStimulatorRateLimiter := timer.NewRateLimiter(throttler.dormantPeriod)
throttler.recentCheckRateLimiter = timer.NewRateLimiter(recentCheckRateLimiterInterval)

wg.Add(1)
go func() {
defer func() {
throttler.recentCheckRateLimiter.Stop()
primaryStimulatorRateLimiter.Stop()
throttler.aggregatedMetrics.Flush()
throttler.recentApps.Flush()
throttler.nonLowPriorityAppRequestsThrottled.Flush()
Expand Down Expand Up @@ -727,6 +758,25 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
if !throttler.isDormant() {
throttler.collectMySQLMetrics(ctx, tmClient)
}
//
if throttler.recentCheckRateLimiter.Diff() <= 1 { // recently checked
if !throttler.isLeader.Load() {
// This is a replica, and has just recently been checked.
// We want to proactively "stimulate" the primary throttler to renew the heartbeat lease.
// The intent is to "wake up" an on-demand heartbeat lease. We don't need to poke the
// primary for every single time this replica was checked, so we rate limit. The idea is that
// once heartbeats update, more checks will be successful, this replica will be "recently checked"
// more than not, and the primary throttler will pick that up, extending the on-demand lease
// even further.
// Another outcome is that the primary will go out of "dormant" mode, and start collecting
// replica metrics more frequently.
primaryStimulatorRateLimiter.Do(
func() error {
return throttler.stimulatePrimaryThrottler(ctx, tmClient)
})
}
}

}
case <-mysqlDormantCollectTicker.C:
if throttler.IsOpen() {
Expand Down Expand Up @@ -756,9 +806,6 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
}
case throttlerConfig := <-throttler.throttlerConfigChan:
throttler.applyThrottlerConfig(ctx, throttlerConfig)
case <-recentCheckTicker.C:
// Increment recentCheckTickerValue by one.
throttler.recentCheckTickerValue.Add(1)
}
}
}()
Expand Down Expand Up @@ -1174,20 +1221,25 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor
// continuous and do not generate a substantial load.
return okMetricCheckResult
}
if !flags.SkipRequestHeartbeats && !throttlerapp.VitessName.Equals(appName) {

checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags)

shouldRequestHeartbeats := !flags.SkipRequestHeartbeats
if throttlerapp.VitessName.Equals(appName) {
// Override: "vitess" app never requests heartbeats.
shouldRequestHeartbeats = false
}
if throttlerapp.ThrottlerStimulatorName.Equals(appName) {
// Override: "throttler-stimulator" app always requests heartbeats.
shouldRequestHeartbeats = true
}

if shouldRequestHeartbeats {
throttler.requestHeartbeats()
throttler.recentCheckRateLimiter.DoEmpty()
// This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other.
// We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tablet, this will be reported back
// to the PRIMARY so that it knows it must renew the heartbeat lease.
throttler.recentCheckValue.Store(1 + throttler.recentCheckTickerValue.Load())
}
checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags)

if throttler.recentCheckValue.Load() >= throttler.recentCheckTickerValue.Load() {
// This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`.
// This could be online-ddl, or vreplication or whoever else.
// If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check,
// so that the PRIMARY knows it must renew the heartbeat lease.
checkResult.RecentlyChecked = true
go stats.GetOrNewCounter("ThrottlerRecentlyChecked", "recently checked").Add(1)
}
Expand Down
Loading
Loading