From 9f7734d9d9dbe6374af059e01e7c73882288d46a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 25 Dec 2024 13:18:48 +0530 Subject: [PATCH] fix_: handle network change for filter subs --- go.mod | 2 +- go.sum | 4 +- .../waku/v2/api/filter/filter_manager.go | 68 +++++++++++++------ .../go-waku/waku/v2/protocol/filter/client.go | 2 +- vendor/modules.txt | 2 +- wakuv2/waku.go | 2 +- 6 files changed, 54 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index c64aa93c86..00307fab95 100644 --- a/go.mod +++ b/go.mod @@ -97,7 +97,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241219102436-278907543b02 + github.com/waku-org/go-waku v0.8.1-0.20241225074646-a49ea9cc6ee8 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 6fe5b8f253..4aa091f9b5 100644 --- a/go.sum +++ b/go.sum @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241219102436-278907543b02 h1:4XOKp1EwJ8h5HAnuNXBbhz8zbmjnsZLunuwdMNUYlTQ= -github.com/waku-org/go-waku v0.8.1-0.20241219102436-278907543b02/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= +github.com/waku-org/go-waku v0.8.1-0.20241225074646-a49ea9cc6ee8 h1:pk31tEMRa1qjHg9drNXFo1VA6haarsgJsjjhn0p7dUE= +github.com/waku-org/go-waku v0.8.1-0.20241225074646-a49ea9cc6ee8/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index 665d577bd0..f561624a80 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -26,6 +26,7 @@ import ( // filterSubscriptions is the map of filter subscription IDs to subscriptions const filterSubBatchSize = 90 +const initNetworkConnType = 255 type appFilterMap map[string]filterConfig @@ -43,6 +44,7 @@ type FilterManager struct { filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} waitingToSubQueue chan filterConfig envProcessor EnevelopeProcessor + networkConnType byte } type SubDetails struct { @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + mgr.networkConnType = initNetworkConnType //parsing the subscribe params only to read the batchInterval passed. mgr.params = new(subscribeParameters) @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() { } } -// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch -// once batchlimit is hit, all filters are subscribed to and new batch is created. +// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +// once batch-limit is hit, all filters are subscribed to and new batch is created. // if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) { @@ -182,37 +185,62 @@ func (mgr *FilterManager) NetworkChange() { mgr.node.PingPeers() // ping all peers to check if subscriptions are alive } +func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) { + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + break + } + } + } +} + +func (mgr *FilterManager) resubscribeAllSubscriptions() { + mgr.Lock() + mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", len(mgr.filterSubscriptions))) + for _, asub := range mgr.filterSubscriptions { + asub.sub.cleanup() + } + mgr.Unlock() + + mgr.Lock() + for filterID, config := range mgr.filterConfigs { + mgr.SubscribeFilter(filterID, config.contentFilter) + } + mgr.Unlock() + +} + // OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa // Note that pubsubTopic specific change can be triggered by specifying pubsubTopic, // if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online. -func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) { +func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) { subs := mgr.node.Subscriptions() mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + if mgr.networkConnType != initNetworkConnType && //checking for initialization condition + mgr.networkConnType != connType { // this means ip address of the node has changed which can cause issues in filter-push and hence resubscribing all filters + // resubscribe all existing filters + mgr.resubscribeAllSubscriptions() + } if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online mgr.onlineChecker.SetOnline(newStatus) mgr.NetworkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() - if len(mgr.waitingToSubQueue) > 0 { - for af := range mgr.waitingToSubQueue { - // TODO: change the below logic once topic specific health is implemented for lightClients - if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { - // check if any filter subs are pending and subscribe them - mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) - go mgr.subscribeAndRunLoop(af) - } else { - mgr.waitingToSubQueue <- af - } - if len(mgr.waitingToSubQueue) == 0 { - mgr.logger.Debug("no pending subscriptions") - break - } - } - } + mgr.checkAndProcessQueue(pubsubTopic) mgr.Unlock() } - + mgr.networkConnType = connType mgr.onlineChecker.SetOnline(newStatus) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 8fbcd91c13..c1e762e9bf 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } if !wf.subscriptions.IsSubscribedTo(peerID) { - logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) + logger.Warn("received message push from unknown peer") wf.metrics.RecordError(unknownPeerMessagePush) //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us if err := stream.Reset(); err != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index 22cfff331b..e81d94d7f2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241219102436-278907543b02 +# github.com/waku-org/go-waku v0.8.1-0.20241225074646-a49ea9cc6ee8 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index c3e77ba99c..df9eb72865 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1740,7 +1740,7 @@ func (w *Waku) ConnectionChanged(state connection.State) { //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 go func() { defer gocommon.LogOnPanic() - w.filterManager.OnConnectionStatusChange("", isOnline) + w.filterManager.OnConnectionStatusChange("", isOnline, byte(state.Type)) }() w.handleNetworkChangeFromApp(state) } else {