diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index dc0965aa868..b25acdc3dc2 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -24,6 +24,7 @@ import ( "github.com/ethersphere/bee/pkg/storer" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" + ratelimit "golang.org/x/time/rate" ) // loggerName is the tree path name of the logger for this package. @@ -65,6 +66,8 @@ type Puller struct { rate *rate.Rate // rate of historical syncing start sync.Once + + limiter *ratelimit.Limiter } func New( @@ -92,6 +95,7 @@ func New( blockLister: blockLister, rate: rate.New(DefaultHistRateWindow), cancel: func() { /* Noop, since the context is initialized in the Start(). */ }, + limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second), int(swarm.MaxBins)), // allows for 1 sync per second, max bins bursts } return p @@ -301,6 +305,11 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, for { + // rate limit within neighborhood + if bin >= p.radius.StorageRadius() { + _ = p.limiter.Wait(ctx) + } + select { case <-ctx.Done(): loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin) @@ -308,8 +317,6 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, default: } - p.metrics.SyncWorkerIterCounter.Inc() - s, _, _, err := p.nextPeerInterval(peer, bin) if err != nil { p.metrics.SyncWorkerErrCounter.Inc()