kvcoord: use sync.Map for DistSender circuit breakers
This has better concurrency properties, in that map scans don't have to
hold long-lived locks. Such scans can otherwise cause high tail latency,
since circuit breaker map lookups are on the RPC hot path.
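The pattern this switches to is, roughly, a lock-free Load on the hot path, LoadOrStore on the insert path, and Range for scans. A minimal self-contained sketch of that pattern follows (placeholder key and breaker types, not the actual DistSender code):

```go
package main

import (
	"fmt"
	"sync"
)

type key struct{ rangeID, replicaID int64 }

type breaker struct{ tripped bool }

// breakers holds per-replica circuit breakers keyed by (rangeID, replicaID),
// mirroring the sync.Map usage adopted by this commit.
var breakers sync.Map // key -> *breaker

// forReplica returns the existing breaker for k, or inserts a new one.
// Lookups on the hot path take no locks; a racing insert may construct a
// breaker that is immediately discarded, which is cheap and bounded.
func forReplica(k key) *breaker {
	if v, ok := breakers.Load(k); ok { // fast path: lock-free
		return v.(*breaker)
	}
	// Slow path: construct outside any lock, then insert. If we race with a
	// concurrent insert, LoadOrStore returns the winner and we discard ours.
	v, _ := breakers.LoadOrStore(k, &breaker{})
	return v.(*breaker)
}

// scan visits every breaker without holding a long-lived lock, which is the
// property the probe and GC loops rely on.
func scan() int {
	var n int
	breakers.Range(func(_, v any) bool {
		if v.(*breaker).tripped {
			n++
		}
		return true // keep iterating
	})
	return n
}

func main() {
	cb := forReplica(key{rangeID: 1, replicaID: 2})
	cb.tripped = true
	fmt.Println(scan()) // 1
}
```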

This adds about 20% overhead for circuit breaker construction, with 3
new allocations, but the request hot path overhead is negligible:

```
name                                                                         old time/op    new time/op    delta
DistSenderCircuitBreakersForReplica-24                                         2.69µs ± 1%    3.24µs ± 4%  +20.06%  (p=0.000 n=9+10)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1-24        117ns ± 0%     120ns ± 1%   +2.55%  (p=0.000 n=7+9)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=16-24       193ns ±25%     181ns ±18%     ~     (p=0.382 n=10+10)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=64-24       165ns ±25%     172ns ± 7%     ~     (p=1.000 n=10+9)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1024-24     142ns ±14%     169ns ±10%  +18.82%  (p=0.000 n=8+9)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1-24         315ns ± 2%     326ns ± 2%   +3.60%  (p=0.000 n=10+10)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=16-24        795ns ± 6%     773ns ±11%     ~     (p=0.780 n=9+10)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=64-24        853ns ± 4%     821ns ± 9%     ~     (p=0.143 n=10+10)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1024-24      898ns ± 5%     861ns ± 9%     ~     (p=0.052 n=10+10)

name                                                                         old allocs/op  new allocs/op  delta
DistSenderCircuitBreakersForReplica-24                                           13.0 ± 0%      16.0 ± 0%  +23.08%  (p=0.000 n=10+10)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1-24         0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=16-24        0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=64-24        0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1024-24      0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1-24          2.00 ± 0%      2.00 ± 0%     ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=16-24         2.00 ± 0%      2.00 ± 0%     ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=64-24         2.00 ± 0%      2.00 ± 0%     ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1024-24       2.00 ± 0%      2.00 ± 0%     ~     (all equal)
```
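To illustrate the read-path behavior the change is optimizing for, the following self-contained micro-benchmark sketch (illustrative only, not the kvcoord benchmark above) compares concurrent lookups through an RWMutex-guarded map with lookups through a sync.Map:

```go
package cbmap // place in a file ending in _test.go so `go test -bench` picks it up

import (
	"sync"
	"testing"
)

// rwMap is a plain map guarded by an RWMutex, the structure this commit replaces.
type rwMap struct {
	mu sync.RWMutex
	m  map[int64]*int
}

func (r *rwMap) load(k int64) (*int, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.m[k]
	return v, ok
}

// BenchmarkRWMutexMapLoad measures concurrent lookups that all touch the shared RWMutex.
func BenchmarkRWMutexMapLoad(b *testing.B) {
	r := &rwMap{m: map[int64]*int{}}
	for i := int64(0); i < 1024; i++ {
		v := int(i)
		r.m[i] = &v
	}
	b.RunParallel(func(pb *testing.PB) {
		var k int64
		var local *int
		for pb.Next() {
			local, _ = r.load(k & 1023)
			k++
		}
		_ = local
	})
}

// BenchmarkSyncMapLoad measures the same lookups through a sync.Map, which serves
// hits from an atomically loaded read-only map without taking a lock.
func BenchmarkSyncMapLoad(b *testing.B) {
	var m sync.Map
	for i := int64(0); i < 1024; i++ {
		v := int(i)
		m.Store(i, &v)
	}
	b.RunParallel(func(pb *testing.PB) {
		var k int64
		var local any
		for pb.Next() {
			local, _ = m.Load(k & 1023)
			k++
		}
		_ = local
	})
}
```

Under parallel readers the sync.Map variant never touches a shared lock, which is the property the RPC hot path benefits from; the insert path pays for this with extra allocations, consistent with the construction overhead shown above.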

Epic: none
Release note: None
erikgrinaker committed Mar 24, 2024
1 parent 794a129 commit 17912df
Showing 1 changed file with 35 additions and 111 deletions.
146 changes: 35 additions & 111 deletions pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
@@ -13,6 +13,7 @@ package kvcoord
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

@@ -85,10 +86,6 @@ const (
// cbGCInterval is the interval between garbage collection scans.
cbGCInterval = time.Minute

// cbGCBatchSize is the number of circuit breakers to garbage collect
// at once while holding the mutex.
cbGCBatchSize = 20

// cbProbeIdleIntervals is the number of probe intervals with no client
// requests after which a failing probe should exit. It will be relaunched on
// the next request, if any.
@@ -161,13 +158,7 @@ type DistSenderCircuitBreakers struct {
settings *cluster.Settings
transportFactory TransportFactory
metrics DistSenderMetrics

// TODO(erikgrinaker): consider using a generic sync.Map here, but needs
// benchmarking.
mu struct {
syncutil.RWMutex
replicas map[cbKey]*ReplicaCircuitBreaker
}
replicas sync.Map // cbKey -> *ReplicaCircuitBreaker
}

// NewDistSenderCircuitBreakers creates new DistSender circuit breakers.
@@ -178,15 +169,13 @@ func NewDistSenderCircuitBreakers(
transportFactory TransportFactory,
metrics DistSenderMetrics,
) *DistSenderCircuitBreakers {
d := &DistSenderCircuitBreakers{
return &DistSenderCircuitBreakers{
ambientCtx: ambientCtx,
stopper: stopper,
settings: settings,
transportFactory: transportFactory,
metrics: metrics,
}
d.mu.replicas = map[cbKey]*ReplicaCircuitBreaker{}
return d
}

// Start starts the circuit breaker manager, and runs it until the stopper
@@ -207,8 +196,6 @@ func (d *DistSenderCircuitBreakers) Start() error {
// probeStallLoop periodically scans replica circuit breakers to detect stalls
// and launch probes.
func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
var cbs []*ReplicaCircuitBreaker // reuse across scans

// We use the probe interval as the scan interval, since we can sort of
// consider this to be probing the replicas for a stall.
var timer timeutil.Timer
@@ -234,17 +221,20 @@ func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {

// Probe replicas for a stall if we haven't seen a response from them in the
// past probe threshold.
cbs = d.snapshot(cbs[:0])
nowNanos := timeutil.Now().UnixNano()
probeThreshold := CircuitBreakerProbeThreshold.Get(&d.settings.SV)

for _, cb := range cbs {
d.replicas.Range(func(_, v any) bool {
cb := v.(*ReplicaCircuitBreaker)

// Don't probe if the breaker is already tripped. It will be probed in
// response to user traffic, to reduce the number of concurrent probes.
if cb.stallDuration(nowNanos) >= probeThreshold && !cb.isTripped() {
cb.breaker.Probe()
}
}

return true
})
}
}

@@ -259,9 +249,6 @@ func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
// breakers if the DistSender keeps sending requests to them for some
// reason.
func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
var cbs []*ReplicaCircuitBreaker // reuse across scans
var gc []cbKey

ticker := time.NewTicker(cbGCInterval)
defer ticker.Stop()
for {
@@ -273,82 +260,35 @@ func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
return
}

// Collect circuit breakers eligible for GC.
cbs = d.snapshot(cbs[:0])
gc = gc[:0]

nowNanos := timeutil.Now().UnixNano()

for _, cb := range cbs {
var cbs, gced int
d.replicas.Range(func(key, v any) bool {
cb := v.(*ReplicaCircuitBreaker)
cbs++

if idleDuration := cb.lastRequestDuration(nowNanos); idleDuration >= cbGCThreshold {
if !cb.isTripped() || idleDuration >= cbGCThresholdTripped {
gc = append(gc, cbKey{rangeID: cb.rangeID, replicaID: cb.desc.ReplicaID})
// Check if we raced with a concurrent delete. We don't expect to,
// since only this loop removes circuit breakers.
if _, ok := d.replicas.LoadAndDelete(key); ok {
// TODO(erikgrinaker): this needs to remove tripped circuit breakers
// from the metrics, otherwise they'll appear as tripped forever.
// However, there are race conditions with concurrent probes that
// can lead to metrics gauge leaks (both positive and negative), so
// we'll have to make sure we reap the probes here first.
d.metrics.CircuitBreaker.Replicas.Dec(1)
gced++
}
}
}
}
return true
})

if len(gc) == 0 {
continue
}

// Garbage collect the replicas. We may have raced with concurrent requests,
// but that's ok.
log.VEventf(ctx, 2, "garbage collecting %d/%d replica circuit breakers", len(gc), len(cbs))

func() {
d.mu.Lock()
defer d.mu.Unlock()

for i, key := range gc {
// Periodically release the mutex to avoid tail latency.
if i%cbGCBatchSize == 0 && i > 0 {
d.mu.Unlock()
d.mu.Lock()
}
// Check if we raced with a concurrent delete. We don't expect to,
// since only this loop removes circuit breakers.
//
// TODO(erikgrinaker): this needs to remove tripped circuit breakers
// from the metrics, otherwise they'll appear as tripped forever.
// However, there are race conditions with concurrent probes that can
// lead to metrics gauge leaks (both positive and negative), so we'll
// have to make sure we reap the probes here first.
if _, ok := d.mu.replicas[key]; ok {
delete(d.mu.replicas, key) // nolint:deferunlockcheck
d.metrics.CircuitBreaker.Replicas.Dec(1) // nolint:deferunlockcheck
}
}
}()

log.VEventf(ctx, 2, "garbage collected %d/%d replica circuit breakers", len(gc), len(cbs)) // nolint:deferunlockcheck
log.VEventf(ctx, 2, "garbage collected %d/%d DistSender replica circuit breakers", gced, cbs)
}
}

// snapshot fetches a snapshot of the current replica circuit breakers, reusing
// the given slice if it has sufficient capacity.
func (d *DistSenderCircuitBreakers) snapshot(
buf []*ReplicaCircuitBreaker,
) []*ReplicaCircuitBreaker {
// Make sure the slice has sufficient capacity first, to avoid growing it
// while holding the mutex. We give it an additional 10% capacity, to avoid
// frequent growth and races.
d.mu.RLock()
l := len(d.mu.replicas) // nolint:deferunlockcheck
d.mu.RUnlock()
if cap(buf) < l {
buf = make([]*ReplicaCircuitBreaker, 0, l+l/10)
} else {
buf = buf[:0]
}

d.mu.RLock()
defer d.mu.RUnlock()
for _, cb := range d.mu.replicas {
buf = append(buf, cb)
}
return buf
}

// ForReplica returns a circuit breaker for a given replica.
func (d *DistSenderCircuitBreakers) ForReplica(
rangeDesc *roachpb.RangeDescriptor, replDesc *roachpb.ReplicaDescriptor,
@@ -361,33 +301,17 @@ func (d *DistSenderCircuitBreakers) ForReplica(
key := cbKey{rangeID: rangeDesc.RangeID, replicaID: replDesc.ReplicaID}

// Fast path: use existing circuit breaker.
d.mu.RLock()
cb, ok := d.mu.replicas[key]
d.mu.RUnlock()
if ok {
return cb
if v, ok := d.replicas.Load(key); ok {
return v.(*ReplicaCircuitBreaker)
}

// Slow path: construct a new replica circuit breaker and insert it.
//
// We construct it outside of the lock to avoid holding it for too long, since
// it incurs a fair number of allocations. This can cause us to concurrently
// construct and then discard a bunch of circuit breakers, but it will be
// bounded by the number of concurrent requests to the replica, and is likely
// better than delaying requests to other, unrelated replicas.
cb = newReplicaCircuitBreaker(d, rangeDesc, replDesc)

d.mu.Lock()
defer d.mu.Unlock()

if c, ok := d.mu.replicas[key]; ok {
cb = c // we raced with a concurrent insert
} else {
d.mu.replicas[key] = cb
// Slow path: construct a new replica circuit breaker and insert it. If we
// race with a concurrent insert, return it instead.
v, loaded := d.replicas.LoadOrStore(key, newReplicaCircuitBreaker(d, rangeDesc, replDesc))
if !loaded {
d.metrics.CircuitBreaker.Replicas.Inc(1)
}

return cb
return v.(*ReplicaCircuitBreaker)
}

// ReplicaCircuitBreaker is a circuit breaker for an individual replica.