diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go b/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
index fb92ea089477..1b38eaf4421b 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go
@@ -13,6 +13,7 @@ package kvcoord
 import (
 	"context"
 	"fmt"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -85,10 +86,6 @@ const (
 	// cbGCInterval is the interval between garbage collection scans.
 	cbGCInterval = time.Minute
 
-	// cbGCBatchSize is the number of circuit breakers to garbage collect
-	// at once while holding the mutex.
-	cbGCBatchSize = 20
-
 	// cbProbeIdleIntervals is the number of probe intervals with no client
 	// requests after which a failing probe should exit. It will be relaunched on
 	// the next request, if any.
@@ -161,13 +158,7 @@ type DistSenderCircuitBreakers struct {
 	settings         *cluster.Settings
 	transportFactory TransportFactory
 	metrics          DistSenderMetrics
-
-	// TODO(erikgrinaker): consider using a generic sync.Map here, but needs
-	// benchmarking.
-	mu struct {
-		syncutil.RWMutex
-		replicas map[cbKey]*ReplicaCircuitBreaker
-	}
+	replicas         sync.Map // cbKey -> *ReplicaCircuitBreaker
 }
 
 // NewDistSenderCircuitBreakers creates new DistSender circuit breakers.
@@ -178,15 +169,13 @@ func NewDistSenderCircuitBreakers(
 	transportFactory TransportFactory,
 	metrics DistSenderMetrics,
 ) *DistSenderCircuitBreakers {
-	d := &DistSenderCircuitBreakers{
+	return &DistSenderCircuitBreakers{
 		ambientCtx:       ambientCtx,
 		stopper:          stopper,
 		settings:         settings,
 		transportFactory: transportFactory,
 		metrics:          metrics,
 	}
-	d.mu.replicas = map[cbKey]*ReplicaCircuitBreaker{}
-	return d
 }
 
 // Start starts the circuit breaker manager, and runs it until the stopper
@@ -207,8 +196,6 @@ func (d *DistSenderCircuitBreakers) Start() error {
 // probeStallLoop periodically scans replica circuit breakers to detect stalls
 // and launch probes.
 func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
-	var cbs []*ReplicaCircuitBreaker // reuse across scans
-
 	// We use the probe interval as the scan interval, since we can sort of
 	// consider this to be probing the replicas for a stall.
 	var timer timeutil.Timer
@@ -234,17 +221,20 @@ func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
 
 		// Probe replicas for a stall if we haven't seen a response from them in the
 		// past probe threshold.
-		cbs = d.snapshot(cbs[:0])
 		nowNanos := timeutil.Now().UnixNano()
 		probeThreshold := CircuitBreakerProbeThreshold.Get(&d.settings.SV)
-		for _, cb := range cbs {
+
+		d.replicas.Range(func(_, v any) bool {
+			cb := v.(*ReplicaCircuitBreaker)
+
 			// Don't probe if the breaker is already tripped. It will be probed in
 			// response to user traffic, to reduce the number of concurrent probes.
 			if cb.stallDuration(nowNanos) >= probeThreshold && !cb.isTripped() {
 				cb.breaker.Probe()
 			}
-		}
+
+			return true
+		})
 	}
 }
 
@@ -259,9 +249,6 @@ func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
 // breakers if the DistSender keeps sending requests to them for some
 // reason.
 func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
-	var cbs []*ReplicaCircuitBreaker // reuse across scans
-	var gc []cbKey
-
 	ticker := time.NewTicker(cbGCInterval)
 	defer ticker.Stop()
 	for {
@@ -273,82 +260,35 @@ func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
 			return
 		}
 
-		// Collect circuit breakers eligible for GC.
-		cbs = d.snapshot(cbs[:0])
-		gc = gc[:0]
-
 		nowNanos := timeutil.Now().UnixNano()
-		for _, cb := range cbs {
+
+		var cbs, gced int
+		d.replicas.Range(func(key, v any) bool {
+			cb := v.(*ReplicaCircuitBreaker)
+			cbs++
+
 			if idleDuration := cb.lastRequestDuration(nowNanos); idleDuration >= cbGCThreshold {
 				if !cb.isTripped() || idleDuration >= cbGCThresholdTripped {
-					gc = append(gc, cbKey{rangeID: cb.rangeID, replicaID: cb.desc.ReplicaID})
+					// Check if we raced with a concurrent delete. We don't expect to,
+					// since only this loop removes circuit breakers.
+					if _, ok := d.replicas.LoadAndDelete(key); ok {
+						// TODO(erikgrinaker): this needs to remove tripped circuit breakers
+						// from the metrics, otherwise they'll appear as tripped forever.
+						// However, there are race conditions with concurrent probes that
+						// can lead to metrics gauge leaks (both positive and negative), so
+						// we'll have to make sure we reap the probes here first.
+						d.metrics.CircuitBreaker.Replicas.Dec(1)
+						gced++
+					}
 				}
 			}
-		}
+			return true
+		})
 
-		if len(gc) == 0 {
-			continue
-		}
-
-		// Garbage collect the replicas. We may have raced with concurrent requests,
-		// but that's ok.
-		log.VEventf(ctx, 2, "garbage collecting %d/%d replica circuit breakers", len(gc), len(cbs))
-
-		func() {
-			d.mu.Lock()
-			defer d.mu.Unlock()
-
-			for i, key := range gc {
-				// Periodically release the mutex to avoid tail latency.
-				if i%cbGCBatchSize == 0 && i > 0 {
-					d.mu.Unlock()
-					d.mu.Lock()
-				}
-				// Check if we raced with a concurrent delete. We don't expect to,
-				// since only this loop removes circuit breakers.
-				//
-				// TODO(erikgrinaker): this needs to remove tripped circuit breakers
-				// from the metrics, otherwise they'll appear as tripped forever.
-				// However, there are race conditions with concurrent probes that can
-				// lead to metrics gauge leaks (both positive and negative), so we'll
-				// have to make sure we reap the probes here first.
-				if _, ok := d.mu.replicas[key]; ok {
-					delete(d.mu.replicas, key)               // nolint:deferunlockcheck
-					d.metrics.CircuitBreaker.Replicas.Dec(1) // nolint:deferunlockcheck
-				}
-			}
-		}()
-
-		log.VEventf(ctx, 2, "garbage collected %d/%d replica circuit breakers", len(gc), len(cbs)) // nolint:deferunlockcheck
+		log.VEventf(ctx, 2, "garbage collected %d/%d DistSender replica circuit breakers", gced, cbs)
 	}
 }
 
-// snapshot fetches a snapshot of the current replica circuit breakers, reusing
-// the given slice if it has sufficient capacity.
-func (d *DistSenderCircuitBreakers) snapshot(
-	buf []*ReplicaCircuitBreaker,
-) []*ReplicaCircuitBreaker {
-	// Make sure the slice has sufficient capacity first, to avoid growing it
-	// while holding the mutex. We give it an additional 10% capacity, to avoid
-	// frequent growth and races.
-	d.mu.RLock()
-	l := len(d.mu.replicas) // nolint:deferunlockcheck
-	d.mu.RUnlock()
-	if cap(buf) < l {
-		buf = make([]*ReplicaCircuitBreaker, 0, l+l/10)
-	} else {
-		buf = buf[:0]
-	}
-
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-	for _, cb := range d.mu.replicas {
-		buf = append(buf, cb)
-	}
-	return buf
-}
-
 // ForReplica returns a circuit breaker for a given replica.
 func (d *DistSenderCircuitBreakers) ForReplica(
 	rangeDesc *roachpb.RangeDescriptor, replDesc *roachpb.ReplicaDescriptor,
@@ -361,33 +301,17 @@ func (d *DistSenderCircuitBreakers) ForReplica(
 	key := cbKey{rangeID: rangeDesc.RangeID, replicaID: replDesc.ReplicaID}
 
 	// Fast path: use existing circuit breaker.
-	d.mu.RLock()
-	cb, ok := d.mu.replicas[key]
-	d.mu.RUnlock()
-	if ok {
-		return cb
+	if v, ok := d.replicas.Load(key); ok {
+		return v.(*ReplicaCircuitBreaker)
 	}
 
-	// Slow path: construct a new replica circuit breaker and insert it.
-	//
-	// We construct it outside of the lock to avoid holding it for too long, since
-	// it incurs a fair number of allocations. This can cause us to concurrently
-	// construct and then discard a bunch of circuit breakers, but it will be
-	// bounded by the number of concurrent requests to the replica, and is likely
-	// better than delaying requests to other, unrelated replicas.
-	cb = newReplicaCircuitBreaker(d, rangeDesc, replDesc)
-
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
-	if c, ok := d.mu.replicas[key]; ok {
-		cb = c // we raced with a concurrent insert
-	} else {
-		d.mu.replicas[key] = cb
+	// Slow path: construct a new replica circuit breaker and insert it. If we
+	// race with a concurrent insert, return it instead.
+	v, loaded := d.replicas.LoadOrStore(key, newReplicaCircuitBreaker(d, rangeDesc, replDesc))
+	if !loaded {
 		d.metrics.CircuitBreaker.Replicas.Inc(1)
 	}
-
-	return cb
+	return v.(*ReplicaCircuitBreaker)
 }
 
 // ReplicaCircuitBreaker is a circuit breaker for an individual replica.
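
For context, the new ForReplica fast/slow path reduces to sync.Map's Load
followed by LoadOrStore. Below is a minimal standalone sketch of that pattern,
not code from this patch: entry, getOrCreate, and the created counter are
hypothetical stand-ins for ReplicaCircuitBreaker, ForReplica, and the
CircuitBreaker.Replicas gauge.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// entry is a hypothetical stand-in for *ReplicaCircuitBreaker.
type entry struct{ id int64 }

// getOrCreate mirrors the ForReplica pattern: Load is the allocation-free
// fast path, and LoadOrStore resolves insertion races so that exactly one
// candidate wins and only the winner increments the gauge.
func getOrCreate(m *sync.Map, id int64, created *atomic.Int64) *entry {
	// Fast path: use the existing entry.
	if v, ok := m.Load(id); ok {
		return v.(*entry)
	}
	// Slow path: construct a new entry and insert it. If we race with a
	// concurrent insert, loaded is true and v is the winner's entry; our
	// candidate is discarded before any other goroutine can observe it.
	v, loaded := m.LoadOrStore(id, &entry{id: id})
	if !loaded {
		created.Add(1)
	}
	return v.(*entry)
}

func main() {
	var m sync.Map // int64 -> *entry
	var created atomic.Int64
	a := getOrCreate(&m, 7, &created)
	b := getOrCreate(&m, 7, &created) // hits the fast path
	fmt.Println(a == b, created.Load()) // true 1
}

Losing a LoadOrStore race wastes one construction, a cost the old code also
accepted by building the breaker outside the lock; the gauge stays exact
because only the winning insert increments it.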
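The gcLoop rewrite leans on two sync.Map properties: Range tolerates
concurrent mutation during the scan, and LoadAndDelete is atomic, so the
Replicas gauge is decremented exactly once per removed breaker even if a
delete were to race. A standalone sketch under the same caveats, with a
hypothetical entry type recording last-request times in place of
ReplicaCircuitBreaker's lastRequestDuration bookkeeping:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// entry records a last-request timestamp in unix nanos.
type entry struct{ lastRequest atomic.Int64 }

func main() {
	var m sync.Map // int64 -> *entry

	stale, fresh := &entry{}, &entry{}
	stale.lastRequest.Store(time.Now().Add(-2 * time.Hour).UnixNano())
	fresh.lastRequest.Store(time.Now().UnixNano())
	m.Store(int64(1), stale)
	m.Store(int64(2), fresh)

	// Mirror gcLoop: Range visits every live entry without a global lock,
	// and LoadAndDelete reports whether this goroutine actually removed the
	// key, so gced (the metrics gauge, in the real code) stays exact even
	// if a concurrent delete raced with the scan.
	const gcThreshold = time.Hour
	nowNanos := time.Now().UnixNano()
	var cbs, gced int
	m.Range(func(key, v any) bool {
		cbs++
		if idle := time.Duration(nowNanos - v.(*entry).lastRequest.Load()); idle >= gcThreshold {
			if _, ok := m.LoadAndDelete(key); ok {
				gced++
			}
		}
		return true // continue scanning
	})
	fmt.Printf("garbage collected %d/%d entries\n", gced, cbs) // 1/2
}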