Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
120597: kvcoord: use `sync.Map` for DistSender circuit breakers r=erikgrinaker a=erikgrinaker

This has better concurrency properties, in that map scans don't have to hold long-lived locks. Such scans can otherwise cause high tail latency, since circuit breaker map lookups are on the RPC hot path.

This adds about 20% overhead for circuit breaker construction, with 3 new allocations, but the request hot path overhead is negligible:

```
name                                                                         old time/op    new time/op    delta
DistSenderCircuitBreakersForReplica-24                                         2.69µs ± 1%    3.24µs ± 4%  +20.06%  (p=0.000 n=9+10)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1-24        117ns ± 0%     120ns ± 1%   +2.55%  (p=0.000 n=7+9)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=16-24       193ns ±25%     181ns ±18%     ~     (p=0.382 n=10+10)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=64-24       165ns ±25%     172ns ± 7%     ~     (p=1.000 n=10+9)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1024-24     142ns ±14%     169ns ±10%  +18.82%  (p=0.000 n=8+9)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1-24         315ns ± 2%     326ns ± 2%   +3.60%  (p=0.000 n=10+10)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=16-24        795ns ± 6%     773ns ±11%     ~     (p=0.780 n=9+10)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=64-24        853ns ± 4%     821ns ± 9%     ~     (p=0.143 n=10+10)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1024-24      898ns ± 5%     861ns ± 9%     ~     (p=0.052 n=10+10)

name                                                                         old allocs/op  new allocs/op  delta
DistSenderCircuitBreakersForReplica-24                                           13.0 ± 0%      16.0 ± 0%  +23.08%  (p=0.000 n=10+10)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1-24         0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=16-24        0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=64-24        0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=false/alone=true/err=nil/conc=1024-24      0.00           0.00          ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1-24          2.00 ± 0%      2.00 ± 0%     ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=16-24         2.00 ± 0%      2.00 ± 0%     ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=64-24         2.00 ± 0%      2.00 ± 0%     ~     (all equal)
DistSenderCircuitBreakersTrack/cancel=true/alone=true/err=nil/conc=1024-24       2.00 ± 0%      2.00 ± 0%     ~     (all equal)
```

Touches cockroachdb#119919.
Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Mar 24, 2024
2 parents a1deabf + 17912df commit c994982
Showing 1 changed file with 35 additions and 111 deletions.
146 changes: 35 additions & 111 deletions pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvcoord
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -85,10 +86,6 @@ const (
// cbGCInterval is the interval between garbage collection scans.
cbGCInterval = time.Minute

// cbGCBatchSize is the number of circuit breakers to garbage collect
// at once while holding the mutex.
cbGCBatchSize = 20

// cbProbeIdleIntervals is the number of probe intervals with no client
// requests after which a failing probe should exit. It will be relaunched on
// the next request, if any.
Expand Down Expand Up @@ -161,13 +158,7 @@ type DistSenderCircuitBreakers struct {
settings *cluster.Settings
transportFactory TransportFactory
metrics DistSenderMetrics

// TODO(erikgrinaker): consider using a generic sync.Map here, but needs
// benchmarking.
mu struct {
syncutil.RWMutex
replicas map[cbKey]*ReplicaCircuitBreaker
}
replicas sync.Map // cbKey -> *ReplicaCircuitBreaker
}

// NewDistSenderCircuitBreakers creates new DistSender circuit breakers.
Expand All @@ -178,15 +169,13 @@ func NewDistSenderCircuitBreakers(
transportFactory TransportFactory,
metrics DistSenderMetrics,
) *DistSenderCircuitBreakers {
d := &DistSenderCircuitBreakers{
return &DistSenderCircuitBreakers{
ambientCtx: ambientCtx,
stopper: stopper,
settings: settings,
transportFactory: transportFactory,
metrics: metrics,
}
d.mu.replicas = map[cbKey]*ReplicaCircuitBreaker{}
return d
}

// Start starts the circuit breaker manager, and runs it until the stopper
Expand All @@ -207,8 +196,6 @@ func (d *DistSenderCircuitBreakers) Start() error {
// probeStallLoop periodically scans replica circuit breakers to detect stalls
// and launch probes.
func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
var cbs []*ReplicaCircuitBreaker // reuse across scans

// We use the probe interval as the scan interval, since we can sort of
// consider this to be probing the replicas for a stall.
var timer timeutil.Timer
Expand All @@ -234,17 +221,20 @@ func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {

// Probe replicas for a stall if we haven't seen a response from them in the
// past probe threshold.
cbs = d.snapshot(cbs[:0])
nowNanos := timeutil.Now().UnixNano()
probeThreshold := CircuitBreakerProbeThreshold.Get(&d.settings.SV)

for _, cb := range cbs {
d.replicas.Range(func(_, v any) bool {
cb := v.(*ReplicaCircuitBreaker)

// Don't probe if the breaker is already tripped. It will be probed in
// response to user traffic, to reduce the number of concurrent probes.
if cb.stallDuration(nowNanos) >= probeThreshold && !cb.isTripped() {
cb.breaker.Probe()
}
}

return true
})
}
}

Expand All @@ -259,9 +249,6 @@ func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
// breakers if the DistSender keeps sending requests to them for some
// reason.
func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
var cbs []*ReplicaCircuitBreaker // reuse across scans
var gc []cbKey

ticker := time.NewTicker(cbGCInterval)
defer ticker.Stop()
for {
Expand All @@ -273,82 +260,35 @@ func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
return
}

// Collect circuit breakers eligible for GC.
cbs = d.snapshot(cbs[:0])
gc = gc[:0]

nowNanos := timeutil.Now().UnixNano()

for _, cb := range cbs {
var cbs, gced int
d.replicas.Range(func(key, v any) bool {
cb := v.(*ReplicaCircuitBreaker)
cbs++

if idleDuration := cb.lastRequestDuration(nowNanos); idleDuration >= cbGCThreshold {
if !cb.isTripped() || idleDuration >= cbGCThresholdTripped {
gc = append(gc, cbKey{rangeID: cb.rangeID, replicaID: cb.desc.ReplicaID})
// Check if we raced with a concurrent delete. We don't expect to,
// since only this loop removes circuit breakers.
if _, ok := d.replicas.LoadAndDelete(key); ok {
// TODO(erikgrinaker): this needs to remove tripped circuit breakers
// from the metrics, otherwise they'll appear as tripped forever.
// However, there are race conditions with concurrent probes that
// can lead to metrics gauge leaks (both positive and negative), so
// we'll have to make sure we reap the probes here first.
d.metrics.CircuitBreaker.Replicas.Dec(1)
gced++
}
}
}
}
return true
})

if len(gc) == 0 {
continue
}

// Garbage collect the replicas. We may have raced with concurrent requests,
// but that's ok.
log.VEventf(ctx, 2, "garbage collecting %d/%d replica circuit breakers", len(gc), len(cbs))

func() {
d.mu.Lock()
defer d.mu.Unlock()

for i, key := range gc {
// Periodically release the mutex to avoid tail latency.
if i%cbGCBatchSize == 0 && i > 0 {
d.mu.Unlock()
d.mu.Lock()
}
// Check if we raced with a concurrent delete. We don't expect to,
// since only this loop removes circuit breakers.
//
// TODO(erikgrinaker): this needs to remove tripped circuit breakers
// from the metrics, otherwise they'll appear as tripped forever.
// However, there are race conditions with concurrent probes that can
// lead to metrics gauge leaks (both positive and negative), so we'll
// have to make sure we reap the probes here first.
if _, ok := d.mu.replicas[key]; ok {
delete(d.mu.replicas, key) // nolint:deferunlockcheck
d.metrics.CircuitBreaker.Replicas.Dec(1) // nolint:deferunlockcheck
}
}
}()

log.VEventf(ctx, 2, "garbage collected %d/%d replica circuit breakers", len(gc), len(cbs)) // nolint:deferunlockcheck
log.VEventf(ctx, 2, "garbage collected %d/%d DistSender replica circuit breakers", gced, cbs)
}
}

// snapshot fetches a snapshot of the current replica circuit breakers, reusing
// the given slice if it has sufficient capacity.
//
// The returned slice is a point-in-time copy: callers may iterate it without
// holding d.mu, but entries can be concurrently removed from (or added to)
// d.mu.replicas after this returns. Callers must tolerate acting on a
// slightly stale view.
func (d *DistSenderCircuitBreakers) snapshot(
buf []*ReplicaCircuitBreaker,
) []*ReplicaCircuitBreaker {
// Make sure the slice has sufficient capacity first, to avoid growing it
// while holding the mutex. We give it an additional 10% capacity, to avoid
// frequent growth and races.
//
// NOTE: the map size is read under a separate, short RLock so that the
// (potentially allocating) make() below runs unlocked. The map may grow
// between the two lock acquisitions; the 10% headroom reduces — but does
// not eliminate — the chance that append() below must reallocate while
// the read lock is held.
d.mu.RLock()
l := len(d.mu.replicas) // nolint:deferunlockcheck
d.mu.RUnlock()
if cap(buf) < l {
buf = make([]*ReplicaCircuitBreaker, 0, l+l/10)
} else {
// Reuse the caller-provided backing array; truncate to length 0.
buf = buf[:0]
}

// Re-acquire the read lock and copy out all circuit breaker pointers.
// Map iteration order is unspecified, so the snapshot is unordered.
d.mu.RLock()
defer d.mu.RUnlock()
for _, cb := range d.mu.replicas {
buf = append(buf, cb)
}
return buf
}

// ForReplica returns a circuit breaker for a given replica.
func (d *DistSenderCircuitBreakers) ForReplica(
rangeDesc *roachpb.RangeDescriptor, replDesc *roachpb.ReplicaDescriptor,
Expand All @@ -361,33 +301,17 @@ func (d *DistSenderCircuitBreakers) ForReplica(
key := cbKey{rangeID: rangeDesc.RangeID, replicaID: replDesc.ReplicaID}

// Fast path: use existing circuit breaker.
d.mu.RLock()
cb, ok := d.mu.replicas[key]
d.mu.RUnlock()
if ok {
return cb
if v, ok := d.replicas.Load(key); ok {
return v.(*ReplicaCircuitBreaker)
}

// Slow path: construct a new replica circuit breaker and insert it.
//
// We construct it outside of the lock to avoid holding it for too long, since
// it incurs a fair number of allocations. This can cause us to concurrently
// construct and then discard a bunch of circuit breakers, but it will be
// bounded by the number of concurrent requests to the replica, and is likely
// better than delaying requests to other, unrelated replicas.
cb = newReplicaCircuitBreaker(d, rangeDesc, replDesc)

d.mu.Lock()
defer d.mu.Unlock()

if c, ok := d.mu.replicas[key]; ok {
cb = c // we raced with a concurrent insert
} else {
d.mu.replicas[key] = cb
// Slow path: construct a new replica circuit breaker and insert it. If we
// race with a concurrent insert, return it instead.
v, loaded := d.replicas.LoadOrStore(key, newReplicaCircuitBreaker(d, rangeDesc, replDesc))
if !loaded {
d.metrics.CircuitBreaker.Replicas.Inc(1)
}

return cb
return v.(*ReplicaCircuitBreaker)
}

// ReplicaCircuitBreaker is a circuit breaker for an individual replica.
Expand Down

0 comments on commit c994982

Please sign in to comment.