Skip to content

Commit

Permalink
feat: ratelimit store queries and add options to Next (#1221)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Sep 18, 2024
1 parent 991e872 commit f0acee4
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 12 deletions.
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)

w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit)

if params.storeFactory != nil {
w.storeFactory = params.storeFactory
Expand Down
14 changes: 14 additions & 0 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
)

// Default UserAgent
Expand Down Expand Up @@ -94,6 +95,8 @@ type WakuNodeParameters struct {
enableStore bool
messageProvider legacy_store.MessageProvider

storeRateLimit rate.Limit

enableRendezvousPoint bool
rendezvousDB *rendezvous.DB

Expand Down Expand Up @@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
WithWakuStoreRateLimit(8), // Value currently set in status.staging
}

// MultiAddresses return the list of multiaddresses configured in the node
Expand Down Expand Up @@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
}
}

// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will
// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming
// the store protocol from a storenode
func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.storeRateLimit = value
return nil
}
}

// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider.
func WithWakuStore() WakuNodeOption {
Expand Down
45 changes: 37 additions & 8 deletions waku/v2/protocol/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -69,14 +70,19 @@ type WakuStore struct {
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager

defaultRatelimit rate.Limit
rateLimiters map[peer.ID]*rate.Limiter
}

// NewWakuStore is used to instantiate a StoreV3 client
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore {
s := new(WakuStore)
s.log = log.Named("store-client")
s.timesource = timesource
s.pm = pm
s.defaultRatelimit = defaultRatelimit
s.rateLimiters = make(map[peer.ID]*rate.Limiter)

if pm != nil {
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
Expand Down Expand Up @@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
return nil, err
}

response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer)
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
return len(result.messages) != 0, nil
}

func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
if r.IsComplete() {
return &Result{
store: s,
Expand All @@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
}, nil
}

params := new(Parameters)
params.selectedPeer = r.PeerID()
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
return nil, err
}
}

storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
storeRequest.PaginationCursor = r.Cursor()

response, err := s.queryFrom(ctx, storeRequest, r.PeerID())
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
Expand All @@ -245,16 +262,28 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {

}

func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))

logger.Debug("sending store request")

stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
if !params.skipRatelimit {
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
if !ok {
rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
s.rateLimiters[params.selectedPeer] = rateLimiter
}
err := rateLimiter.Wait(ctx)
if err != nil {
return nil, err
}
}

stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(selectedPeer)
ps.AddConnFailure(params.selectedPeer)
}
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestStoreClient(t *testing.T) {
pm.Start(ctx)

// Creating a storeV3 instance for all queries
wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger())
wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger(), 8)
wakuStore.SetHost(host)

_, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(pubsubTopic), relay.WithoutConsumer())
Expand Down
9 changes: 9 additions & 0 deletions waku/v2/protocol/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Parameters struct {
pageLimit uint64
forward bool
includeData bool
skipRatelimit bool
}

type RequestOption func(*Parameters) error
Expand Down Expand Up @@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption {
}
}

// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429))
func SkipRateLimit() RequestOption {
return func(params *Parameters) error {
params.skipRatelimit = true
return nil
}
}

// Default options to be used when querying a store node for results
func DefaultOptions() []RequestOption {
return []RequestOption{
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/store/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse {
return r.storeResponse
}

func (r *Result) Next(ctx context.Context) error {
func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
if r.cursor == nil {
r.done = true
r.messages = nil
return nil
}

newResult, err := r.store.next(ctx, r)
newResult, err := r.store.next(ctx, r, opts...)
if err != nil {
return err
}
Expand Down

0 comments on commit f0acee4

Please sign in to comment.