diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index 020bb23f5..5e8a35e1c 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -52,6 +52,7 @@ type Sub struct { type subscribeParameters struct { batchInterval time.Duration multiplexChannelBuffer int + preferredPeers peer.IDSlice } type SubscribeOptions func(*subscribeParameters) @@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions { } } +func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions { + return func(params *subscribeParameters) { + params.preferredPeers = peers + } +} + // Subscribe func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { sub := new(Sub) @@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int options := make([]filter.FilterSubscribeOption, 0) options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) for _, p := range apiSub.Config.Peers { - options = append(options, filter.WithPeer(p)) + isExcludedPeer := false + for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it. + if p == px { + isExcludedPeer = true + break + } + } + if !isExcludedPeer { + options = append(options, filter.WithPeer(p)) + } } if len(peersToExclude) > 0 { apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude)) diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index a43c3c396..942ac6852 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -2,10 +2,12 @@ package filter import ( "context" + "math/rand" "sync" "time" "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" "golang.org/x/exp/maps" @@ -61,7 +63,8 @@ type EnevelopeProcessor interface { OnNewEnvelope(env *protocol.Envelope) error } -func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, + envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx @@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { defer utils.LogOnPanic() ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} + if len(mgr.params.preferredPeers) > 0 { + //use one peer which is from preferred peers. + randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1) + randomPreferredPeer := mgr.params.preferredPeers[randomIndex] + config.Peers = []peer.ID{randomPreferredPeer} + } sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index 8a5f2d408..e16f675dd 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) ctx, cancel := context.WithCancel(context.Background()) s.Log.Info("About to perform API Subscribe()") - params := subscribeParameters{300 * time.Second, 1024} + params := subscribeParameters{300 * time.Second, 1024, nil} apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, ¶ms) s.Require().NoError(err) s.Require().Equal(apiSub.ContentFilter, contentFilter) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 3b56d4700..878b0d48d 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -333,11 +333,20 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } reqPeerCount := params.maxPeers - len(params.selectedPeers) + for _, p := range params.selectedPeers { + if params.peersToExclude == nil { + params.peersToExclude = make(peermanager.PeerSet) + } + //exclude peers that are preferredpeers so that they don't get selected again. + if _, ok := params.peersToExclude[p]; !ok { + params.peersToExclude[p] = struct{}{} + } + } if params.pm != nil && reqPeerCount > 0 { wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude))) - params.selectedPeers, err = wf.pm.SelectPeers( + selectedPeers, err := wf.pm.SelectPeers( peermanager.PeerSelectionCriteria{ SelectionType: params.peerSelectionType, Proto: FilterSubscribeID_v20beta1, @@ -350,7 +359,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, ) if err != nil { wf.log.Error("peer selection returned err", zap.Error(err)) - return nil, nil, err + if len(params.selectedPeers) == 0 { + return nil, nil, err + } + } + if len(selectedPeers) > 0 { + params.selectedPeers = append(params.selectedPeers, selectedPeers...) } } wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))