From fe24824e458717bd4ca844cd94be3be34611b239 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 19 Jun 2023 13:41:15 +0200 Subject: [PATCH 01/14] refactor: unicast router --- internal/testutil/proxy.go | 3 + p2p/config/config.go | 44 ++++++++++- p2p/raintree/logging.go | 31 -------- p2p/raintree/router.go | 147 ++++++++++++------------------------- p2p/raintree/testutil.go | 22 +++++- p2p/types/router.go | 2 +- p2p/unicast/logging.go | 44 +++++++++++ p2p/unicast/router.go | 139 +++++++++++++++++++++++++++++++++++ p2p/unicast/testutil.go | 10 +++ 9 files changed, 306 insertions(+), 136 deletions(-) create mode 100644 internal/testutil/proxy.go delete mode 100644 p2p/raintree/logging.go create mode 100644 p2p/unicast/logging.go create mode 100644 p2p/unicast/router.go create mode 100644 p2p/unicast/testutil.go diff --git a/internal/testutil/proxy.go b/internal/testutil/proxy.go new file mode 100644 index 000000000..a586a6c05 --- /dev/null +++ b/internal/testutil/proxy.go @@ -0,0 +1,3 @@ +package testutil + +type ProxyFactory[T any] func(target T) (proxy T) diff --git a/p2p/config/config.go b/p2p/config/config.go index e64fe8519..d98f4b969 100644 --- a/p2p/config/config.go +++ b/p2p/config/config.go @@ -4,9 +4,19 @@ import ( "fmt" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" + "go.uber.org/multierr" + "github.com/pokt-network/pocket/p2p/providers" + typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/shared/crypto" - "go.uber.org/multierr" + "github.com/pokt-network/pocket/shared/modules" +) + +var ( + _ typesP2P.RouterConfig = &baseConfig{} + _ typesP2P.RouterConfig = &UnicastRouterConfig{} + _ typesP2P.RouterConfig = &RainTreeConfig{} ) // baseConfig implements `RouterConfig` using the given libp2p host and current @@ -22,6 +32,14 @@ type baseConfig struct { PeerstoreProvider providers.PeerstoreProvider } +type UnicastRouterConfig struct { + Logger *modules.Logger + Host host.Host + ProtocolID protocol.ID + MessageHandler typesP2P.MessageHandler + PeerHandler func(peer typesP2P.Peer) error +} + // BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`. type BackgroundConfig struct { Host host.Host @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) { if cfg.PeerstoreProvider == nil { err = multierr.Append(err, fmt.Errorf("peerstore provider not configured")) } + return nil +} + +// IsValid implements the respective member of the `RouterConfig` interface. +func (cfg *UnicastRouterConfig) IsValid() (err error) { + if cfg.Logger == nil { + err = multierr.Append(err, fmt.Errorf("logger not configured")) + } + + if cfg.Host == nil { + err = multierr.Append(err, fmt.Errorf("host not configured")) + } + + if cfg.ProtocolID == "" { + err = multierr.Append(err, fmt.Errorf("protocol id not configured")) + } + + if cfg.MessageHandler == nil { + err = multierr.Append(err, fmt.Errorf("message handler not configured")) + } + + if cfg.PeerHandler == nil { + err = multierr.Append(err, fmt.Errorf("peer handler not configured")) + } return err } diff --git a/p2p/raintree/logging.go b/p2p/raintree/logging.go deleted file mode 100644 index bbe3e6c3b..000000000 --- a/p2p/raintree/logging.go +++ /dev/null @@ -1,31 +0,0 @@ -package raintree - -import ( - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" - - "github.com/pokt-network/pocket/logger" - "github.com/pokt-network/pocket/p2p/utils" -) - -// logStream logs the incoming stream and its scope stats -func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) { - rtr.logStreamScopeStats(stream) - - remotePeer, err := utils.PeerFromLibp2pStream(stream) - if err != nil { - rtr.logger.Debug().Err(err).Msg("getting remote remotePeer") - } else { - utils.LogIncomingMsg(rtr.logger, rtr.getHostname(), remotePeer) - } -} - -// logStreamScopeStats logs the incoming stream's scope stats -// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.27.0/core/network#StreamScope) -func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { - if err := utils.LogScopeStatFactory( - &logger.Global.Logger, - "stream scope (read-side)", - )(stream.Scope()); err != nil { - rtr.logger.Debug().Err(err).Msg("logging stream scope stats") - } -} diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 707f0a0a9..a68fd293d 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -2,11 +2,8 @@ package raintree import ( "fmt" - "io" - "time" - libp2pHost "github.com/libp2p/go-libp2p/core/host" - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/pokt-network/pocket/p2p/unicast" "google.golang.org/protobuf/proto" "github.com/pokt-network/pocket/logger" @@ -22,15 +19,9 @@ import ( "github.com/pokt-network/pocket/shared/messaging" "github.com/pokt-network/pocket/shared/modules" "github.com/pokt-network/pocket/shared/modules/base_modules" - telemetry "github.com/pokt-network/pocket/telemetry" + "github.com/pokt-network/pocket/telemetry" ) -// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. -// TECHDEBT(#629): parameterize and expose via config. -// readStreamTimeout is the duration to wait for a read operation on a -// stream to complete, after which the stream is closed ("timed out"). -const readStreamTimeout = time.Second * 10 - var ( _ typesP2P.Router = &rainTreeRouter{} _ modules.IntegratableModule = &rainTreeRouter{} @@ -41,10 +32,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr type rainTreeRouter struct { base_modules.IntegratableModule + unicast.UnicastRouter logger *modules.Logger // handler is the function to call when a message is received. - handler typesP2P.RouterHandler + handler typesP2P.MessageHandler // host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore // & connection manager. `libp2p.New` configures and starts listening // according to options. @@ -84,7 +76,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type return nil, err } - rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream) return typesP2P.Router(rtr), nil } @@ -191,9 +182,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres return nil } -// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation -// if applicable. Returns the serialized `PocketEnvelope` data contained within. -func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { +// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope` +// bytes contained within, continues broadcast propagation, if applicable, and +// passes them off to the application by calling the configured `rtr.handler`. +// Intended to be called in a go routine. +func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight() blockHeight := fmt.Sprintf("%d", blockHeightInt) @@ -207,25 +200,36 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { ) var rainTreeMsg typesP2P.RainTreeMessage - if err := proto.Unmarshal(data, &rainTreeMsg); err != nil { - return nil, err + if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil { + return err } + // TECHDEBT(#763): refactor as "pre-propagation validation" networkMessage := messaging.PocketEnvelope{} if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil { rtr.logger.Error().Err(err).Msg("Error decoding network message") - return nil, err + return err } + // -- // Continue RainTree propagation if rainTreeMsg.Level > 0 { if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil { - return nil, err + return err } } - // Return the data back to the caller so it can be handled by the app specific bus - return rainTreeMsg.Data, nil + // There was no error, but we don't need to forward this to the app-specific bus. + // For example, the message has already been handled by the application. + if rainTreeMsg.Data == nil { + return nil + } + + // call configured handler to forward to app-specific bus + if err := rtr.handler(rainTreeMsg.Data); err != nil { + rtr.logger.Error().Err(err).Msg("handling raintree message") + } + return nil } // GetPeerstore implements the respective member of `typesP2P.Router`. @@ -270,94 +274,41 @@ func (rtr *rainTreeRouter) Size() int { return rtr.peersManager.GetPeerstore().Size() } -// handleStream ensures the peerstore contains the remote peer and then reads -// the incoming stream in a new go routine. -func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) { - rtr.logger.Debug().Msg("handling incoming stream") - peer, err := utils.PeerFromLibp2pStream(stream) - if err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("parsing remote peer identity") - - if err = stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream") - } - return - } - - if err := rtr.AddPeer(peer); err != nil { - rtr.logger.Error().Err(err). - Str("address", peer.GetAddress().String()). - Msg("adding remote peer to router") - } - - go rtr.readStream(stream) +// shouldSendToTarget returns false if target is self. +func shouldSendToTarget(target target) bool { + return !target.isSelf } -// readStream reads the incoming stream, extracts the serialized `PocketEnvelope` -// data from the incoming `RainTreeMessage`, and passes it to the application by -// calling the configured `rtr.handler`. Intended to be called in a go routine. -func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) { - // Time out if no data is sent to free resources. - // NB: tests using libp2p's `mocknet` rely on this not returning an error. - if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { - // `SetReadDeadline` not supported by `mocknet` streams. - rtr.logger.Error().Err(err).Msg("setting stream read deadline") +func (rtr *rainTreeRouter) setupUnicastRouter() error { + unicastRouterCfg := config.UnicastRouterConfig{ + Logger: rtr.logger, + Host: rtr.host, + ProtocolID: protocol.PoktProtocolID, + MessageHandler: rtr.handleRainTreeMsg, + PeerHandler: rtr.AddPeer, } - // log incoming stream - rtr.logStream(stream) - - // read stream - rainTreeMsgBz, err := io.ReadAll(stream) + unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg) if err != nil { - rtr.logger.Error().Err(err).Msg("reading from stream") - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") - } - return - } - - // done reading; reset to signal this to remote peer - // NB: failing to reset the stream can easily max out the number of available - // network connections on the receiver's side. - if err := stream.Reset(); err != nil { - rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") - } - - // extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation) - poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz) - if err != nil { - rtr.logger.Error().Err(err).Msg("handling raintree message") - return - } - - // There was no error, but we don't need to forward this to the app-specific bus. - // For example, the message has already been handled by the application. - if poktEnvelopeBz == nil { - return - } - - // call configured handler to forward to app-specific bus - if err := rtr.handler(poktEnvelopeBz); err != nil { - rtr.logger.Error().Err(err).Msg("handling pocket envelope") + return fmt.Errorf("setting up unicast router: %w", err) } -} -// shouldSendToTarget returns false if target is self. -func shouldSendToTarget(target target) bool { - return !target.isSelf + rtr.UnicastRouter = *unicastRouter + return nil } func (rtr *rainTreeRouter) setupDependencies() error { + if err := rtr.setupUnicastRouter(); err != nil { + return err + } + pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight()) if err != nil { - return err + return fmt.Errorf("getting staked peerstore: %w", err) } if err := rtr.setupPeerManager(pstore); err != nil { - return err + return fmt.Errorf("setting up peer manager: %w", err) } if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil { @@ -374,9 +325,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro func (rtr *rainTreeRouter) getHostname() string { return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname } - -// newReadStreamDeadline returns a future deadline -// based on the read stream timeout duration. -func newReadStreamDeadline() time.Time { - return time.Now().Add(readStreamTimeout) -} diff --git a/p2p/raintree/testutil.go b/p2p/raintree/testutil.go index ebcb464b5..f12ef0a58 100644 --- a/p2p/raintree/testutil.go +++ b/p2p/raintree/testutil.go @@ -2,12 +2,30 @@ package raintree -import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" +import ( + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/regen-network/gocuke" + + "github.com/pokt-network/pocket/internal/testutil" + typesP2P "github.com/pokt-network/pocket/p2p/types" +) // RainTreeRouter exports `rainTreeRouter` for testing purposes. type RainTreeRouter = rainTreeRouter +type routerHandlerProxyFactory = testutil.ProxyFactory[typesP2P.MessageHandler] + // HandleStream exports `rainTreeRouter#handleStream` for testing purposes. func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) { - rtr.handleStream(stream) + rtr.UnicastRouter.HandleStream(stream) +} +func (rtr *rainTreeRouter) HandlerProxy( + t gocuke.TestingT, + handlerProxyFactory routerHandlerProxyFactory, +) { + t.Helper() + + // pass original handler to proxy factory & replace it with the proxy + origHandler := rtr.handler + rtr.handler = handlerProxyFactory(origHandler) } diff --git a/p2p/types/router.go b/p2p/types/router.go index f9c7f2d74..21a2b8010 100644 --- a/p2p/types/router.go +++ b/p2p/types/router.go @@ -22,7 +22,7 @@ type Router interface { RemovePeer(peer Peer) error } -type RouterHandler func(data []byte) error +type MessageHandler func(data []byte) error // RouterConfig is used to configure `Router` implementations and to test a // given configuration's validity. diff --git a/p2p/unicast/logging.go b/p2p/unicast/logging.go new file mode 100644 index 000000000..7e2539c63 --- /dev/null +++ b/p2p/unicast/logging.go @@ -0,0 +1,44 @@ +package unicast + +import ( + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + + "github.com/pokt-network/pocket/logger" + "github.com/pokt-network/pocket/p2p/utils" +) + +// TECHDEBT(#830): it would be nice to have at least one more degree of freedom with which +// to limit logging in areas where it is known to be excessive / high frequency. +// Especially applicable to debug log lines which only contribute in edge cases, +// unusual circumstances, or regressions (e.g. hitting OS resource limits because +// of too many concurrent streams). +// +// This could ultimately be actuated from the CLI via flags, configs, and/or env +// vars. Initially, weo could consider coupling to a `--verbose` persistent flag. +// + +// logStream logs the incoming stream and its scope stats +func (rtr *UnicastRouter) logStream(stream libp2pNetwork.Stream) { + rtr.logStreamScopeStats(stream) + + remotePeer, err := utils.PeerFromLibp2pStream(stream) + if err != nil { + rtr.logger.Debug().Err(err).Msg("getting remote remotePeer") + } else { + utils.LogIncomingMsg(rtr.logger, rtr.getHostname(), remotePeer) + } +} + +// logStreamScopeStats logs the incoming stream's scope stats +// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.27.0/core/network#StreamScope) +func (rtr *UnicastRouter) logStreamScopeStats(stream libp2pNetwork.Stream) { + if err := utils.LogScopeStatFactory( + &logger.Global.Logger, + "stream scope (read-side)", + )(stream.Scope()); err != nil { + rtr.logger.Debug().Err(err).Msg("logging stream scope stats") + } +} +func (rtr *UnicastRouter) getHostname() string { + return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname +} diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go new file mode 100644 index 000000000..fcb8112d8 --- /dev/null +++ b/p2p/unicast/router.go @@ -0,0 +1,139 @@ +package unicast + +import ( + "io" + "time" + + libp2pHost "github.com/libp2p/go-libp2p/core/host" + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + + "github.com/pokt-network/pocket/p2p/config" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/shared/modules" + "github.com/pokt-network/pocket/shared/modules/base_modules" +) + +// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions. +// TECHDEBT(#629): parameterize and expose via config. +// readStreamTimeout is the duration to wait for a read operation on a +// stream to complete, after which the stream is closed ("timed out"). +const readStreamTimeout = time.Second * 10 + +var _ unicastRouterFactory = &UnicastRouter{} + +// TODO_THIS_COMMIT: consider defining/(re)using `RouterFactory` type +type unicastRouterFactory = modules.FactoryWithConfig[*UnicastRouter, *config.UnicastRouterConfig] + +type UnicastRouter struct { + base_modules.IntegratableModule + + logger *modules.Logger + // messageHandler is the function to call when a message is received. + // host represents a libp2p network node, it encapsulates a libp2p peerstore + // & connection manager. `libp2p.New` configures and starts listening + // according to options. + // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) + host libp2pHost.Host + messageHandler typesP2P.MessageHandler + // TODO_THIS_COMMIT: consider defining alongside `MessageHandler` type + // OR removing `MessageHandler` type + peerHandler func(peer typesP2P.Peer) error +} + +func Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + return new(UnicastRouter).Create(bus, cfg) +} + +func (*UnicastRouter) Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { + if err := cfg.IsValid(); err != nil { + return nil, err + } + + rtr := &UnicastRouter{ + logger: cfg.Logger, + host: cfg.Host, + messageHandler: cfg.MessageHandler, + peerHandler: cfg.PeerHandler, + } + rtr.SetBus(bus) + + // Don't handle incoming streams in client debug mode. + if !rtr.isClientDebugMode() { + rtr.host.SetStreamHandler(cfg.ProtocolID, rtr.handleStream) + } + + return rtr, nil +} + +// handleStream ensures the peerstore contains the remote peer and then reads +// the incoming stream in a new go routine. +func (rtr *UnicastRouter) handleStream(stream libp2pNetwork.Stream) { + rtr.logger.Debug().Msg("handling incoming stream") + peer, err := utils.PeerFromLibp2pStream(stream) + if err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("parsing remote peer identity") + + if err = stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream") + } + return + } + + if err := rtr.peerHandler(peer); err != nil { + rtr.logger.Error().Err(err). + Str("address", peer.GetAddress().String()). + Msg("adding remote peer to router") + } + + go rtr.readStream(stream) +} + +// readStream reads the message bytes out of the incoming stream and passes it to +// the configured `rtr.messageHandler`. Intended to be called in a go routine. +func (rtr *UnicastRouter) readStream(stream libp2pNetwork.Stream) { + // Time out if no data is sent to free resources. + // NB: tests using libp2p's `mocknet` rely on this not returning an error. + if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { + // `SetReadDeadline` not supported by `mocknet` streams. + rtr.logger.Error().Err(err).Msg("setting stream read deadline") + } + + // log incoming stream + rtr.logStream(stream) + + // read stream + messageBz, err := io.ReadAll(stream) + if err != nil { + rtr.logger.Error().Err(err).Msg("reading from stream") + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + return + } + + // done reading; reset to signal this to remote peer + // NB: failing to reset the stream can easily max out the number of available + // network connections on the receiver's side. + if err := stream.Reset(); err != nil { + rtr.logger.Error().Err(err).Msg("resetting stream (read-side)") + } + + if err := rtr.messageHandler(messageBz); err != nil { + rtr.logger.Error().Err(err).Msg("handling message") + return + } +} + +// isClientDebugMode returns the value of `ClientDebugMode` in the base config +func (rtr *UnicastRouter) isClientDebugMode() bool { + return rtr.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode +} + +// newReadStreamDeadline returns a future deadline +// based on the read stream timeout duration. +func newReadStreamDeadline() time.Time { + return time.Now().Add(readStreamTimeout) +} diff --git a/p2p/unicast/testutil.go b/p2p/unicast/testutil.go new file mode 100644 index 000000000..8a3f9caf3 --- /dev/null +++ b/p2p/unicast/testutil.go @@ -0,0 +1,10 @@ +//go:build test + +package unicast + +import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + +// HandleStream exports `unicastRouter#handleStream` for testing purposes. +func (rtr *UnicastRouter) HandleStream(stream libp2pNetwork.Stream) { + rtr.handleStream(stream) +} From 1277859b624ac503a5193826b5bdb111d5a65235 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 22 Jun 2023 17:38:06 +0200 Subject: [PATCH 02/14] chore: cleanup TODOs --- p2p/unicast/router.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go index fcb8112d8..bec52b36b 100644 --- a/p2p/unicast/router.go +++ b/p2p/unicast/router.go @@ -22,7 +22,6 @@ const readStreamTimeout = time.Second * 10 var _ unicastRouterFactory = &UnicastRouter{} -// TODO_THIS_COMMIT: consider defining/(re)using `RouterFactory` type type unicastRouterFactory = modules.FactoryWithConfig[*UnicastRouter, *config.UnicastRouterConfig] type UnicastRouter struct { @@ -36,8 +35,6 @@ type UnicastRouter struct { // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) host libp2pHost.Host messageHandler typesP2P.MessageHandler - // TODO_THIS_COMMIT: consider defining alongside `MessageHandler` type - // OR removing `MessageHandler` type peerHandler func(peer typesP2P.Peer) error } From 048e3065b451fa4029ea461e4f2418a679ade47f Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 23 Jun 2023 11:40:22 +0200 Subject: [PATCH 03/14] fix: gofmt --- p2p/unicast/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go index bec52b36b..88eef4929 100644 --- a/p2p/unicast/router.go +++ b/p2p/unicast/router.go @@ -35,7 +35,7 @@ type UnicastRouter struct { // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) host libp2pHost.Host messageHandler typesP2P.MessageHandler - peerHandler func(peer typesP2P.Peer) error + peerHandler func(peer typesP2P.Peer) error } func Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { From 9ab2a5d05da5083446d3fa2d705111d28c4a539c Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 26 Jun 2023 09:02:37 +0200 Subject: [PATCH 04/14] chore: fix typo in comment --- p2p/unicast/logging.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/unicast/logging.go b/p2p/unicast/logging.go index 7e2539c63..1539b9059 100644 --- a/p2p/unicast/logging.go +++ b/p2p/unicast/logging.go @@ -14,7 +14,7 @@ import ( // of too many concurrent streams). // // This could ultimately be actuated from the CLI via flags, configs, and/or env -// vars. Initially, weo could consider coupling to a `--verbose` persistent flag. +// vars. Initially, we could consider coupling to a `--verbose` persistent flag. // // logStream logs the incoming stream and its scope stats From dce1bac973fdc9db3b3084f2fb68a2520f9085d9 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 26 Jun 2023 09:02:51 +0200 Subject: [PATCH 05/14] chore: add debug log --- p2p/raintree/router.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index a68fd293d..c399e85aa 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -222,6 +222,7 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { // There was no error, but we don't need to forward this to the app-specific bus. // For example, the message has already been handled by the application. if rainTreeMsg.Data == nil { + rtr.logger.Debug().Msg("no data in RainTree message") return nil } From 8dc2852875c81cc6b8bb5641d6a686e68da7fabb Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 26 Jun 2023 09:03:05 +0200 Subject: [PATCH 06/14] chore: fix field comment out of place --- p2p/unicast/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go index 88eef4929..de8112629 100644 --- a/p2p/unicast/router.go +++ b/p2p/unicast/router.go @@ -28,12 +28,12 @@ type UnicastRouter struct { base_modules.IntegratableModule logger *modules.Logger + host libp2pHost.Host // messageHandler is the function to call when a message is received. // host represents a libp2p network node, it encapsulates a libp2p peerstore // & connection manager. `libp2p.New` configures and starts listening // according to options. // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) - host libp2pHost.Host messageHandler typesP2P.MessageHandler peerHandler func(peer typesP2P.Peer) error } From a6d4b520b1e9947b370c2f770c7c38c54ba3ea0c Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 26 Jun 2023 09:25:51 +0200 Subject: [PATCH 07/14] fix: imports --- p2p/raintree/router.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index c399e85aa..60dbcbd86 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -2,6 +2,7 @@ package raintree import ( "fmt" + libp2pHost "github.com/libp2p/go-libp2p/core/host" "github.com/pokt-network/pocket/p2p/unicast" "google.golang.org/protobuf/proto" From c3bc4c7e06e21bd313f88e887f30f26c9a9fa810 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 27 Jun 2023 15:51:27 +0200 Subject: [PATCH 08/14] chore: cleanup unused test utils --- internal/testutil/proxy.go | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 internal/testutil/proxy.go diff --git a/internal/testutil/proxy.go b/internal/testutil/proxy.go deleted file mode 100644 index a586a6c05..000000000 --- a/internal/testutil/proxy.go +++ /dev/null @@ -1,3 +0,0 @@ -package testutil - -type ProxyFactory[T any] func(target T) (proxy T) From 3fdada607bec530abda34c17d9e1134ec809e05a Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 28 Jun 2023 09:13:18 +0200 Subject: [PATCH 09/14] chore: comment cleanup Co-authored-by: @Olshansk --- p2p/raintree/router.go | 3 +-- p2p/unicast/logging.go | 1 - p2p/unicast/router.go | 8 ++++++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 60dbcbd86..01eab6d77 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -211,7 +211,6 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { rtr.logger.Error().Err(err).Msg("Error decoding network message") return err } - // -- // Continue RainTree propagation if rainTreeMsg.Level > 0 { @@ -227,7 +226,7 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { return nil } - // call configured handler to forward to app-specific bus + // Call configured message handler with the serialized `PocketEnvelope`. if err := rtr.handler(rainTreeMsg.Data); err != nil { rtr.logger.Error().Err(err).Msg("handling raintree message") } diff --git a/p2p/unicast/logging.go b/p2p/unicast/logging.go index 1539b9059..4a7f4faaf 100644 --- a/p2p/unicast/logging.go +++ b/p2p/unicast/logging.go @@ -15,7 +15,6 @@ import ( // // This could ultimately be actuated from the CLI via flags, configs, and/or env // vars. Initially, we could consider coupling to a `--verbose` persistent flag. -// // logStream logs the incoming stream and its scope stats func (rtr *UnicastRouter) logStream(stream libp2pNetwork.Stream) { diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go index de8112629..ebd5a4e7b 100644 --- a/p2p/unicast/router.go +++ b/p2p/unicast/router.go @@ -53,6 +53,10 @@ func (*UnicastRouter) Create(bus modules.Bus, cfg *config.UnicastRouterConfig) ( messageHandler: cfg.MessageHandler, peerHandler: cfg.PeerHandler, } + + // `UnicastRouter` is not a submodule and therefore does not register with the + // module registry. However, as it does depend on the bus and therefore MUST + // embed the base `IntegrableModule` and call `#SetBus()`. rtr.SetBus(bus) // Don't handle incoming streams in client debug mode. @@ -73,6 +77,9 @@ func (rtr *UnicastRouter) handleStream(stream libp2pNetwork.Stream) { Str("address", peer.GetAddress().String()). Msg("parsing remote peer identity") + // Reset stream to signal the sender to give up and move on. + // NB: failing to reset the stream can easily max out the number of available + // network connections on the receiver's side. if err = stream.Reset(); err != nil { rtr.logger.Error().Err(err).Msg("resetting stream") } @@ -85,6 +92,7 @@ func (rtr *UnicastRouter) handleStream(stream libp2pNetwork.Stream) { Msg("adding remote peer to router") } + // concurrently read messages out of incoming streams for handling. go rtr.readStream(stream) } From 39a78773a9b1012694986afc028591592b169d29 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 28 Jun 2023 09:13:33 +0200 Subject: [PATCH 10/14] chore: add submodule TECHDEBT comments --- p2p/module.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/p2p/module.go b/p2p/module.go index d9e164fc7..5f87d5b6e 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -43,11 +43,17 @@ type p2pModule struct { identity libp2p.Option listenAddrs libp2p.Option + // TECHDEBT(#810): register the providers to the module registry instead of + // holding a reference in the module struct and passing via router config. + // // Assigned during creation via `#setupDependencies()`. currentHeightProvider providers.CurrentHeightProvider pstoreProvider providers.PeerstoreProvider nonceDeduper *mempool.GenericFIFOSet[uint64, uint64] + // TECHDEBT(#810): register the routers to the module registry instead of + // holding a reference in the module struct. This will improve testability. + // // Assigned during `#Start()`. TLDR; `host` listens on instantiation. // and `router` depends on `host`. router typesP2P.Router @@ -252,6 +258,9 @@ func (m *p2pModule) setupPeerstoreProvider() error { if !ok { return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule) } + + // TECHDEBT(#810): register the provider to the module registry instead of + // holding a reference in the module struct and passing via router config. m.pstoreProvider = pstoreProvider return nil @@ -260,6 +269,7 @@ func (m *p2pModule) setupPeerstoreProvider() error { // setupCurrentHeightProvider attempts to retrieve the current height provider // from the bus registry, falls back to the consensus module if none is registered. func (m *p2pModule) setupCurrentHeightProvider() error { + // TECHDEBT(#810): simplify once submodules are more convenient to retrieve. m.logger.Debug().Msg("setupCurrentHeightProvider") currentHeightProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(current_height_provider.ModuleName) if err != nil { @@ -276,6 +286,9 @@ func (m *p2pModule) setupCurrentHeightProvider() error { if !ok { return fmt.Errorf("unexpected current height provider type: %T", currentHeightProviderModule) } + + // TECHDEBT(#810): register the provider to the module registry instead of + // holding a reference in the module struct and passing via router config. m.currentHeightProvider = currentHeightProvider return nil @@ -294,6 +307,8 @@ func (m *p2pModule) setupNonceDeduper() error { // setupRouter instantiates the configured router implementation. func (m *p2pModule) setupRouter() (err error) { + // TECHDEBT(#810): register the router to the module registry instead of + // holding a reference in the module struct. This will improve testability. m.router, err = raintree.NewRainTreeRouter( m.GetBus(), &config.RainTreeConfig{ From 049cbf5e03fca4831b84cc6cb50a3c6a4c0a038a Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 28 Jun 2023 09:13:44 +0200 Subject: [PATCH 11/14] chore: add missing godoc comments --- p2p/raintree/router.go | 2 ++ p2p/types/router.go | 15 +++++++++++++-- p2p/unicast/router.go | 5 ++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 01eab6d77..3b0c59b34 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -259,6 +259,7 @@ func (rtr *rainTreeRouter) AddPeer(peer typesP2P.Peer) error { return nil } +// RemovePeer implements the respective member of `typesP2P.Router`. func (rtr *rainTreeRouter) RemovePeer(peer typesP2P.Peer) error { rtr.peersManager.HandleEvent( typesP2P.PeerManagerEvent{ @@ -280,6 +281,7 @@ func shouldSendToTarget(target target) bool { return !target.isSelf } +// setupUnicastRouter configures and assigns `rtr.UnicastRouter`. func (rtr *rainTreeRouter) setupUnicastRouter() error { unicastRouterCfg := config.UnicastRouterConfig{ Logger: rtr.logger, diff --git a/p2p/types/router.go b/p2p/types/router.go index 21a2b8010..37080acbd 100644 --- a/p2p/types/router.go +++ b/p2p/types/router.go @@ -15,10 +15,21 @@ type Router interface { Broadcast(data []byte) error Send(data []byte, address cryptoPocket.Address) error - // Address book helpers - // TECHDEBT: simplify - remove `GetPeerstore` + // GetPeerstore is used by the P2P module to update the staked actor router's + // (`RainTreeRouter`) peerstore. + // + // TECHDEBT(#859+): remove the need for this group of interface methods. + // All peer discovery logic should be encapsulated by the router. + // Adopt `HandleEvent(*anypb.Any) error` here instead and forward events + // from P2P module to routers. + // CONSIDERATION: Utility, Conseneus and P2P modules could share an interface + // containing this method (e.g. `BusEventHandler`). GetPeerstore() Peerstore + // AddPeer is used to add a peer to the routers peerstore. It is intended to + // support peer discovery. AddPeer(peer Peer) error + // RemovePeer is used to remove a peer to the routers peerstore. It is used + // during churn to purge offline peers from the routers peerstore. RemovePeer(peer Peer) error } diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go index ebd5a4e7b..bc461003e 100644 --- a/p2p/unicast/router.go +++ b/p2p/unicast/router.go @@ -35,7 +35,10 @@ type UnicastRouter struct { // according to options. // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) messageHandler typesP2P.MessageHandler - peerHandler func(peer typesP2P.Peer) error + // peerHandler is called whenever a new incoming stream is established. + // TECHDEBT(#749,#747): this may not be needed once we've adopted libp2p + // peer IDs and multiaddr natively. + peerHandler func(peer typesP2P.Peer) error } func Create(bus modules.Bus, cfg *config.UnicastRouterConfig) (*UnicastRouter, error) { From 79a1c6e6f517bd464fa53fe168f91a58c8768639 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 28 Jun 2023 09:14:10 +0200 Subject: [PATCH 12/14] chore: cleanup unused garbage --- p2p/raintree/testutil.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/p2p/raintree/testutil.go b/p2p/raintree/testutil.go index f12ef0a58..e2e81b636 100644 --- a/p2p/raintree/testutil.go +++ b/p2p/raintree/testutil.go @@ -4,28 +4,12 @@ package raintree import ( libp2pNetwork "github.com/libp2p/go-libp2p/core/network" - "github.com/regen-network/gocuke" - - "github.com/pokt-network/pocket/internal/testutil" - typesP2P "github.com/pokt-network/pocket/p2p/types" ) // RainTreeRouter exports `rainTreeRouter` for testing purposes. type RainTreeRouter = rainTreeRouter -type routerHandlerProxyFactory = testutil.ProxyFactory[typesP2P.MessageHandler] - // HandleStream exports `rainTreeRouter#handleStream` for testing purposes. func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) { rtr.UnicastRouter.HandleStream(stream) } -func (rtr *rainTreeRouter) HandlerProxy( - t gocuke.TestingT, - handlerProxyFactory routerHandlerProxyFactory, -) { - t.Helper() - - // pass original handler to proxy factory & replace it with the proxy - origHandler := rtr.handler - rtr.handler = handlerProxyFactory(origHandler) -} From a7c4bf6b4658dc3cb6b95e7da9d67f2ff67f513b Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 28 Jun 2023 09:14:19 +0200 Subject: [PATCH 13/14] fix: return error --- p2p/raintree/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 3b0c59b34..bd0656ef7 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -228,7 +228,7 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error { // Call configured message handler with the serialized `PocketEnvelope`. if err := rtr.handler(rainTreeMsg.Data); err != nil { - rtr.logger.Error().Err(err).Msg("handling raintree message") + return fmt.Errorf("handling raintree message: %w", err) } return nil } From 0cf71de55ea5c5e9418682769d5b1876534a26f5 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 29 Jun 2023 08:58:29 +0200 Subject: [PATCH 14/14] chore: improve comment' Co-authored-by: Daniel Olshansky --- p2p/unicast/router.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/p2p/unicast/router.go b/p2p/unicast/router.go index bc461003e..18a41fc32 100644 --- a/p2p/unicast/router.go +++ b/p2p/unicast/router.go @@ -103,9 +103,11 @@ func (rtr *UnicastRouter) handleStream(stream libp2pNetwork.Stream) { // the configured `rtr.messageHandler`. Intended to be called in a go routine. func (rtr *UnicastRouter) readStream(stream libp2pNetwork.Stream) { // Time out if no data is sent to free resources. - // NB: tests using libp2p's `mocknet` rely on this not returning an error. if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil { - // `SetReadDeadline` not supported by `mocknet` streams. + // Not returning an error for testing purposes; i.e. `SetReadDeadline` is + // not supported by libp2p `mocknet` streams. This should only produce an + // error if a node advertises and listens via an unsupported transport + // protocol, which should never happen in prod. rtr.logger.Error().Err(err).Msg("setting stream read deadline") }