From e94eaa0ec5433aac43098c6a2f80ec093b88d95c Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 18 Sep 2024 08:14:49 -0400 Subject: [PATCH] fix: code review --- waku/v2/node/wakunode2.go | 2 +- waku/v2/node/wakuoptions.go | 14 ++++++++++++++ waku/v2/protocol/store/client.go | 19 ++++++++++--------- waku/v2/protocol/store/options.go | 1 + 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index f9dc443fc..10153fd6d 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) - w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) + w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit) if params.storeFactory != nil { w.storeFactory = params.storeFactory diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 445065de6..112cafe61 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -38,6 +38,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" ) // Default UserAgent @@ -94,6 +95,8 @@ type WakuNodeParameters struct { enableStore bool messageProvider legacy_store.MessageProvider + storeRateLimit rate.Limit + enableRendezvousPoint bool rendezvousDB *rendezvous.DB @@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithCircuitRelayParams(2*time.Second, 3*time.Minute), WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), + WithWakuStoreRateLimit(8), // Value currently set in status.staging } // MultiAddresses return the list of multiaddresses configured in the node @@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { } } +// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will +// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming +// the store protocol from a storenode +func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.storeRateLimit = value + return nil + } +} + // WithWakuStore enables the Waku V2 Store protocol and if the messages should // be stored or not in a message provider. func WithWakuStore() WakuNodeOption { diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 0b08d5f98..3398c4bf9 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -33,8 +33,6 @@ const MaxPageSize = 100 // DefaultPageSize is the default number of waku messages per page const DefaultPageSize = 20 -const maxQueriesPerSecond = 8 - const ok = uint32(200) var ( @@ -68,19 +66,22 @@ func (e *StoreError) Error() string { // WakuStore represents an instance of a store client type WakuStore struct { - h host.Host - timesource timesource.Timesource - log *zap.Logger - pm *peermanager.PeerManager - rateLimiters map[peer.ID]*rate.Limiter + h host.Host + timesource timesource.Timesource + log *zap.Logger + pm *peermanager.PeerManager + + defaultRatelimit rate.Limit + rateLimiters map[peer.ID]*rate.Limiter } // NewWakuStore is used to instantiate a StoreV3 client -func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore { +func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore { s := new(WakuStore) s.log = log.Named("store-client") s.timesource = timesource s.pm = pm + s.defaultRatelimit = defaultRatelimit s.rateLimiters = make(map[peer.ID]*rate.Limiter) if pm != nil { @@ -269,7 +270,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe if !params.skipRatelimit { rateLimiter, ok := s.rateLimiters[params.selectedPeer] if !ok { - rateLimiter = rate.NewLimiter(maxQueriesPerSecond, 1) + rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1) s.rateLimiters[params.selectedPeer] = rateLimiter } err := rateLimiter.Wait(ctx) diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go index a4d29f6f7..b8deba47c 100644 --- a/waku/v2/protocol/store/options.go +++ b/waku/v2/protocol/store/options.go @@ -116,6 +116,7 @@ func IncludeData(v bool) RequestOption { } } +// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429)) func SkipRateLimit() RequestOption { return func(params *Parameters) error { params.skipRatelimit = true