Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ratelimit store queries and add options to Next #1221

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions waku/v2/protocol/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
)

Expand All @@ -32,6 +33,8 @@ const MaxPageSize = 100
// DefaultPageSize is the default number of waku messages per page
const DefaultPageSize = 20

const maxQueriesPerSecond = 8
richard-ramos marked this conversation as resolved.
Show resolved Hide resolved

const ok = uint32(200)

var (
Expand Down Expand Up @@ -65,10 +68,11 @@ func (e *StoreError) Error() string {

// WakuStore represents an instance of a store client
type WakuStore struct {
h host.Host
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager
h host.Host
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager
rateLimiters map[peer.ID]*rate.Limiter
}

// NewWakuStore is used to instantiate a StoreV3 client
Expand All @@ -77,6 +81,7 @@ func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource,
s.log = log.Named("store-client")
s.timesource = timesource
s.pm = pm
s.rateLimiters = make(map[peer.ID]*rate.Limiter)

if pm != nil {
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
Expand Down Expand Up @@ -171,7 +176,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
return nil, err
}

response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer)
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -211,7 +216,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
return len(result.messages) != 0, nil
}

func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
if r.IsComplete() {
return &Result{
store: s,
Expand All @@ -223,11 +228,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
}, nil
}

params := new(Parameters)
params.selectedPeer = r.PeerID()
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
return nil, err
}
}

storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
storeRequest.PaginationCursor = r.Cursor()

response, err := s.queryFrom(ctx, storeRequest, r.PeerID())
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
Expand All @@ -245,16 +261,28 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {

}

func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))

logger.Debug("sending store request")

stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
if !params.skipRatelimit {
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
if !ok {
rateLimiter = rate.NewLimiter(maxQueriesPerSecond, 1)
s.rateLimiters[params.selectedPeer] = rateLimiter
}
err := rateLimiter.Wait(ctx)
if err != nil {
return nil, err
}
}

stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(selectedPeer)
ps.AddConnFailure(params.selectedPeer)
}
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions waku/v2/protocol/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Parameters struct {
pageLimit uint64
forward bool
includeData bool
skipRatelimit bool
}

type RequestOption func(*Parameters) error
Expand Down Expand Up @@ -115,6 +116,13 @@ func IncludeData(v bool) RequestOption {
}
}

func SkipRateLimit() RequestOption {
richard-ramos marked this conversation as resolved.
Show resolved Hide resolved
return func(params *Parameters) error {
params.skipRatelimit = true
return nil
}
}

// Default options to be used when querying a store node for results
func DefaultOptions() []RequestOption {
return []RequestOption{
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/store/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse {
return r.storeResponse
}

func (r *Result) Next(ctx context.Context) error {
func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
if r.cursor == nil {
r.done = true
r.messages = nil
return nil
}

newResult, err := r.store.next(ctx, r)
newResult, err := r.store.next(ctx, r, opts...)
if err != nil {
return err
}
Expand Down
Loading