Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: starvation fix and consolidation of logic. #15398

Merged
merged 19 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/stats/counter_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
countersMu sync.RWMutex
)

// GetOrNewCounter returns a Counter with given name; the functiona either creates the counter
// GetOrNewCounter returns a Counter with given name; the function either creates the counter
// if it does not exist, or returns a pre-existing one. The function is thread safe.
func GetOrNewCounter(name string, help string) *Counter {
// first, attempt read lock only
Expand Down
41 changes: 38 additions & 3 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ var (
throttledAppsAPIPath = "throttler/throttled-apps"
checkAPIPath = "throttler/check"
checkSelfAPIPath = "throttler/check-self"
statusAPIPath = "throttler/status"
getResponseBody = func(resp *http.Response) string {
body, _ := io.ReadAll(resp.Body)
return string(body)
Expand Down Expand Up @@ -180,6 +181,16 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s", tablet.HTTPPort, checkSelfAPIPath, testAppName))
}

func throttleStatus(t *testing.T, tablet *cluster.Vttablet) string {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, statusAPIPath))
require.NoError(t, err)
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
return string(b)
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
// because we run with -heartbeat_on_demand_duration=5s, the heartbeat is "cold" right now.
// Let's warm it up.
Expand Down Expand Up @@ -314,17 +325,32 @@ func TestInitialThrottler(t *testing.T) {
})
t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) {
time.Sleep(1 * time.Second)
cluster.ValidateReplicationIsHealthy(t, replicaTablet)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})

t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) {
time.Sleep(1 * time.Second)
cluster.ValidateReplicationIsHealthy(t, replicaTablet)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})
t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) {
time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops
Expand Down Expand Up @@ -375,7 +401,13 @@ func TestLag(t *testing.T) {
})
t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) {
time.Sleep(2 * throttler.DefaultThreshold)
})
t.Run("requesting heartbeats while replication stopped", func(t *testing.T) {
// By now on-demand heartbeats have stopped.
_ = warmUpHeartbeat(t)
})

t.Run("expecting throttler push back", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
defer resp.Body.Close()
Expand All @@ -386,7 +418,10 @@ func TestLag(t *testing.T) {
require.NoError(t, err)
defer resp.Body.Close()
// self (on primary) is unaffected by replication lag
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})
t.Run("replica self-check should show error", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
Expand Down
24 changes: 19 additions & 5 deletions go/timer/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// For example, we can create a RateLimiter of 1second. Then, we can ask it, over time, to run many
// tasks. It will only ever run a single task in any 1 second time frame. The rest are ignored.
type RateLimiter struct {
tickerValue int64
tickerValue atomic.Int64
lastDoValue int64

mu sync.Mutex
Expand All @@ -37,7 +37,8 @@ type RateLimiter struct {

// NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks.
func NewRateLimiter(d time.Duration) *RateLimiter {
r := &RateLimiter{tickerValue: 1}
r := &RateLimiter{}
r.lastDoValue = math.MinInt32 // Far enough to make a difference, but not too far to overflow.
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
go func() {
Expand All @@ -48,7 +49,7 @@ func NewRateLimiter(d time.Duration) *RateLimiter {
case <-ctx.Done():
return
case <-ticker.C:
atomic.StoreInt64(&r.tickerValue, r.tickerValue+1)
r.tickerValue.Add(1)
}
}
}()
Expand All @@ -61,16 +62,29 @@ func (r *RateLimiter) Do(f func() error) (err error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.lastDoValue >= atomic.LoadInt64(&r.tickerValue) {
if r.lastDoValue >= r.tickerValue.Load() {
return nil // rate limited. Skipped.
}
if f != nil {
err = f()
}
r.lastDoValue = atomic.LoadInt64(&r.tickerValue)
r.lastDoValue = r.tickerValue.Load()
return err
}

// DoEmpty is a convenience method to invoke Do() with no function.
func (r *RateLimiter) DoEmpty() {
_ = r.Do(nil)
}

// Diff returns the logical clock diff between the ticker and the last Do() call.
func (r *RateLimiter) Diff() int64 {
r.mu.Lock()
defer r.mu.Unlock()

return r.tickerValue.Load() - r.lastDoValue
}

// Stop terminates rate limiter's operation and will not allow any more Do() executions.
func (r *RateLimiter) Stop() {
r.cancel()
Expand Down
16 changes: 16 additions & 0 deletions go/timer/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package timer

import (
"math"
"testing"
"time"

Expand Down Expand Up @@ -75,3 +76,18 @@ func TestRateLimiterStop(t *testing.T) {
}
assert.Equal(t, valSnapshot, val)
}

func TestRateLimiterDiff(t *testing.T) {
d := 2 * time.Second
r := NewRateLimiter(d)
require.NotNil(t, r)
defer r.Stop()

// This assumes the last couple lines of code run faster than 2 seconds, which should be the case.
// But if you see flakiness due to slow runners, we can revisit the logic.
assert.Greater(t, r.Diff(), int64(math.MaxInt32))
time.Sleep(d + time.Second)
assert.Greater(t, r.Diff(), int64(math.MaxInt32))
r.DoEmpty()
assert.LessOrEqual(t, r.Diff(), int64(1))
}
27 changes: 13 additions & 14 deletions go/vt/vttablet/tabletserver/throttle/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,18 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp
}

checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags)
check.throttler.lastCheckTimeNano.Store(time.Now().UnixNano())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed from check.,go. Delegated to, and consolidated in checkStore() in throttler.go.


go func(statusCode int) {
stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)

if statusCode != http.StatusOK {
stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
}

check.throttler.markRecentApp(appName, remoteAddr)
}(checkResult.StatusCode)

check.throttler.markRecentApp(appName, remoteAddr)
if !throttlerapp.VitessName.Equals(appName) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No metrics for "vitess" app.

go func(statusCode int) {
stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)

if statusCode != http.StatusOK {
stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
}
}(checkResult.StatusCode)
}
return checkResult
}

Expand Down Expand Up @@ -227,6 +225,7 @@ func (check *ThrottlerCheck) SelfChecks(ctx context.Context) {
for metricName, metricResult := range check.AggregatedMetrics(ctx) {
metricName := metricName
metricResult := metricResult

go check.localCheck(ctx, metricName)
go check.reportAggregated(metricName, metricResult)
}
Expand Down
Loading
Loading