From 3839c981884914bba145e57cfa845968836bac6a Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 11 Jul 2023 13:05:52 +0200 Subject: [PATCH] refactor: `rainTreeRouter` as submodule --- p2p/config/config.go | 16 ++++----- p2p/module.go | 10 +++--- p2p/module_raintree_test.go | 8 +++++ p2p/raintree/peers_manager_test.go | 58 +++++++++++++++++------------- p2p/raintree/router.go | 45 ++++++++++++++--------- p2p/raintree/router_test.go | 35 +++++++++--------- p2p/raintree/utils_test.go | 57 ++++++++++++++++++++++++++--- p2p/utils_test.go | 38 ++++++++++++++++---- 8 files changed, 178 insertions(+), 89 deletions(-) diff --git a/p2p/config/config.go b/p2p/config/config.go index 1cd9db2844..f57187b1af 100644 --- a/p2p/config/config.go +++ b/p2p/config/config.go @@ -51,11 +51,9 @@ type BackgroundConfig struct { // RainTreeConfig implements `RouterConfig` for use with `RainTreeRouter`. type RainTreeConfig struct { - Host host.Host - Addr crypto.Address - CurrentHeightProvider providers.CurrentHeightProvider - PeerstoreProvider providers.PeerstoreProvider - Handler func(data []byte) error + Host host.Host + Addr crypto.Address + Handler func(data []byte) error } // IsValid implements the respective member of the `RouterConfig` interface. @@ -121,11 +119,9 @@ func (cfg *BackgroundConfig) IsValid() error { // IsValid implements the respective member of the `RouterConfig` interface. func (cfg *RainTreeConfig) IsValid() error { baseCfg := baseConfig{ - Host: cfg.Host, - Addr: cfg.Addr, - CurrentHeightProvider: cfg.CurrentHeightProvider, - PeerstoreProvider: cfg.PeerstoreProvider, - Handler: cfg.Handler, + Host: cfg.Host, + Addr: cfg.Addr, + Handler: cfg.Handler, } return baseCfg.IsValid() } diff --git a/p2p/module.go b/p2p/module.go index 5fee67d2e5..91c2a63346 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -397,14 +397,12 @@ func (m *p2pModule) setupStakedRouter() (err error) { } m.logger.Debug().Msg("setting up staked actor router") - m.stakedActorRouter, err = raintree.NewRainTreeRouter( + m.stakedActorRouter, err = raintree.Create( m.GetBus(), &config.RainTreeConfig{ - Addr: m.address, - CurrentHeightProvider: m.currentHeightProvider, - PeerstoreProvider: m.pstoreProvider, - Host: m.host, - Handler: m.handlePocketEnvelope, + Addr: m.address, + Host: m.host, + Handler: m.handlePocketEnvelope, }, ) if err != nil { diff --git a/p2p/module_raintree_test.go b/p2p/module_raintree_test.go index 3dc98a9883..a7fa6c46ac 100644 --- a/p2p/module_raintree_test.go +++ b/p2p/module_raintree_test.go @@ -251,6 +251,14 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te persistenceMock := preparePersistenceMock(t, busMocks[i], genesisMock) consensusMock := prepareConsensusMock(t, busMocks[i]) + currentHeightProviderMock := prepareCurrentHeightProviderMock(t, busMocks[i]) + + busMocks[i].RegisterModule(currentHeightProviderMock) + busMocks[i].EXPECT(). + GetCurrentHeightProvider(). + Return(currentHeightProviderMock). + AnyTimes() + telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &readWriteWaitGroup, expectedWrites) prepareBusMock(busMocks[i], persistenceMock, consensusMock, telemetryMock) diff --git a/p2p/raintree/peers_manager_test.go b/p2p/raintree/peers_manager_test.go index 9e3fa2502b..ad28a560c3 100644 --- a/p2p/raintree/peers_manager_test.go +++ b/p2p/raintree/peers_manager_test.go @@ -18,8 +18,10 @@ import ( "github.com/pokt-network/pocket/p2p/config" typesP2P "github.com/pokt-network/pocket/p2p/types" mocksP2P "github.com/pokt-network/pocket/p2p/types/mocks" + "github.com/pokt-network/pocket/runtime" "github.com/pokt-network/pocket/runtime/configs" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" + "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" ) @@ -93,22 +95,21 @@ func TestRainTree_Peerstore_HandleUpdate(t *testing.T) { }) require.NoError(t, err) - mockBus := mockBus(ctrl) - pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + mockBus := mockBus(ctrl, pstore) + mockBus.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { + m.SetBus(mockBus) + }).AnyTimes() libp2pMockNet, err := mocknet.WithNPeers(1) require.NoError(t, err) rtCfg := &config.RainTreeConfig{ - Host: libp2pMockNet.Hosts()[0], - Addr: pubKey.Address(), - PeerstoreProvider: pstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: libp2pMockNet.Hosts()[0], + Addr: pubKey.Address(), + Handler: noopHandler, } - router, err := NewRainTreeRouter(mockBus, rtCfg) + router, err := Create(mockBus, rtCfg) require.NoError(t, err) rainTree := router.(*rainTreeRouter) @@ -142,7 +143,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) { // {1000000000, 19}, } - // the test will add this arbitrary number of addresses after the initial initialization (done via NewRainTreeRouter) + // the test will add this arbitrary number of addresses after the initial initialization (done via Create) // this is to add extra subsequent work that -should- grow linearly and it's actually going to test AddressBook updates // not simply initializations. numAddressesToBeAdded := 1000 @@ -158,9 +159,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) { }) require.NoError(b, err) - mockBus := mockBus(ctrl) - pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + mockBus := mockBus(ctrl, pstore) libp2pPStore, err := pstoremem.NewPeerstore() require.NoError(b, err) @@ -169,14 +168,12 @@ func BenchmarkPeerstoreUpdates(b *testing.B) { hostMock.EXPECT().Peerstore().Return(libp2pPStore).AnyTimes() rtCfg := &config.RainTreeConfig{ - Host: hostMock, - Addr: pubKey.Address(), - PeerstoreProvider: pstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: hostMock, + Addr: pubKey.Address(), + Handler: noopHandler, } - router, err := NewRainTreeRouter(mockBus, rtCfg) + router, err := Create(mockBus, rtCfg) require.NoError(b, err) rainTree := router.(*rainTreeRouter) @@ -272,7 +269,14 @@ func TestRainTree_MessageTargets_TwentySevenNodes(t *testing.T) { func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeMessageProp) { ctrl := gomock.NewController(t) + modulesRegistry := runtime.NewModulesRegistry() busMock := mockModules.NewMockBus(ctrl) + busMock.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes() + busMock.EXPECT().RegisterModule(gomock.Any()).Do(func(arg modules.Submodule) { + module := arg.(modules.Submodule) + modulesRegistry.RegisterModule(module) + module.SetBus(busMock) + }).AnyTimes() consensusMock := mockModules.NewMockConsensusModule(ctrl) consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes() busMock.EXPECT().GetConsensusModule().Return(consensusMock).AnyTimes() @@ -283,10 +287,16 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM runtimeMgrMock.EXPECT().GetConfig().Return(configs.NewDefaultConfig()).AnyTimes() mockAlphabetValidatorServiceURLsDNS(t) + + // TECHDEBT(#810): simplify once `bus.GetPeerstoreProvider()` is available. pstore := getAlphabetPeerstore(t, expectedMsgProp.numNodes) pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) + busMock.RegisterModule(pstoreProviderMock) + currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 1) + busMock.EXPECT().GetCurrentHeightProvider().Return(currentHeightProviderMock).AnyTimes() + libp2pPStore, err := pstoremem.NewPeerstore() require.NoError(t, err) @@ -296,14 +306,12 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM hostMock.EXPECT().ID().Return(libp2pPeer.ID("")).AnyTimes() rtCfg := &config.RainTreeConfig{ - Host: hostMock, - Addr: []byte{expectedMsgProp.orig}, - PeerstoreProvider: pstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: hostMock, + Addr: []byte{expectedMsgProp.orig}, + Handler: noopHandler, } - router, err := NewRainTreeRouter(busMock, rtCfg) + router, err := Create(busMock, rtCfg) require.NoError(t, err) rainTree := router.(*rainTreeRouter) diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index e2a5d8855e..e4f626ee64 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -48,7 +48,7 @@ type rainTreeRouter struct { currentHeightProvider modules.CurrentHeightProvider } -func NewRainTreeRouter(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) { +func Create(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) { return new(rainTreeRouter).Create(bus, cfg) } @@ -59,17 +59,29 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type } rtr := &rainTreeRouter{ - host: cfg.Host, - selfAddr: cfg.Addr, - pstoreProvider: cfg.PeerstoreProvider, - currentHeightProvider: cfg.CurrentHeightProvider, - logger: rainTreeLogger, - handler: cfg.Handler, + host: cfg.Host, + selfAddr: cfg.Addr, + logger: rainTreeLogger, + handler: cfg.Handler, } - rtr.SetBus(bus) + bus.RegisterModule(rtr) - height := rtr.currentHeightProvider.CurrentHeight() - pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(height) + currentHeightProvider := bus.GetCurrentHeightProvider() + // TECHDEBT(#810, 811): 🙄 cleanup; avoid holding a reference + rtr.currentHeightProvider = currentHeightProvider + // TECHDEBT(#810, 811): use `bus.GetPeerstoreProvider()` once available. + pstoreProviderModule, err := bus.GetModulesRegistry().GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + if err != nil { + return nil, err + } + + pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider) + if !ok { + return nil, fmt.Errorf("unexpected peerstore provider module type: %T", pstoreProviderModule) + } + + height := currentHeightProvider.CurrentHeight() + pstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(height) if err != nil { return nil, fmt.Errorf("getting staked peerstore at height %d: %w", height, err) } @@ -81,7 +93,7 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type "peerstore_size": pstore.Size(), }).Msg("initializing raintree router") - if err := rtr.setupDependencies(); err != nil { + if err := rtr.setupDependencies(pstore); err != nil { return nil, err } @@ -92,6 +104,10 @@ func (rtr *rainTreeRouter) Close() error { return nil } +func (rtr *rainTreeRouter) GetModuleName() string { + return typesP2P.StakedActorRouterSubmoduleName +} + // NetworkBroadcast implements the respective member of `typesP2P.Router`. func (rtr *rainTreeRouter) Broadcast(data []byte) error { return rtr.broadcastAtLevel(data, rtr.peersManager.GetMaxNumLevels()) @@ -317,16 +333,11 @@ func (rtr *rainTreeRouter) setupUnicastRouter() error { return nil } -func (rtr *rainTreeRouter) setupDependencies() error { +func (rtr *rainTreeRouter) setupDependencies(pstore typesP2P.Peerstore) error { if err := rtr.setupUnicastRouter(); err != nil { return err } - pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight()) - if err != nil { - return fmt.Errorf("getting staked peerstore: %w", err) - } - if err := rtr.setupPeerManager(pstore); err != nil { return fmt.Errorf("setting up peer manager: %w", err) } diff --git a/p2p/raintree/router_test.go b/p2p/raintree/router_test.go index 2865a584b9..20567775b5 100644 --- a/p2p/raintree/router_test.go +++ b/p2p/raintree/router_test.go @@ -9,12 +9,14 @@ import ( libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" libp2pHost "github.com/libp2p/go-libp2p/core/host" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" + "github.com/pokt-network/pocket/p2p/config" typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/p2p/utils" "github.com/pokt-network/pocket/runtime/defaults" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" - "github.com/stretchr/testify/require" + "github.com/pokt-network/pocket/shared/modules" ) // TECHDEBT(#609): move & de-dup. @@ -48,19 +50,18 @@ func TestRainTreeRouter_AddPeer(t *testing.T) { require.NoError(t, err) expectedPStoreSize++ - busMock := mockBus(ctrl) - peerstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + busMock := mockBus(ctrl, pstore) + busMock.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { + m.SetBus(busMock) + }).AnyTimes() rtCfg := &config.RainTreeConfig{ - Host: host, - Addr: selfAddr, - PeerstoreProvider: peerstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: host, + Addr: selfAddr, + Handler: noopHandler, } - router, err := NewRainTreeRouter(busMock, rtCfg) + router, err := Create(busMock, rtCfg) require.NoError(t, err) rtRouter := router.(*rainTreeRouter) @@ -112,18 +113,14 @@ func TestRainTreeRouter_RemovePeer(t *testing.T) { require.NoError(t, err) expectedPStoreSize++ - busMock := mockBus(ctrl) - peerstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + busMock := mockBus(ctrl, pstore) rtCfg := &config.RainTreeConfig{ - Host: host, - Addr: selfAddr, - PeerstoreProvider: peerstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: host, + Addr: selfAddr, + Handler: noopHandler, } - router, err := NewRainTreeRouter(busMock, rtCfg) + router, err := Create(busMock, rtCfg) require.NoError(t, err) rainTree := router.(*rainTreeRouter) diff --git a/p2p/raintree/utils_test.go b/p2p/raintree/utils_test.go index 82e99438af..f2502acfb4 100644 --- a/p2p/raintree/utils_test.go +++ b/p2p/raintree/utils_test.go @@ -2,14 +2,24 @@ package raintree import ( "github.com/golang/mock/gomock" + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" mocksP2P "github.com/pokt-network/pocket/p2p/types/mocks" "github.com/pokt-network/pocket/runtime/configs" + "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" ) -func mockBus(ctrl *gomock.Controller) *mockModules.MockBus { +// TECHDEBT(#796): refactor/de-dup & separate definitions of mocks from one another. +func mockBus(ctrl *gomock.Controller, pstore typesP2P.Peerstore) *mockModules.MockBus { + if pstore == nil { + pstore = &typesP2P.PeerAddrMap{} + } + busMock := mockModules.NewMockBus(ctrl) + busMock.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { + m.SetBus(busMock) + }).AnyTimes() busMock.EXPECT().GetPersistenceModule().Return(nil).AnyTimes() consensusMock := mockModules.NewMockConsensusModule(ctrl) consensusMock.EXPECT().CurrentHeight().Return(uint64(0)).AnyTimes() @@ -17,17 +27,54 @@ func mockBus(ctrl *gomock.Controller) *mockModules.MockBus { runtimeMgrMock := mockModules.NewMockRuntimeMgr(ctrl) busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes() runtimeMgrMock.EXPECT().GetConfig().Return(configs.NewDefaultConfig()).AnyTimes() + + currentHeightProviderMock := mockModules.NewMockCurrentHeightProvider(ctrl) + currentHeightProviderMock.EXPECT().CurrentHeight().Return(uint64(0)).AnyTimes() + busMock.EXPECT(). + GetCurrentHeightProvider(). + Return(currentHeightProviderMock). + AnyTimes() + + peerstoreProviderMock := mocksP2P.NewMockPeerstoreProvider(ctrl) + peerstoreProviderMock.EXPECT(). + GetStakedPeerstoreAtHeight(gomock.Any()). + Return(pstore, nil). + AnyTimes() + + modulesRegistryMock := mockModules.NewMockModulesRegistry(ctrl) + modulesRegistryMock.EXPECT(). + GetModule(gomock.Eq(peerstore_provider.PeerstoreProviderSubmoduleName)). + Return(peerstoreProviderMock, nil). + AnyTimes() + modulesRegistryMock.EXPECT(). + GetModule(gomock.Eq(modules.CurrentHeightProviderSubmoduleName)). + Return(currentHeightProviderMock, nil). + AnyTimes() + busMock.EXPECT().GetModulesRegistry().Return(modulesRegistryMock).AnyTimes() + return busMock } -func mockPeerstoreProvider(ctrl *gomock.Controller, pstore typesP2P.Peerstore) *mocksP2P.MockPeerstoreProvider { +func mockPeerstoreProvider( + ctrl *gomock.Controller, + pstore typesP2P.Peerstore, +) *mocksP2P.MockPeerstoreProvider { peerstoreProviderMock := mocksP2P.NewMockPeerstoreProvider(ctrl) - peerstoreProviderMock.EXPECT().GetStakedPeerstoreAtHeight(gomock.Any()).Return(pstore, nil).AnyTimes() + peerstoreProviderMock.EXPECT().SetBus(gomock.Any()).AnyTimes() + peerstoreProviderMock.EXPECT().GetBus().AnyTimes() + peerstoreProviderMock.EXPECT(). + GetStakedPeerstoreAtHeight(gomock.Any()). + Return(pstore, nil). + AnyTimes() + peerstoreProviderMock.EXPECT(). + GetModuleName(). + Return(peerstore_provider.PeerstoreProviderSubmoduleName). + AnyTimes() return peerstoreProviderMock } -func mockCurrentHeightProvider(ctrl *gomock.Controller, height uint64) *mocksP2P.MockCurrentHeightProvider { - currentHeightProviderMock := mocksP2P.NewMockCurrentHeightProvider(ctrl) +func mockCurrentHeightProvider(ctrl *gomock.Controller, height uint64) *mockModules.MockCurrentHeightProvider { + currentHeightProviderMock := mockModules.NewMockCurrentHeightProvider(ctrl) currentHeightProviderMock.EXPECT().CurrentHeight().Return(height).AnyTimes() return currentHeightProviderMock } diff --git a/p2p/utils_test.go b/p2p/utils_test.go index 43006c4819..b50c103a7a 100644 --- a/p2p/utils_test.go +++ b/p2p/utils_test.go @@ -21,6 +21,7 @@ import ( typesP2P "github.com/pokt-network/pocket/p2p/types" mock_types "github.com/pokt-network/pocket/p2p/types/mocks" "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/runtime" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/configs/types" "github.com/pokt-network/pocket/runtime/defaults" @@ -210,13 +211,15 @@ func createMockBus( ctrl := gomock.NewController(t) mockBus := mockModules.NewMockBus(ctrl) mockBus.EXPECT().GetRuntimeMgr().Return(runtimeMgr).AnyTimes() - mockBus.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { - m.SetBus(mockBus) - }).AnyTimes() - // TECHDEBT: modules registry mock behavior should be defined separately. - mockModulesRegistry := mockModules.NewMockModulesRegistry(ctrl) - mockModulesRegistry.EXPECT().GetModule(peerstore_provider.PeerstoreProviderSubmoduleName).Return(nil, runtime.ErrModuleNotRegistered(peerstore_provider.PeerstoreProviderSubmoduleName)).AnyTimes() - mockBus.EXPECT().GetModulesRegistry().Return(mockModulesRegistry).AnyTimes() + modulesRegistry := runtime.NewModulesRegistry() + mockBus.EXPECT(). + RegisterModule(gomock.Any()). + DoAndReturn(func(arg any) { + module := arg.(modules.Submodule) + modulesRegistry.RegisterModule(module) + module.SetBus(mockBus) + }).AnyTimes() + mockBus.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes() mockBus.EXPECT().PublishEventToBus(gomock.AssignableToTypeOf(&messaging.PocketEnvelope{})). Do(func(envelope *messaging.PocketEnvelope) { fmt.Println("[valId: unknown] Read") @@ -292,6 +295,27 @@ func prepareCurrentHeightProviderMock(t *testing.T, busMock *mockModules.MockBus return currentHeightProviderMock } +func preparePeerstoreProviderMock( + t *testing.T, + busMock *mockModules.MockBus, + pstore typesP2P.Peerstore, +) *mock_types.MockPeerstoreProvider { + ctrl := gomock.NewController(t) + peerstoreProviderMock := mock_types.NewMockPeerstoreProvider(ctrl) + peerstoreProviderMock.EXPECT(). + GetStakedPeerstoreAtHeight(gomock.Any()). + Return(pstore, nil). + AnyTimes() + + peerstoreProviderMock.EXPECT().GetBus().Return(busMock).AnyTimes() + peerstoreProviderMock.EXPECT().SetBus(busMock).AnyTimes() + peerstoreProviderMock.EXPECT().GetModuleName(). + Return(peerstore_provider.PeerstoreProviderSubmoduleName). + AnyTimes() + + return peerstoreProviderMock +} + // Persistence mock - only needed for validatorMap access func preparePersistenceMock(t *testing.T, busMock *mockModules.MockBus, genesisState *genesis.GenesisState) *mockModules.MockPersistenceModule { ctrl := gomock.NewController(t)