Skip to content

Commit

Permalink
Cherry-pick 2b25639 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Feb 9, 2024
1 parent 58cd10c commit c55fdeb
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
33 changes: 33 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ type txThrottlerState struct {
maxLag int64
done chan bool
waitForTermination sync.WaitGroup
<<<<<<< HEAD
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
=======
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

Expand Down Expand Up @@ -499,6 +502,33 @@ func (ts *txThrottlerState) throttle() bool {

return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
<<<<<<< HEAD
=======
}

func (ts *txThrottlerStateImpl) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second)
defer ticker.Stop()
outerloop:
for {
select {
case <-ticker.C:
var maxLag uint32

for tabletType := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
case <-ts.done:
break outerloop
}
}
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

func (ts *txThrottlerStateImpl) updateMaxLag() {
Expand Down Expand Up @@ -537,9 +567,12 @@ func (ts *txThrottlerState) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

<<<<<<< HEAD
<<<<<<< HEAD
// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
=======
=======
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
=======
"context"
"sync/atomic"
<<<<<<< HEAD
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
=======
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
"testing"
"time"
Expand Down Expand Up @@ -122,6 +125,7 @@ func TestEnabledThrottler(t *testing.T) {
TabletType: topodatapb.TabletType_REPLICA,
},
}
<<<<<<< HEAD
<<<<<<< HEAD
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
call3 := mockThrottler.EXPECT().Throttle(0)
Expand All @@ -132,6 +136,31 @@ func TestEnabledThrottler(t *testing.T) {
call3.After(call2)
call4.After(call3)
=======
=======

call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
calls = append(calls, call)

// 2
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

// 3
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
// whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle()

// 4
// Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first
// whether there is lag or not, so no call to the underlying mockThrottler is issued.

call = mockThrottler.EXPECT().Close()
calls = append(calls, call)

for i := 1; i < len(calls); i++ {
calls[i].After(calls[i-1])
}
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))

call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
calls = append(calls, call)
Expand Down

0 comments on commit c55fdeb

Please sign in to comment.