Skip to content

Commit

Permalink
feat: handle dial errors from new streams
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Sep 20, 2024
1 parent 4bba456 commit 2a730ab
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 47 deletions.
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)

func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info)
w.peerConnector.HandleDialError(err, info.ID)
w.peermanager.HandleDialError(err, info.ID)

for _, addr := range info.Addrs {
// TODO: this is a temporary fix
Expand Down
34 changes: 7 additions & 27 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package peermanager

import (
"context"
"errors"
"math/rand"
"sync"
"sync/atomic"
Expand All @@ -19,9 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"

"github.com/libp2p/go-libp2p/core/event"
"go.uber.org/zap"

lru "github.com/hashicorp/golang-lru"
Expand All @@ -41,9 +38,8 @@ type PeerConnectionStrategy struct {
*service.CommonDiscoveryService
subscriptions []subscription

backoff backoff.BackoffFactory
logger *zap.Logger
evtDialError event.Emitter
backoff backoff.BackoffFactory
logger *zap.Logger
}

type subscription struct {
Expand Down Expand Up @@ -159,11 +155,6 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) {
// Start attempts to connect to the peers passed in by peerCh.
// Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
var err error
c.evtDialError, err = c.host.EventBus().Emitter(new(utils.DialError))
if err != nil {
return err
}
return c.CommonDiscoveryService.Start(ctx, c.start)

}
Expand All @@ -179,9 +170,7 @@ func (c *PeerConnectionStrategy) start() error {

// Stop terminates the peer-connector
func (c *PeerConnectionStrategy) Stop() {
c.CommonDiscoveryService.Stop(func() {
c.evtDialError.Close()
})
c.CommonDiscoveryService.Stop(func() {})
}

func (c *PeerConnectionStrategy) isPaused() bool {
Expand Down Expand Up @@ -287,19 +276,10 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi)
c.HandleDialError(err, pi.ID)
if err != nil {
c.pm.HandleDialError(err, pi.ID)
return
}
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
<-sem
}

func (c *PeerConnectionStrategy) HandleDialError(err error, peerID peer.ID) {
if err != nil && !errors.Is(err, context.Canceled) {
c.addConnectionBackoff(peerID)
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
c.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
emitterErr := c.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
if emitterErr != nil {
c.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
}
31 changes: 31 additions & 0 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -87,6 +88,7 @@ type PeerManager struct {
TopicHealthNotifCh chan<- TopicHealthStatus
rttCache *FastestPeerSelector
RelayEnabled bool
evtDialError event.Emitter
}

// PeerSelection provides various options based on which Peer is selected from a list of peers.
Expand Down Expand Up @@ -249,6 +251,14 @@ func (pm *PeerManager) Start(ctx context.Context) {
go pm.connectivityLoop(ctx)
}
go pm.peerStoreLoop(ctx)

if pm.host != nil {
var err error
pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError))
if err != nil {
pm.logger.Error("failed to create dial error emitter", zap.Error(err))
}
}
}

func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
Expand Down Expand Up @@ -719,3 +729,24 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
pm.serviceSlots.getPeers(proto).add(peerID)
}

func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil {
return
}
if !errors.Is(err, context.Canceled) {
if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID)
}
if pm.host != nil {
pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
}
pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
if pm.evtDialError != nil {
emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
if emitterErr != nil {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
}
}
4 changes: 2 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peerID)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
}
return err
}
Expand Down
8 changes: 5 additions & 3 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
Expand All @@ -38,6 +38,7 @@ type (
log *zap.Logger
*service.CommonService
subscriptions *SubscribersMap
pm *peermanager.PeerManager

maxSubscriptions int
}
Expand All @@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
wf.maxSubscriptions = params.MaxSubscribers
if params.pm != nil {
params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField)
wf.pm = params.pm
}
return wf
}
Expand Down Expand Up @@ -274,8 +276,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
wf.metrics.RecordError(pushTimeoutFailure)
} else {
wf.metrics.RecordError(dialFailure)
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peerID)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
}
}
logger.Error("opening peer stream", zap.Error(err))
Expand Down
5 changes: 2 additions & 3 deletions waku/v2/protocol/legacy_store/waku_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor

stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
store.metrics.RecordError(dialFailure)
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(selectedPeer)
if store.pm != nil {
store.pm.HandleDialError(err, selectedPeer)
}
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p

stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure)
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(peerID)
if wakuLP.pm != nil {
wakuLP.pm.HandleDialError(err, peerID)
}
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion waku/v2/protocol/lightpush/waku_lightpush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/peermanager"

"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -264,6 +265,9 @@ func TestWakuLightPushCornerCases(t *testing.T) {

// Prepare peer manager instance to include in test
pm := peermanager.NewPeerManager(10, 10, nil, nil, true, utils.Logger())
pc, err := peermanager.NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 10*time.Second, utils.Logger())
require.NoError(t, err)
pm.SetPeerConnector(pc)

node1, sub1, host1 := makeWakuRelay(t, testTopic)
defer node1.Stop()
Expand All @@ -275,7 +279,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {

lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger())
lightPushNode2.SetHost(host2)
err := lightPushNode2.Start(ctx)
err = lightPushNode2.Start(ctx)
require.NoError(t, err)
defer lightPushNode2.Stop()

Expand Down
4 changes: 1 addition & 3 deletions waku/v2/protocol/peer_exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts

stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
if err != nil {
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(params.selectedPeer)
}
wakuPX.pm.HandleDialError(err, params.selectedPeer)
return err
}

Expand Down
5 changes: 1 addition & 4 deletions waku/v2/protocol/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe

stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(params.selectedPeer)
}
s.pm.HandleDialError(err, params.selectedPeer)
return nil, err
}

Expand Down

0 comments on commit 2a730ab

Please sign in to comment.