Skip to content

Commit

Permalink
TxThrottler: dont throttle unless lag (#14789)
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
ejortegau authored Feb 9, 2024
1 parent a60297b commit 2b25639
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 30 deletions.
23 changes: 23 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"

throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
)
Expand Down Expand Up @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// MaxLag returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 41 additions & 2 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -81,6 +82,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
MaxLag(tabletType topodatapb.TabletType) uint32
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
Expand Down Expand Up @@ -167,6 +169,10 @@ type txThrottlerStateImpl struct {

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

maxLag int64
done chan bool
waitForTermination sync.WaitGroup
}

// NewTxThrottler tries to construct a txThrottler from the relevant
Expand Down Expand Up @@ -245,7 +251,7 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {

// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority
result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle()

t.requestsTotal.Add(workload, 1)
if result {
Expand Down Expand Up @@ -284,6 +290,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
done: make(chan bool, 1),
}

// get cells from topo if none defined in tabletenv config
Expand All @@ -298,6 +305,8 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)
state.waitForTermination.Add(1)
go state.updateMaxLag()

return state, nil
}
Expand Down Expand Up @@ -356,14 +365,44 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

maxLag := atomic.LoadInt64(&ts.maxLag)

return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerStateImpl) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second)
defer ticker.Stop()
outerloop:
for {
select {
case <-ticker.C:
var maxLag uint32

for tabletType := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
case <-ts.done:
break outerloop
}
}
}

func (ts *txThrottlerStateImpl) deallocateResources() {
// Close healthcheck and topo watchers
ts.closeHealthCheckStream()
ts.healthCheck = nil

ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
Expand Down
88 changes: 60 additions & 28 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package txthrottler

import (
"context"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -50,7 +51,7 @@ func TestDisabledThrottler(t *testing.T) {
Shard: "shard",
})
assert.Nil(t, throttler.Open())
assert.False(t, throttler.Throttle(0, "some_workload"))
assert.False(t, throttler.Throttle(0, "some-workload"))
throttlerImpl, _ := throttler.(*txThrottler)
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
throttler.Close()
Expand Down Expand Up @@ -80,28 +81,45 @@ func TestEnabledThrottler(t *testing.T) {
return mockThrottler, nil
}

call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
call1 := mockThrottler.EXPECT().Throttle(0)
call1.Return(0 * time.Second)
var calls []*gomock.Call

call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
calls = append(calls, call)

// 1
call = mockThrottler.EXPECT().Throttle(0)
call.Return(0 * time.Second)
calls = append(calls, call)

tabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
Cell: "cell1",
TabletType: topodatapb.TabletType_REPLICA,
},
}
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
call3 := mockThrottler.EXPECT().Throttle(0)
call3.Return(1 * time.Second)

call4 := mockThrottler.EXPECT().Throttle(0)
call4.Return(1 * time.Second)
calllast := mockThrottler.EXPECT().Close()
call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
calls = append(calls, call)

// 2
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

// 3
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
// whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle()

// 4
// Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first
// whether there is lag or not, so no call to the underlying mockThrottler is issued.

call1.After(call0)
call2.After(call1)
call3.After(call2)
call4.After(call3)
calllast.After(call4)
call = mockThrottler.EXPECT().Close()
calls = append(calls, call)

for i := 1; i < len(calls); i++ {
calls[i].After(calls[i-1])
}

cfg := tabletenv.NewDefaultConfig()
cfg.EnableTxThrottler = true
Expand All @@ -118,13 +136,20 @@ func TestEnabledThrottler(t *testing.T) {
})

assert.Nil(t, throttlerImpl.Open())
throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl)
throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl)
assert.True(t, ok)
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())

assert.False(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some_workload"])
// Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a
// way that will interfere with how we manipulate that value in our tests to evaluate different cases:
throttlerStateImpl.done <- true

// 1 should not throttle due to return value of underlying Throttle(), despite high lag
atomic.StoreInt64(&throttlerStateImpl.maxLag, 20)
assert.False(t, throttlerImpl.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"])

throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts())
Expand All @@ -140,16 +165,23 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())

// The second throttle call should reject.
assert.True(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])
// 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100
assert.True(t, throttlerImpl.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
assert.False(t, throttlerImpl.Throttle(0, "some_workload"))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])
throttlerImpl.Close()
// 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0
assert.False(t, throttlerImpl.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])

// 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag
atomic.StoreInt64(&throttlerStateImpl.maxLag, 1)
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])

throttler.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
}

Expand Down

0 comments on commit 2b25639

Please sign in to comment.