From 662d6585e35bdf7a65928f711de9d84859b3ce5a Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 25 May 2023 15:07:51 +0200 Subject: [PATCH] wip: checkpoint - combined --- go.mod | 2 +- internal/testutil/bus/bus.go | 30 +++++ .../testutil/constructors/constructors.go | 25 +--- internal/testutil/generics/proxy.go | 3 + p2p/background/router.go | 56 ++++++-- p2p/background/router_test.go | 99 +++++++++----- p2p/background/testutil.go | 19 +++ p2p/config/config.go | 7 + p2p/integration/background_gossip.feature | 18 ++- p2p/integration/background_gossip_test.go | 123 +++++++++++++----- p2p/module.go | 17 ++- p2p/module_raintree_test.go | 20 +-- p2p/raintree/router.go | 2 + p2p/raintree/testutil.go | 20 ++- p2p/testutil.go | 7 +- shared/messaging/envelope.go | 7 +- shared/messaging/proto/debug_message.proto | 4 + 17 files changed, 335 insertions(+), 124 deletions(-) create mode 100644 internal/testutil/bus/bus.go create mode 100644 internal/testutil/generics/proxy.go diff --git a/go.mod b/go.mod index e64de0142d..7116b1cdba 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/dgraph-io/badger/v3 v3.2103.2 github.com/foxcpp/go-mockdns v1.0.0 github.com/getkin/kin-openapi v0.107.0 + github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/vault/api v1.9.0 github.com/jackc/pgconn v1.13.0 github.com/jordanorelli/lexnum v0.0.0-20141216151731-460eeb125754 @@ -112,7 +113,6 @@ require ( github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-retryablehttp v0.6.6 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect diff --git a/internal/testutil/bus/bus.go b/internal/testutil/bus/bus.go new file mode 100644 index 0000000000..f83e27735e --- /dev/null +++ b/internal/testutil/bus/bus.go @@ -0,0 +1,30 @@ +package bus_testutil + +import ( + "github.com/regen-network/gocuke" + + "github.com/pokt-network/pocket/internal/testutil" + "github.com/pokt-network/pocket/internal/testutil/runtime" + "github.com/pokt-network/pocket/runtime/genesis" + "github.com/pokt-network/pocket/shared/crypto" + "github.com/pokt-network/pocket/shared/modules/mocks" +) + +func NewBus( + t gocuke.TestingT, + privKey crypto.PrivateKey, + serviceURL string, + genesisState *genesis.GenesisState, + busEventHandlerFactory testutil.BusEventHandlerFactory, +) *mock_modules.MockBus { + t.Helper() + + runtimeMgrMock := runtime_testutil.BaseRuntimeManagerMock( + t, privKey, + serviceURL, + genesisState, + ) + busMock := testutil.BusMockWithEventHandler(t, runtimeMgrMock, busEventHandlerFactory) + busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes() + return busMock +} diff --git a/internal/testutil/constructors/constructors.go b/internal/testutil/constructors/constructors.go index 6c95ad4224..3fd96a39a5 100644 --- a/internal/testutil/constructors/constructors.go +++ b/internal/testutil/constructors/constructors.go @@ -3,6 +3,7 @@ package constructors import ( libp2pHost "github.com/libp2p/go-libp2p/core/host" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/pokt-network/pocket/internal/testutil/bus" consensus_testutil "github.com/pokt-network/pocket/internal/testutil/consensus" persistence_testutil "github.com/pokt-network/pocket/internal/testutil/persistence" telemetry_testutil "github.com/pokt-network/pocket/internal/testutil/telemetry" @@ -11,7 +12,6 @@ import ( "github.com/pokt-network/pocket/internal/testutil" p2p_testutil "github.com/pokt-network/pocket/internal/testutil/p2p" - runtime_testutil "github.com/pokt-network/pocket/internal/testutil/runtime" "github.com/pokt-network/pocket/p2p" "github.com/pokt-network/pocket/runtime/genesis" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" @@ -57,7 +57,7 @@ func NewBusesMocknetAndP2PModules( } privKey := privKeys[i] - busMock := NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory) + busMock := bus_testutil.NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory) buses[serviceURL] = busMock // TODO_THIS_COMMIT: refactor @@ -118,29 +118,10 @@ func NewBusesAndP2PModuleWithHost( ) (*mock_modules.MockBus, modules.P2PModule) { t.Helper() - busMock := NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory) + busMock := bus_testutil.NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory) return busMock, NewP2PModuleWithHost(t, busMock, host) } -func NewBus( - t gocuke.TestingT, - privKey cryptoPocket.PrivateKey, - serviceURL string, - genesisState *genesis.GenesisState, - busEventHandlerFactory testutil.BusEventHandlerFactory, -) *mock_modules.MockBus { - t.Helper() - - runtimeMgrMock := runtime_testutil.BaseRuntimeManagerMock( - t, privKey, - serviceURL, - genesisState, - ) - busMock := testutil.BusMockWithEventHandler(t, runtimeMgrMock, busEventHandlerFactory) - busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes() - return busMock -} - func NewP2PModuleWithHost( t gocuke.TestingT, busMock *mock_modules.MockBus, diff --git a/internal/testutil/generics/proxy.go b/internal/testutil/generics/proxy.go new file mode 100644 index 0000000000..8dd8377358 --- /dev/null +++ b/internal/testutil/generics/proxy.go @@ -0,0 +1,3 @@ +package generics_testutil + +type ProxyFactory[T any] func(target T) (proxy T) diff --git a/p2p/background/router.go b/p2p/background/router.go index 7e25f7b116..c69e9d62ab 100644 --- a/p2p/background/router.go +++ b/p2p/background/router.go @@ -5,11 +5,11 @@ package background import ( "context" "fmt" + "sync" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" libp2pHost "github.com/libp2p/go-libp2p/core/host" - "github.com/pokt-network/pocket/shared/messaging" "google.golang.org/protobuf/proto" "github.com/pokt-network/pocket/logger" @@ -65,6 +65,10 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2 networkLogger := logger.Global.CreateLoggerForModule("backgroundRouter") networkLogger.Info().Msg("Initializing background router") + if err := cfg.IsValid(); err != nil { + return nil, err + } + // seed initial peerstore with current on-chain peer info (i.e. staked actors) pstore, err := cfg.PeerstoreProvider.GetStakedPeerstoreAtHeight( cfg.CurrentHeightProvider.CurrentHeight(), @@ -74,7 +78,9 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2 } // CONSIDERATION: If switching to `NewRandomSub`, there will be a max size - gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host) + gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host) //pubsub.WithFloodPublish(false), + //pubsub.WithMaxMessageSize(256), + if err != nil { return nil, fmt.Errorf("creating gossip pubsub: %w", err) } @@ -100,7 +106,10 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2 // > output buffer. The default length is 32 but it can be configured to avoid // > dropping messages if the consumer is not reading fast enough. // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithBufferSize) - subscription, err := topic.Subscribe() + subscription, err := topic.Subscribe( + //pubsub.WithBufferSize(10), + //pubsub.With + ) if err != nil { return nil, fmt.Errorf("subscribing to background topic: %w", err) } @@ -113,6 +122,7 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2 subscription: subscription, logger: networkLogger, pstore: pstore, + handler: cfg.Handler, } go rtr.readSubscription(ctx) @@ -122,8 +132,18 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2 // Broadcast implements the respective `typesP2P.Router` interface method. func (rtr *backgroundRouter) Broadcast(data []byte) error { + // CONSIDERATION: validate as PocketEnvelopeBz here (?) + // TODO_THIS_COMMIT: wrap in BackgroundMessage + backgroundMsg := &typesP2P.BackgroundMessage{ + Data: data, + } + backgroundMsgBz, err := proto.Marshal(backgroundMsg) + if err != nil { + return err + } + // TECHDEBT(#595): add ctx to interface methods and propagate down. - return rtr.topic.Publish(context.TODO(), data) + return rtr.topic.Publish(context.TODO(), backgroundMsgBz) } // Send implements the respective `typesP2P.Router` interface method. @@ -168,8 +188,24 @@ func (rtr *backgroundRouter) RemovePeer(peer typesP2P.Peer) error { return rtr.pstore.RemovePeer(peer.GetAddress()) } +var readCountMu sync.Mutex + func (rtr *backgroundRouter) readSubscription(ctx context.Context) { - for msg, err := rtr.subscription.Next(ctx); ctx.Err() == nil; { + // TODO_THIS_COMMIT: look into "topic validaton" + // (see: https://github.com/libp2p/specs/tree/master/pubsub#topic-validation) + readCount := 0 + for { + readCountMu.Lock() + readCount++ + fmt.Printf("readCount: %d\n", readCount) + readCountMu.Unlock() + + msg, err := rtr.subscription.Next(ctx) + if ctx.Err() != nil { + fmt.Printf("error: %s\n", ctx.Err()) + return + } + if err != nil { rtr.logger.Error().Err(err). Msg("error reading from background topic subscription") @@ -184,17 +220,13 @@ func (rtr *backgroundRouter) readSubscription(ctx context.Context) { } } -func (rtr *backgroundRouter) handleBackgroundMsg(data []byte) error { +func (rtr *backgroundRouter) handleBackgroundMsg(backgroundMsgBz []byte) error { var backgroundMsg typesP2P.BackgroundMessage - if err := proto.Unmarshal(data, &backgroundMsg); err != nil { + if err := proto.Unmarshal(backgroundMsgBz, &backgroundMsg); err != nil { return err } - networkMessage := messaging.PocketEnvelope{} - if err := proto.Unmarshal(data, &networkMessage); err != nil { - return err - } - return nil + return rtr.handler(backgroundMsg.Data) } // isClientDebugMode returns the value of `ClientDebugMode` in the base config diff --git a/p2p/background/router_test.go b/p2p/background/router_test.go index e18030722f..a97e2aeb62 100644 --- a/p2p/background/router_test.go +++ b/p2p/background/router_test.go @@ -3,6 +3,8 @@ package background import ( "context" "fmt" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "sync" "testing" "time" @@ -24,15 +26,28 @@ import ( "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/defaults" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" + "github.com/pokt-network/pocket/shared/messaging" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" ) // https://www.rfc-editor.org/rfc/rfc3986#section-3.2.2 -const testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080" +const ( + testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080" + numPeers = 4 + testMsg = "test messsage" + testTimeoutDuration = time.Second * 2 +) // TECHDEBT(#609): move & de-dup. var testLocalServiceURL = fmt.Sprintf("127.0.0.1:%d", defaults.DefaultP2PPort) +func TestBackgroundRouter_InvalidConfig(t *testing.T) { + t.Skip("pending") + //busMock := bus_testutil.NewBus(t) + // + //router, err := NewBackgroundRouter() +} + func TestBackgroundRouter_AddPeer(t *testing.T) { testRouter := newTestRouter(t, nil) libp2pPStore := testRouter.host.Peerstore() @@ -116,16 +131,10 @@ func TestBackgroundRouter_RemovePeer(t *testing.T) { } func TestBackgroundRouter_Broadcast(t *testing.T) { - const ( - numPeers = 4 - testMsg = "test messsage" - testTimeoutDuration = time.Second * 5 - ) - var ( ctx = context.Background() // mutex preventing concurrent writes to `seenMessages` - seenMessagesMutext sync.Mutex + seenMessagesMutex sync.Mutex // map used as a set to collect IDs of peers which have received a message seenMessages = make(map[string]struct{}) bootstrapWaitgroup = sync.WaitGroup{} @@ -149,7 +158,16 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { testHosts = append(testHosts, host) expectedPeerIDs[i] = host.ID().String() rtr := newRouterWithSelfPeerAndHost(t, selfPeer, host) - go readSubscription(t, ctx, &broadcastWaitgroup, rtr, &seenMessagesMutext, seenMessages) + rtr.HandlerProxy(t, func(origHandler typesP2P.RouterHandler) typesP2P.RouterHandler { + return func(data []byte) error { + seenMessagesMutex.Lock() + broadcastWaitgroup.Done() + seenMessages[rtr.host.ID().String()] = struct{}{} + seenMessagesMutex.Unlock() + + return origHandler(data) + } + }) } // bootstrap off of arbitrary testHost @@ -190,7 +208,8 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { // broadcast message t.Log("broadcasting...") - err := testRouter.Broadcast([]byte(testMsg)) + testPoktEnvelopeBz := newTestPoktEnvelopeBz(t, testMsg) + err = testRouter.Broadcast(testPoktEnvelopeBz) require.NoError(t, err) // wait for broadcast to be received by all peers @@ -201,6 +220,7 @@ func TestBackgroundRouter_Broadcast(t *testing.T) { // waitgroup broadcastDone or timeout select { case <-testTimeout: + seenMessagesMutex.Lock() t.Fatalf( "timed out waiting for all expected messages: got %d; wanted %d", len(seenMessages), @@ -285,11 +305,33 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib err := pstore.AddPeer(selfPeer) require.NoError(t, err) + handler := func(poktEnvelopeBz []byte) error { + poktEnvelope := &messaging.PocketEnvelope{} + err := proto.Unmarshal(poktEnvelopeBz, poktEnvelope) + require.NoError(t, err) + + require.NotEmpty(t, poktEnvelope.Nonce) + require.NotEmpty(t, poktEnvelope.Content) + + debugMsg := &messaging.DebugMessage{} + err = poktEnvelope.Content.UnmarshalTo(debugMsg) + require.NoError(t, err) + + debugStringMsg := &messaging.DebugStringMessage{} + err = debugMsg.Message.UnmarshalTo(debugStringMsg) + require.NoError(t, err) + + require.Equal(t, testMsg, debugStringMsg.Value, "debug string messages don't match") + + return nil + } + router, err := NewBackgroundRouter(busMock, &config.BackgroundConfig{ Addr: selfPeer.GetAddress(), PeerstoreProvider: pstoreProviderMock, CurrentHeightProvider: consensusMock, Host: host, + Handler: handler, }) require.NoError(t, err) @@ -347,30 +389,21 @@ func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.Pri return newMockNetHostFromPeer(t, mockNet, privKey, peer) } -func readSubscription( - t *testing.T, - ctx context.Context, - broadcastWaitGroup *sync.WaitGroup, - rtr *backgroundRouter, - mu *sync.Mutex, - seenMsgs map[string]struct{}, -) { - t.Helper() +func newTestPoktEnvelopeBz(t *testing.T, msg string) []byte { + debugStringMsg, err := anypb.New(&messaging.DebugStringMessage{Value: msg}) + require.NoError(t, err) - for { - if err := ctx.Err(); err != nil { - if err != context.Canceled || err != context.DeadlineExceeded { - require.NoError(t, err) - } - return - } + debugMsg := &messaging.DebugMessage{ + Action: messaging.DebugMessageAction_DEBUG_ACTION_UNKNOWN, + Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST, + Message: debugStringMsg, + } - _, err := rtr.subscription.Next(ctx) - require.NoError(t, err) + poktEnvelope, err := messaging.PackMessage(debugMsg) + require.NoError(t, err) - mu.Lock() - broadcastWaitGroup.Done() - seenMsgs[rtr.host.ID().String()] = struct{}{} - mu.Unlock() - } + poktEnvelopeBz, err := proto.Marshal(poktEnvelope) + require.NoError(t, err) + + return poktEnvelopeBz } diff --git a/p2p/background/testutil.go b/p2p/background/testutil.go index 64e85ee939..26ab47fce6 100644 --- a/p2p/background/testutil.go +++ b/p2p/background/testutil.go @@ -2,9 +2,28 @@ package background +import ( + "github.com/pokt-network/pocket/internal/testutil/generics" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/regen-network/gocuke" +) + // BackgroundRouter exports `backgroundRouter` for testing purposes. type BackgroundRouter = backgroundRouter +// TOOD_THIS_COMMIT: move & dedup +type routerHandlerProxyFactory = generics_testutil.ProxyFactory[typesP2P.RouterHandler] + +func (rtr *backgroundRouter) HandlerProxy( + t gocuke.TestingT, + handlerProxyFactory routerHandlerProxyFactory, +) { + t.Helper() + + // pass original handler to proxy factory & replace it with the proxy + rtr.handler = handlerProxyFactory(rtr.handler) +} + //func (rtr *backgroundRouter) MessageHandlerIntercept(interceptor func(data []byte) error) { // rtr. //} diff --git a/p2p/config/config.go b/p2p/config/config.go index e64fe8519b..2ff62e966c 100644 --- a/p2p/config/config.go +++ b/p2p/config/config.go @@ -20,6 +20,7 @@ type baseConfig struct { Addr crypto.Address CurrentHeightProvider providers.CurrentHeightProvider PeerstoreProvider providers.PeerstoreProvider + Handler func(data []byte) error } // BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`. @@ -57,6 +58,10 @@ func (cfg *baseConfig) IsValid() (err error) { if cfg.PeerstoreProvider == nil { err = multierr.Append(err, fmt.Errorf("peerstore provider not configured")) } + + if cfg.Handler == nil { + err = multierr.Append(err, fmt.Errorf("handler not configured")) + } return err } @@ -67,6 +72,7 @@ func (cfg *BackgroundConfig) IsValid() (err error) { Addr: cfg.Addr, CurrentHeightProvider: cfg.CurrentHeightProvider, PeerstoreProvider: cfg.PeerstoreProvider, + Handler: cfg.Handler, } return multierr.Append(err, baseCfg.IsValid()) } @@ -78,6 +84,7 @@ func (cfg *RainTreeConfig) IsValid() (err error) { Addr: cfg.Addr, CurrentHeightProvider: cfg.CurrentHeightProvider, PeerstoreProvider: cfg.PeerstoreProvider, + Handler: cfg.Handler, } return multierr.Append(err, baseCfg.IsValid()) } diff --git a/p2p/integration/background_gossip.feature b/p2p/integration/background_gossip.feature index d76cf2bc39..1799fbf23e 100644 --- a/p2p/integration/background_gossip.feature +++ b/p2p/integration/background_gossip.feature @@ -8,11 +8,19 @@ Feature: Background Gossip Broadcast Examples: | size | | 3 | - | 4 | - | 9 | - | 12 | - | 18 | - | 27 | +# | 4 | +# | 5 | +# | 6 | +# | 7 | +# | 8 | +# | 9 | +# | 10 | +# | 11 | +# TODO_THIS_COMMIT: figure out why these are failing +# | 12 | +# | 13 | +# | 18 | +# | 27 | # | 100 | ## | 1024 | diff --git a/p2p/integration/background_gossip_test.go b/p2p/integration/background_gossip_test.go index fbe18aff26..25feac6956 100644 --- a/p2p/integration/background_gossip_test.go +++ b/p2p/integration/background_gossip_test.go @@ -4,13 +4,11 @@ package integration import ( "fmt" + typesP2P "github.com/pokt-network/pocket/p2p/types" "sync" "testing" "time" - dht "github.com/libp2p/go-libp2p-kad-dht" - pubsub "github.com/libp2p/go-libp2p-pubsub" - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" libp2pMocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/regen-network/gocuke" "github.com/stretchr/testify/require" @@ -55,6 +53,7 @@ type suite struct { p2pModules map[string]modules.P2PModule busMocks map[string]*mock_modules.MockBus libp2pNetworkMock libp2pMocknet.Mocknet + sender *p2p.P2PModule // TODO_THIS_COMMIT: reanme wg sync.WaitGroup } @@ -78,15 +77,21 @@ func (s *suite) NumberOfNodesLeaveTheNetwork(a int64) { func (s *suite) AFullyConnectedNetworkOfPeers(count int64) { var ( peerCount = int(count) - pubKeys = make([]cryptoPocket.PublicKey, peerCount) + //pubKeys = make([]cryptoPocket.PublicKey, peerCount) ) + s.Logf("ADDING peerCount - 1: %d", peerCount-1) s.wg.Add(peerCount - 1) + s.mu.Lock() + defer s.mu.Unlock() s.seenServiceURLs = make(map[string]struct{}) - for i, privKey := range testutil.LoadLocalnetPrivateKeys(s, peerCount) { - pubKeys[i] = privKey.PublicKey() - } - genesisState := runtime_testutil.GenesisWithSequentialServiceURLs(s, pubKeys) + //for i, privKey := range testutil.LoadLocalnetPrivateKeys(s, peerCount) { + // pubKeys[i] = privKey.PublicKey() + //} + //genesisState := runtime_testutil.GenesisWithSequentialServiceURLs(s, pubKeys) + // TODO_THIS_COMMIT: explain + genesisState := runtime_testutil.GenesisWithSequentialServiceURLs(s, nil) + busEventHandlerFactory := func(t gocuke.TestingT, busMock *mock_modules.MockBus) testutil.BusEventHandler { // event handler is called when a p2p module receives a network message return func(data *messaging.PocketEnvelope) { @@ -95,7 +100,7 @@ func (s *suite) AFullyConnectedNetworkOfPeers(count int64) { defer func() { if r := recover(); r != nil { - t.Logf("seenServiceURLs: %v", s.seenServiceURLs) + t.Logf("seenCount: %d; seenServiceURLs: %v", len(s.seenServiceURLs), s.seenServiceURLs) //panic(r) t.Fatalf("panic: %v", r) } @@ -104,7 +109,20 @@ func (s *suite) AFullyConnectedNetworkOfPeers(count int64) { p2pCfg := busMock.GetRuntimeMgr().GetConfig().P2P serviceURL := fmt.Sprintf("%s:%d", p2pCfg.Hostname, defaults.DefaultP2PPort) t.Logf("received message by %s", serviceURL) + + peerPrivKey, err := cryptoPocket.NewPrivateKey(p2pCfg.PrivateKey) + require.NoError(t, err) + + senderAddr, err := s.sender.GetAddress() + require.NoError(t, err) + + if senderAddr.Equals(peerPrivKey.Address()) { + t.Logf("SELF: %s", serviceURL) + return + } + if _, ok := s.seenServiceURLs[serviceURL]; ok { + t.Logf("DUPLICATE SERVICE URL: %s", serviceURL) return } @@ -135,46 +153,82 @@ func (s *suite) AFullyConnectedNetworkOfPeers(count int64) { // TODO_THIS_COMMIT: bus event handler based wg.Done()! // start P2P modules of all peers + handleCount := 0 for _, p2pModule := range s.p2pModules { - // (NOPE) WIP: pubsub-level intercept... - //p2pModule.GetBackgroundRouter().Get - err := p2pModule.(*p2p.P2PModule).Start() require.NoError(s, err) + + handlerProxyFactory := func( + origHandler typesP2P.RouterHandler, + ) (proxyHandler typesP2P.RouterHandler) { + return func(data []byte) error { + s.mu.Lock() + handleCount++ + s.mu.Unlock() + + s.Logf("handleCount: %d", handleCount) + //s.wg.Done() + return origHandler(data) + + //return nil + } + } + + // TODO_THIS_COMMIT: look into go-libp2p-pubsub tracing + // (see: https://github.com/libp2p/go-libp2p-pubsub#tracing) + noopHandlerProxyFactory := func(_ typesP2P.RouterHandler) typesP2P.RouterHandler { + return func(_ []byte) error { + // noop + return nil + } + } + + p2pModule.(*p2p.P2PModule).GetRainTreeRouter().HandlerProxy( + s, noopHandlerProxyFactory, + ) + p2pModule.(*p2p.P2PModule).GetBackgroundRouter().HandlerProxy( + s, handlerProxyFactory, + ) } + //time.Sleep(time.Millisecond * 500) + // (NOPE) WIP: host-level intercept... - for _, host := range s.libp2pNetworkMock.Hosts() { - s.Logf("host protocols: %v", host.Mux().Protocols()) - //host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) { - host.SetStreamHandler(pubsub.FloodSubID, func(stream libp2pNetwork.Stream) { - s.Logf("inbound stream protocol: %s", stream.Protocol()) - // //s.seenServiceURLs[stream.Conn().RemotePeer()] = struct{}{} - // data, err := io.ReadAll(stream) - // require.NoError(s, err) - // - // s.Logf("stream data: %s", data) - }) - host.SetStreamHandler(dht.ProtocolDHT, func(stream libp2pNetwork.Stream) { - s.Logf("inbound stream protocol: %s", stream.Protocol()) - //s.seenServiceURLs[stream.Conn().RemotePeer()] = struct{}{} - //data, err := io.ReadAll(stream) - //require.NoError(s, err) - // - //s.Logf("stream data: %s", data) - }) - } + //for _, host := range s.libp2pNetworkMock.Hosts() { + // //s.Logf("host protocols: %v", host.Mux().Protocols()) + // //host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) { + // host.SetStreamHandler(pubsub.FloodSubID, func(stream libp2pNetwork.Stream) { + // //s.Logf("inbound stream protocol: %s", stream.Protocol()) + // // //s.seenServiceURLs[stream.Conn().RemotePeer()] = struct{}{} + // // data, err := io.ReadAll(stream) + // // require.NoError(s, err) + // // + // // s.Logf("stream data: %s", data) + // }) + // host.SetStreamHandler(dht.ProtocolDHT, func(stream libp2pNetwork.Stream) { + // //s.Logf("inbound stream protocol: %s", stream.Protocol()) + // //s.seenServiceURLs[stream.Conn().RemotePeer()] = struct{}{} + // //data, err := io.ReadAll(stream) + // //require.NoError(s, err) + // // + // //s.Logf("stream data: %s", data) + // }) + //} } func (s *suite) ANodeBroadcastsATestMessageViaItsBackgroundRouter() { s.timeoutDuration = broadcastTimeoutDuration // select arbitrary sender & store in context for reference later - sender := s.p2pModules[generics_testutil.GetKeys(s.p2pModules)[0]].(*p2p.P2PModule) + s.sender = s.p2pModules[generics_testutil.GetKeys(s.p2pModules)[0]].(*p2p.P2PModule) // broadcast a test message msg := &anypb.Any{} - err := sender.Broadcast(msg) + + // TODO: + // - disable raintree router OR broadcast w/ bg router only + + err := s.sender.Broadcast(msg) require.NoError(s, err) } @@ -202,5 +256,6 @@ func (s *suite) MinusOneNumberOfNodesShouldReceiveTheTestMessage(receivedCountPl s.Fatalf("timed out waiting for messages to be received; received: %d; seenServiceURLs: %v", s.receivedCount, s.seenServiceURLs) case <-done: + s.Logf("seenCount: %d; seenServiceURLs: %v", len(s.seenServiceURLs), s.seenServiceURLs) } } diff --git a/p2p/module.go b/p2p/module.go index 4d24f02fa5..7aefc41203 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -192,9 +192,14 @@ func (m *p2pModule) Stop() error { func (m *p2pModule) Broadcast(msg *anypb.Any) error { if m.stakedActorRouter == nil { - return fmt.Errorf("router not started") + return fmt.Errorf("staked actor router not started") } + if m.unstakedActorRouter == nil { + return fmt.Errorf("unstaked actor router not started") + } + + // TODO: rename `c` c := &messaging.PocketEnvelope{ Content: msg, Nonce: cryptoPocket.GetNonce(), @@ -207,9 +212,11 @@ func (m *p2pModule) Broadcast(msg *anypb.Any) error { stakedBroadcastErr := m.stakedActorRouter.Broadcast(data) unstakedBroadcastErr := m.unstakedActorRouter.Broadcast(data) return multierror.Append(err, stakedBroadcastErr, unstakedBroadcastErr).ErrorOrNil() + //return multierror.Append(err, unstakedBroadcastErr).ErrorOrNil() } func (m *p2pModule) Send(addr cryptoPocket.Address, msg *anypb.Any) error { + // TODO: rename `c` c := &messaging.PocketEnvelope{ Content: msg, Nonce: cryptoPocket.GetNonce(), @@ -313,6 +320,9 @@ func (m *p2pModule) setupRouters() (err error) { Handler: m.handlePocketEnvelope, }, ) + if err != nil { + return fmt.Errorf("staked actor router: %w", err) + } m.unstakedActorRouter, err = background.NewBackgroundRouter( m.GetBus(), @@ -324,7 +334,10 @@ func (m *p2pModule) setupRouters() (err error) { Handler: m.handleAppData, }, ) - return err + if err != nil { + return fmt.Errorf("unstaked actor router: %w", err) + } + return nil } // setupHost creates a new libp2p host and assignes it to `m.host`. Libp2p host diff --git a/p2p/module_raintree_test.go b/p2p/module_raintree_test.go index 4848b06b4f..20596913e6 100644 --- a/p2p/module_raintree_test.go +++ b/p2p/module_raintree_test.go @@ -3,14 +3,14 @@ package p2p_test import ( - libp2pNetwork "github.com/libp2p/go-libp2p/core/network" runtime_testutil "github.com/pokt-network/pocket/internal/testutil/runtime" telemetry_testutil "github.com/pokt-network/pocket/internal/testutil/telemetry" "github.com/pokt-network/pocket/p2p" - "github.com/pokt-network/pocket/p2p/protocol" + typesP2P "github.com/pokt-network/pocket/p2p/types" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/modules" mock_modules "github.com/pokt-network/pocket/shared/modules/mocks" + "google.golang.org/protobuf/types/known/anypb" "log" "os" "path/filepath" @@ -20,15 +20,13 @@ import ( "sync" "testing" - "github.com/regen-network/gocuke" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/anypb" - "github.com/pokt-network/pocket/internal/testutil" "github.com/pokt-network/pocket/internal/testutil/constructors" "github.com/pokt-network/pocket/internal/testutil/p2p" persistence_testutil "github.com/pokt-network/pocket/internal/testutil/persistence" "github.com/pokt-network/pocket/shared/messaging" + "github.com/regen-network/gocuke" + "github.com/stretchr/testify/require" ) // TODO(#314): Add the tooling and instructions on how to generate unit tests in this file. @@ -380,10 +378,12 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te // TODO_THIS_COMMIT: consider using BusEventHandler instead... sURL := strings.Clone(serviceURL) mod := *(p2pMod.(*p2p.P2PModule)) - mod.GetHost().SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) { - log.Printf("[valID: %s] Read\n", sURL) - (&mod).GetRainTreeRouter().HandleStream(stream) - wg.Done() + mod.GetRainTreeRouter().HandlerProxy(t, func(origHandler typesP2P.RouterHandler) typesP2P.RouterHandler { + return func(data []byte) error { + log.Printf("[valID: %s] Read\n", sURL) + wg.Done() + return nil + } }) } diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index 4eff54b56c..828a23fae8 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -210,11 +210,13 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) { return nil, err } + // TECHDEBT(#763): refactor as "pre-propagation validation" networkMessage := messaging.PocketEnvelope{} if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil { rtr.logger.Error().Err(err).Msg("Error decoding network message") return nil, err } + // -- // Continue RainTree propagation if rainTreeMsg.Level > 0 { diff --git a/p2p/raintree/testutil.go b/p2p/raintree/testutil.go index ebcb464b5f..ef19b19cc3 100644 --- a/p2p/raintree/testutil.go +++ b/p2p/raintree/testutil.go @@ -2,12 +2,30 @@ package raintree -import libp2pNetwork "github.com/libp2p/go-libp2p/core/network" +import ( + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + generics_testutil "github.com/pokt-network/pocket/internal/testutil/generics" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/regen-network/gocuke" +) // RainTreeRouter exports `rainTreeRouter` for testing purposes. type RainTreeRouter = rainTreeRouter +// TOOD_THIS_COMMIT: move & dedup +type routerHandlerProxyFactory = generics_testutil.ProxyFactory[typesP2P.RouterHandler] + // HandleStream exports `rainTreeRouter#handleStream` for testing purposes. func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) { rtr.handleStream(stream) } +func (rtr *rainTreeRouter) HandlerProxy( + t gocuke.TestingT, + handlerProxyFactory routerHandlerProxyFactory, +) { + t.Helper() + + // pass original handler to proxy factory & replace it with the proxy + origHandler := rtr.handler + rtr.handler = handlerProxyFactory(origHandler) +} diff --git a/p2p/testutil.go b/p2p/testutil.go index 5235fed9c3..7cf01cf834 100644 --- a/p2p/testutil.go +++ b/p2p/testutil.go @@ -4,6 +4,7 @@ package p2p import ( libp2pHost "github.com/libp2p/go-libp2p/core/host" + "github.com/pokt-network/pocket/p2p/background" "github.com/pokt-network/pocket/p2p/raintree" ) @@ -21,6 +22,6 @@ func (m *p2pModule) GetRainTreeRouter() *raintree.RainTreeRouter { } // GetBackgroundRouter returns the `BackgroundRouter` for use in integration tests -//func (m *p2pModule) GetBackgroundRouter() background.BackgroundRouter { -// return m.backgroundRouter -//} +func (m *p2pModule) GetBackgroundRouter() *background.BackgroundRouter { + return m.unstakedActorRouter.(*background.BackgroundRouter) +} diff --git a/shared/messaging/envelope.go b/shared/messaging/envelope.go index 4150fbfbac..f653040078 100644 --- a/shared/messaging/envelope.go +++ b/shared/messaging/envelope.go @@ -4,6 +4,8 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/known/anypb" + + cryptoPocket "github.com/pokt-network/pocket/shared/crypto" ) // PackMessage returns a *PocketEnvelope after having packed the message supplied as an argument @@ -12,7 +14,10 @@ func PackMessage(message proto.Message) (*PocketEnvelope, error) { if err != nil { return nil, err } - return &PocketEnvelope{Content: anyMsg}, nil + return &PocketEnvelope{ + Content: anyMsg, + Nonce: cryptoPocket.GetNonce(), + }, nil } // UnpackMessage extracts the message inside the PocketEnvelope decorating it with typing information diff --git a/shared/messaging/proto/debug_message.proto b/shared/messaging/proto/debug_message.proto index 7ce079afa3..7e42f7dddf 100644 --- a/shared/messaging/proto/debug_message.proto +++ b/shared/messaging/proto/debug_message.proto @@ -30,6 +30,10 @@ message DebugMessage { google.protobuf.Any message = 3; } +message DebugStringMessage { + string value = 1; +} + // NB: See https://en.wikipedia.org/wiki/Routing for more info on routing and delivery schemes. enum DebugMessageRoutingType { DEBUG_MESSAGE_TYPE_UNKNOWN = 0;