Skip to content

Commit

Permalink
[Hammer] Throttle has mutex guarding changes (#424)
Browse files Browse the repository at this point in the history
This should avoid the weird number that can be reported when changing the limits at runtime. This also cleans up the readability of the main supply loop, avoiding the need to have a label to break to.
  • Loading branch information
mhutchinson authored Dec 17, 2024
1 parent c4d2331 commit 21df5fa
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions internal/hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,16 @@ func NewThrottle(opsPerSecond int) *Throttle {
}

type Throttle struct {
opsPerSecond int
tokenChan chan bool
mu sync.Mutex
opsPerSecond int

oversupply int
}

func (t *Throttle) Increase() {
t.mu.Lock()
defer t.mu.Unlock()
tokenCount := t.opsPerSecond
delta := float64(tokenCount) * 0.1
if delta < 1 {
Expand All @@ -424,6 +427,8 @@ func (t *Throttle) Increase() {
}

func (t *Throttle) Decrease() {
t.mu.Lock()
defer t.mu.Unlock()
tokenCount := t.opsPerSecond
if tokenCount <= 1 {
return
Expand All @@ -443,20 +448,27 @@ func (t *Throttle) Run(ctx context.Context) {
case <-ctx.Done(): //context cancelled
return
case <-ticker.C:
tokenCount := t.opsPerSecond
timeout := time.After(interval)
Loop:
for i := 0; i < t.opsPerSecond; i++ {
select {
case t.tokenChan <- true:
tokenCount--
case <-timeout:
break Loop
}
}
ctx, cancel := context.WithTimeout(ctx, interval)
t.supplyTokens(ctx)
cancel()
}
}
}

func (t *Throttle) supplyTokens(ctx context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
tokenCount := t.opsPerSecond
for i := 0; i < t.opsPerSecond; i++ {
select {
case t.tokenChan <- true:
tokenCount--
case <-ctx.Done():
t.oversupply = tokenCount
return
}
}
t.oversupply = 0
}

func (t *Throttle) String() string {
Expand Down

0 comments on commit 21df5fa

Please sign in to comment.