From 2dfbdba4a9af2c09a6f2251354223105868f3beb Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 17 Sep 2024 17:34:46 -0400 Subject: [PATCH 1/2] feat: ratelimit store queries --- waku/v2/protocol/store/client.go | 50 ++++++++++++++++++++++++------- waku/v2/protocol/store/options.go | 8 +++++ waku/v2/protocol/store/result.go | 4 +-- 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 92c47ff4c..0b08d5f98 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" + "golang.org/x/time/rate" "google.golang.org/protobuf/proto" ) @@ -32,6 +33,8 @@ const MaxPageSize = 100 // DefaultPageSize is the default number of waku messages per page const DefaultPageSize = 20 +const maxQueriesPerSecond = 8 + const ok = uint32(200) var ( @@ -65,10 +68,11 @@ func (e *StoreError) Error() string { // WakuStore represents an instance of a store client type WakuStore struct { - h host.Host - timesource timesource.Timesource - log *zap.Logger - pm *peermanager.PeerManager + h host.Host + timesource timesource.Timesource + log *zap.Logger + pm *peermanager.PeerManager + rateLimiters map[peer.ID]*rate.Limiter } // NewWakuStore is used to instantiate a StoreV3 client @@ -77,6 +81,7 @@ func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, s.log = log.Named("store-client") s.timesource = timesource s.pm = pm + s.rateLimiters = make(map[peer.ID]*rate.Limiter) if pm != nil { pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField) @@ -171,7 +176,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return nil, err } - response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -211,7 +216,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt return len(result.messages) != 0, nil } -func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { +func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) { if r.IsComplete() { return &Result{ store: s, @@ -223,11 +228,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { }, nil } + params := new(Parameters) + params.selectedPeer = r.PeerID() + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, err + } + } + storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest) storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) storeRequest.PaginationCursor = r.Cursor() - response, err := s.queryFrom(ctx, storeRequest, r.PeerID()) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -245,16 +261,28 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { } -func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) { - logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) +func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) { + logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) logger.Debug("sending store request") - stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) + if !params.skipRatelimit { + rateLimiter, ok := s.rateLimiters[params.selectedPeer] + if !ok { + rateLimiter = rate.NewLimiter(maxQueriesPerSecond, 1) + s.rateLimiters[params.selectedPeer] = rateLimiter + } + err := rateLimiter.Wait(ctx) + if err != nil { + return nil, err + } + } + + stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + ps.AddConnFailure(params.selectedPeer) } return nil, err } diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go index b38afd53a..a4d29f6f7 100644 --- a/waku/v2/protocol/store/options.go +++ b/waku/v2/protocol/store/options.go @@ -19,6 +19,7 @@ type Parameters struct { pageLimit uint64 forward bool includeData bool + skipRatelimit bool } type RequestOption func(*Parameters) error @@ -115,6 +116,13 @@ func IncludeData(v bool) RequestOption { } } +func SkipRateLimit() RequestOption { + return func(params *Parameters) error { + params.skipRatelimit = true + return nil + } +} + // Default options to be used when querying a store node for results func DefaultOptions() []RequestOption { return []RequestOption{ diff --git a/waku/v2/protocol/store/result.go b/waku/v2/protocol/store/result.go index 5ea4765ec..604d6453c 100644 --- a/waku/v2/protocol/store/result.go +++ b/waku/v2/protocol/store/result.go @@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse { return r.storeResponse } -func (r *Result) Next(ctx context.Context) error { +func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { if r.cursor == nil { r.done = true r.messages = nil return nil } - newResult, err := r.store.next(ctx, r) + newResult, err := r.store.next(ctx, r, opts...) if err != nil { return err } From 95676e2ec80e3bee6bf0c9a35a242f0f713b02f9 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 18 Sep 2024 08:14:49 -0400 Subject: [PATCH 2/2] fix: code review --- waku/v2/node/wakunode2.go | 2 +- waku/v2/node/wakuoptions.go | 14 ++++++++++++++ waku/v2/protocol/store/client.go | 19 ++++++++++--------- waku/v2/protocol/store/client_test.go | 2 +- waku/v2/protocol/store/options.go | 1 + 5 files changed, 27 insertions(+), 11 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index f9dc443fc..10153fd6d 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) - w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) + w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit) if params.storeFactory != nil { w.storeFactory = params.storeFactory diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 445065de6..112cafe61 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -38,6 +38,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" ) // Default UserAgent @@ -94,6 +95,8 @@ type WakuNodeParameters struct { enableStore bool messageProvider legacy_store.MessageProvider + storeRateLimit rate.Limit + enableRendezvousPoint bool rendezvousDB *rendezvous.DB @@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithCircuitRelayParams(2*time.Second, 3*time.Minute), WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), + WithWakuStoreRateLimit(8), // Value currently set in status.staging } // MultiAddresses return the list of multiaddresses configured in the node @@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { } } +// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will +// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming +// the store protocol from a storenode +func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.storeRateLimit = value + return nil + } +} + // WithWakuStore enables the Waku V2 Store protocol and if the messages should // be stored or not in a message provider. func WithWakuStore() WakuNodeOption { diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 0b08d5f98..3398c4bf9 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -33,8 +33,6 @@ const MaxPageSize = 100 // DefaultPageSize is the default number of waku messages per page const DefaultPageSize = 20 -const maxQueriesPerSecond = 8 - const ok = uint32(200) var ( @@ -68,19 +66,22 @@ func (e *StoreError) Error() string { // WakuStore represents an instance of a store client type WakuStore struct { - h host.Host - timesource timesource.Timesource - log *zap.Logger - pm *peermanager.PeerManager - rateLimiters map[peer.ID]*rate.Limiter + h host.Host + timesource timesource.Timesource + log *zap.Logger + pm *peermanager.PeerManager + + defaultRatelimit rate.Limit + rateLimiters map[peer.ID]*rate.Limiter } // NewWakuStore is used to instantiate a StoreV3 client -func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore { +func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore { s := new(WakuStore) s.log = log.Named("store-client") s.timesource = timesource s.pm = pm + s.defaultRatelimit = defaultRatelimit s.rateLimiters = make(map[peer.ID]*rate.Limiter) if pm != nil { @@ -269,7 +270,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe if !params.skipRatelimit { rateLimiter, ok := s.rateLimiters[params.selectedPeer] if !ok { - rateLimiter = rate.NewLimiter(maxQueriesPerSecond, 1) + rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1) s.rateLimiters[params.selectedPeer] = rateLimiter } err := rateLimiter.Wait(ctx) diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go index 357dc270b..733a27e93 100644 --- a/waku/v2/protocol/store/client_test.go +++ b/waku/v2/protocol/store/client_test.go @@ -69,7 +69,7 @@ func TestStoreClient(t *testing.T) { pm.Start(ctx) // Creating a storeV3 instance for all queries - wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger()) + wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger(), 8) wakuStore.SetHost(host) _, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(pubsubTopic), relay.WithoutConsumer()) diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go index a4d29f6f7..b8deba47c 100644 --- a/waku/v2/protocol/store/options.go +++ b/waku/v2/protocol/store/options.go @@ -116,6 +116,7 @@ func IncludeData(v bool) RequestOption { } } +// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429)) func SkipRateLimit() RequestOption { return func(params *Parameters) error { params.skipRatelimit = true