From 41c3d320f303c991f4c4b0372309428f9e7f4bc6 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 13 Jul 2023 14:27:15 +0200 Subject: [PATCH] [P2P] Refactor P2P submodules (#895) ## @Reviewer This PR may be more digestible / reviewable on a commit-by-commit basis. Commits are organized logically and any given line is only modified in a single commit, with few exceptions*. *(In the interest of preserving the git-time-continuum :police_officer::rotating_light:, this applies in batches of commits between comments or reviews *by humans*, only once "in review") --- ## Description Refactor P2P module dependencies as submodules, ultimately to support usage P2P module usage in the CLI. ## Issue Related: - #730 Dependant(s): - #891 - #801 - #892 ## Type of change Please mark the relevant option(s): - [ ] New feature, functionality or library - [ ] Bug fix - [x] Code health or cleanup - [ ] Major breaking change - [ ] Documentation - [ ] Other ## List of changes - decoupled P2P module dependencies from Consensus module (disambiguates modules registry names) - added "current_height_provider" module registry slot - promoted `CurrentHeightProvider` to a submodule interface type - added `consensusCurrentHeightProvider` implementation - promoted `Router` interface to a submodule interface type - added "staked_actor_router" modules registry slot - added "unstaked_actor_router" modules registry slot - converted `backgroundRouter` implementation to submodule - converted `rainTreeRouter` implementation to submodule - simplified router base config ## Testing - [x] `make develop_test`; if any code changes were made - [x] `make test_e2e` on [k8s LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md); if any code changes were made - [x] `e2e-devnet-test` passes tests on [DevNet](https://pocketnetwork.notion.site/How-to-DevNet-ff1598f27efe44c09f34e2aa0051f0dd); if any code was changed - [ ] [Docker Compose LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md); if any major functionality was changed or introduced - [x] [k8s LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md); if any infrastructure or configuration changes were made ## Required Checklist - [x] I have performed a self-review of my own code - [x] I have commented my code, particularly in hard-to-understand areas - [x] I have added, or updated, [`godoc` format comments](https://go.dev/blog/godoc) on touched members (see: [tip.golang.org/doc/comment](https://tip.golang.org/doc/comment)) - [x] I have tested my changes using the available tooling - [ ] I have updated the corresponding CHANGELOG ### If Applicable Checklist - [ ] I have updated the corresponding README(s); local and/or global - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have added, or updated, [mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding README(s) - [ ] I have added, or updated, documentation and [mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*` if I updated `shared/*`README(s) --------- Co-authored-by: Daniel Olshansky --- app/client/cli/debug.go | 11 +- app/client/cli/helpers/setup.go | 23 ++-- consensus/module.go | 6 + p2p/background/router.go | 45 +++++--- p2p/background/router_test.go | 31 ++++-- p2p/bootstrap.go | 9 +- p2p/config/config.go | 53 +++------ p2p/event_handler.go | 7 +- p2p/module.go | 104 ++++++------------ p2p/module_raintree_test.go | 8 ++ p2p/module_test.go | 21 +++- .../consensus/provider.go | 30 +++++ .../current_height_provider.go | 14 --- .../current_height_provider/rpc/provider.go | 59 +++++----- p2p/providers/providers.go | 2 - p2p/raintree/peers_manager_test.go | 57 +++++----- p2p/raintree/peerstore_utils.go | 32 +++++- p2p/raintree/router.go | 57 ++++++---- p2p/raintree/router_test.go | 35 +++--- p2p/raintree/utils_test.go | 57 +++++++++- p2p/transport_encryption_test.go | 8 ++ p2p/types/router.go | 21 +++- p2p/utils_test.go | 52 +++++++-- runtime/bus.go | 4 + shared/modules/bus_module.go | 3 + .../current_height_provider_submodule.go | 13 +++ 26 files changed, 483 insertions(+), 279 deletions(-) create mode 100644 p2p/providers/current_height_provider/consensus/provider.go delete mode 100644 p2p/providers/current_height_provider/current_height_provider.go create mode 100644 shared/modules/current_height_provider_submodule.go diff --git a/app/client/cli/debug.go b/app/client/cli/debug.go index 23b647933..f23a278d7 100644 --- a/app/client/cli/debug.go +++ b/app/client/cli/debug.go @@ -11,7 +11,6 @@ import ( "github.com/pokt-network/pocket/app/client/cli/helpers" "github.com/pokt-network/pocket/logger" - "github.com/pokt-network/pocket/p2p/providers/current_height_provider" "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/shared/messaging" @@ -227,19 +226,15 @@ func fetchPeerstore(cmd *cobra.Command) (typesP2P.Peerstore, error) { if !ok || bus == nil { return nil, errors.New("retrieving bus from CLI context") } - modulesRegistry := bus.GetModulesRegistry() // TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider // is retrievable as a proper submodule - pstoreProvider, err := modulesRegistry.GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + pstoreProvider, err := bus.GetModulesRegistry().GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) if err != nil { return nil, errors.New("retrieving peerstore provider") } - currentHeightProvider, err := modulesRegistry.GetModule(current_height_provider.ModuleName) - if err != nil { - return nil, errors.New("retrieving currentHeightProvider") - } + currentHeightProvider := bus.GetCurrentHeightProvider() - height := currentHeightProvider.(current_height_provider.CurrentHeightProvider).CurrentHeight() + height := currentHeightProvider.CurrentHeight() pstore, err := pstoreProvider.(peerstore_provider.PeerstoreProvider).GetStakedPeerstoreAtHeight(height) if err != nil { return nil, fmt.Errorf("retrieving peerstore at height %d", height) diff --git a/app/client/cli/helpers/setup.go b/app/client/cli/helpers/setup.go index 5f5546a91..956102a04 100644 --- a/app/client/cli/helpers/setup.go +++ b/app/client/cli/helpers/setup.go @@ -1,6 +1,8 @@ package helpers import ( + "fmt" + "github.com/spf13/cobra" "github.com/pokt-network/pocket/app/client/cli/flags" @@ -30,7 +32,11 @@ func P2PDependenciesPreRunE(cmd *cobra.Command, _ []string) error { if err := setupPeerstoreProvider(*runtimeMgr, flags.RemoteCLIURL); err != nil { return err } - setupCurrentHeightProvider(*runtimeMgr, flags.RemoteCLIURL) + + if err := setupRPCCurrentHeightProvider(*runtimeMgr, flags.RemoteCLIURL); err != nil { + return err + } + setupAndStartP2PModule(*runtimeMgr) return nil @@ -44,15 +50,16 @@ func setupPeerstoreProvider(rm runtime.Manager, rpcURL string) error { return nil } -func setupCurrentHeightProvider(rm runtime.Manager, rpcURL string) { - // TECHDEBT(#810): simplify after current height provider is refactored as - // a submodule. - bus := rm.GetBus() - modulesRegistry := bus.GetModulesRegistry() - currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider( +func setupRPCCurrentHeightProvider(rm runtime.Manager, rpcURL string) error { + // Ensure `CurrentHeightProvider` exists in the modules registry. + _, err := rpcCHP.Create( + rm.GetBus(), rpcCHP.WithCustomRPCURL(rpcURL), ) - modulesRegistry.RegisterModule(currentHeightProvider) + if err != nil { + return fmt.Errorf("setting up current height provider: %w", err) + } + return nil } func setupAndStartP2PModule(rm runtime.Manager) { diff --git a/consensus/module.go b/consensus/module.go index 2b32b62eb..2c1697aef 100644 --- a/consensus/module.go +++ b/consensus/module.go @@ -11,6 +11,7 @@ import ( consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry" typesCons "github.com/pokt-network/pocket/consensus/types" "github.com/pokt-network/pocket/logger" + "github.com/pokt-network/pocket/p2p/providers/current_height_provider/consensus" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/genesis" "github.com/pokt-network/pocket/shared/codec" @@ -132,6 +133,11 @@ func (*consensusModule) Create(bus modules.Bus, options ...modules.ModuleOption) bus.RegisterModule(m) + // Ensure `CurrentHeightProvider` submodule is registered. + if _, err = consensus.Create(bus); err != nil { + return nil, fmt.Errorf("failed to create current height provider: %w", err) + } + runtimeMgr := bus.GetRuntimeMgr() consensusCfg := runtimeMgr.GetConfig().Consensus diff --git a/p2p/background/router.go b/p2p/background/router.go index d5119b395..5e6254d20 100644 --- a/p2p/background/router.go +++ b/p2p/background/router.go @@ -17,6 +17,7 @@ import ( "github.com/pokt-network/pocket/p2p/config" "github.com/pokt-network/pocket/p2p/protocol" "github.com/pokt-network/pocket/p2p/providers" + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/p2p/unicast" "github.com/pokt-network/pocket/p2p/utils" @@ -27,9 +28,8 @@ import ( ) var ( - _ typesP2P.Router = &backgroundRouter{} - _ modules.IntegrableModule = &backgroundRouter{} - _ backgroundRouterFactory = &backgroundRouter{} + _ typesP2P.Router = &backgroundRouter{} + _ backgroundRouterFactory = &backgroundRouter{} ) type backgroundRouterFactory = modules.FactoryWithConfig[typesP2P.Router, *config.BackgroundConfig] @@ -93,7 +93,7 @@ func (*backgroundRouter) Create(bus modules.Bus, cfg *config.BackgroundConfig) ( host: cfg.Host, cancelReadSubscription: cancel, } - rtr.SetBus(bus) + bus.RegisterModule(rtr) bgRouterLogger.Info().Fields(map[string]any{ "host_id": cfg.Host.ID(), @@ -127,6 +127,11 @@ func (rtr *backgroundRouter) Close() error { ) } +// GetModuleName implements the respective `modules.Integrable` interface method. +func (rtr *backgroundRouter) GetModuleName() string { + return typesP2P.UnstakedActorRouterSubmoduleName +} + // Broadcast implements the respective `typesP2P.Router` interface method. func (rtr *backgroundRouter) Broadcast(pocketEnvelopeBz []byte) error { backgroundMsg := &typesP2P.BackgroundMessage{ @@ -215,7 +220,8 @@ func (rtr *backgroundRouter) setupUnicastRouter() error { return nil } -func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config.BackgroundConfig) error { +// TECHBEDT(#810,#811): remove unused `BackgroundConfig` +func (rtr *backgroundRouter) setupDependencies(ctx context.Context, _ *config.BackgroundConfig) error { // NB: The order in which the internal components are setup below is important if err := rtr.setupUnicastRouter(); err != nil { return err @@ -237,21 +243,30 @@ func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config. return fmt.Errorf("setting up subscription: %w", err) } - if err := rtr.setupPeerstore( - ctx, - cfg.PeerstoreProvider, - cfg.CurrentHeightProvider, - ); err != nil { + if err := rtr.setupPeerstore(ctx); err != nil { return fmt.Errorf("setting up peerstore: %w", err) } return nil } -func (rtr *backgroundRouter) setupPeerstore( - ctx context.Context, - pstoreProvider providers.PeerstoreProvider, - currentHeightProvider providers.CurrentHeightProvider, -) (err error) { +func (rtr *backgroundRouter) setupPeerstore(ctx context.Context) (err error) { + rtr.logger.Warn().Msg("setting up peerstore...") + + // TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider + // is retrievable as a proper submodule + pstoreProviderModule, err := rtr.GetBus().GetModulesRegistry(). + GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + if err != nil { + return fmt.Errorf("retrieving peerstore provider: %w", err) + } + pstoreProvider, ok := pstoreProviderModule.(providers.PeerstoreProvider) + if !ok { + return fmt.Errorf("unexpected peerstore provider type: %T", pstoreProviderModule) + } + + rtr.logger.Debug().Msg("setupCurrentHeightProvider") + currentHeightProvider := rtr.GetBus().GetCurrentHeightProvider() + // seed initial peerstore with current on-chain peer info (i.e. staked actors) rtr.pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight( currentHeightProvider.CurrentHeight(), diff --git a/p2p/background/router_test.go b/p2p/background/router_test.go index 61cb5f153..3c93cf758 100644 --- a/p2p/background/router_test.go +++ b/p2p/background/router_test.go @@ -22,13 +22,16 @@ import ( "github.com/pokt-network/pocket/internal/testutil" "github.com/pokt-network/pocket/p2p/config" "github.com/pokt-network/pocket/p2p/protocol" + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" mock_types "github.com/pokt-network/pocket/p2p/types/mocks" "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/runtime" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/defaults" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/messaging" + "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" ) @@ -399,6 +402,7 @@ func newTestRouter( return newRouterWithSelfPeerAndHost(t, selfPeer, host, handler) } +// TECHDEBT(#796): de-dup & refactor func newRouterWithSelfPeerAndHost( t *testing.T, selfPeer typesP2P.Peer, @@ -415,16 +419,27 @@ func newRouterWithSelfPeerAndHost( }, }).AnyTimes() + modulesRegistry := runtime.NewModulesRegistry() + busMock := mockModules.NewMockBus(ctrl) + busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes() + busMock.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes() + busMock.EXPECT().RegisterModule(gomock.Any()).Do(func(m modules.Submodule) { + modulesRegistry.RegisterModule(m) + m.SetBus(busMock) + }).AnyTimes() + consensusMock := mockModules.NewMockConsensusModule(ctrl) + consensusMock.EXPECT().GetModuleName().Return(modules.ConsensusModuleName).AnyTimes() + consensusMock.EXPECT().GetBus().Return(busMock).AnyTimes() + consensusMock.EXPECT().SetBus(gomock.Any()).AnyTimes() consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes() + busMock.EXPECT().GetCurrentHeightProvider().Return(consensusMock).AnyTimes() pstore := make(typesP2P.PeerAddrMap) pstoreProviderMock := mock_types.NewMockPeerstoreProvider(ctrl) + pstoreProviderMock.EXPECT().GetModuleName().Return(peerstore_provider.PeerstoreProviderSubmoduleName) pstoreProviderMock.EXPECT().GetStakedPeerstoreAtHeight(gomock.Any()).Return(pstore, nil).AnyTimes() - - busMock := mockModules.NewMockBus(ctrl) - busMock.EXPECT().GetConsensusModule().Return(consensusMock).AnyTimes() - busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes() + modulesRegistry.RegisterModule(pstoreProviderMock) err := pstore.AddPeer(selfPeer) require.NoError(t, err) @@ -434,11 +449,9 @@ func newRouterWithSelfPeerAndHost( } router, err := Create(busMock, &config.BackgroundConfig{ - Addr: selfPeer.GetAddress(), - PeerstoreProvider: pstoreProviderMock, - CurrentHeightProvider: consensusMock, - Host: host, - Handler: handler, + Addr: selfPeer.GetAddress(), + Host: host, + Handler: handler, }) require.NoError(t, err) diff --git a/p2p/bootstrap.go b/p2p/bootstrap.go index 58b923a5d..258fb19a2 100644 --- a/p2p/bootstrap.go +++ b/p2p/bootstrap.go @@ -67,7 +67,14 @@ func (m *p2pModule) bootstrap() error { return fmt.Errorf("creating RPC peerstore provider: %w", err) } - currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(rpcCHP.WithCustomRPCURL(bootstrapNode)) + currentHeightProvider, err := rpcCHP.Create( + m.GetBus(), + rpcCHP.WithCustomRPCURL(bootstrapNode), + ) + if err != nil { + m.logger.Warn().Err(err).Str("endpoint", bootstrapNode).Msg("Error getting current height from bootstrap node") + continue + } pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(currentHeightProvider.CurrentHeight()) if err != nil { diff --git a/p2p/config/config.go b/p2p/config/config.go index 60361f662..90350444b 100644 --- a/p2p/config/config.go +++ b/p2p/config/config.go @@ -7,7 +7,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/multierr" - "github.com/pokt-network/pocket/p2p/providers" typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/modules" @@ -20,18 +19,16 @@ var ( _ typesP2P.RouterConfig = &RainTreeConfig{} ) -// baseConfig implements `RouterConfig` using the given libp2p host and current -// height and peerstore providers. Intended for internal use by other `RouterConfig` +// baseConfig implements `RouterConfig` using the given libp2p host, pokt address +// and handler function. Intended for internal use by other `RouterConfig` // implementations with common config parameters. // // NB: intentionally *not* embedding `baseConfig` to improve readability of usages // of would-be embedders (e.g. `BackgroundConfig`). type baseConfig struct { - Host host.Host - Addr crypto.Address - CurrentHeightProvider providers.CurrentHeightProvider - PeerstoreProvider providers.PeerstoreProvider - Handler func(data []byte) error + Host host.Host + Addr crypto.Address + Handler func(data []byte) error } type UnicastRouterConfig struct { @@ -44,20 +41,16 @@ type UnicastRouterConfig struct { // BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`. type BackgroundConfig struct { - Host host.Host - Addr crypto.Address - CurrentHeightProvider providers.CurrentHeightProvider - PeerstoreProvider providers.PeerstoreProvider - Handler func(data []byte) error + Host host.Host + Addr crypto.Address + Handler func(data []byte) error } // RainTreeConfig implements `RouterConfig` for use with `RainTreeRouter`. type RainTreeConfig struct { - Host host.Host - Addr crypto.Address - CurrentHeightProvider providers.CurrentHeightProvider - PeerstoreProvider providers.PeerstoreProvider - Handler func(data []byte) error + Host host.Host + Addr crypto.Address + Handler func(data []byte) error } // IsValid implements the respective member of the `RouterConfig` interface. @@ -66,18 +59,10 @@ func (cfg *baseConfig) IsValid() (err error) { err = multierr.Append(err, fmt.Errorf("pokt address not configured")) } - if cfg.CurrentHeightProvider == nil { - err = multierr.Append(err, fmt.Errorf("current height provider not configured")) - } - if cfg.Host == nil { err = multierr.Append(err, fmt.Errorf("host not configured")) } - if cfg.PeerstoreProvider == nil { - err = multierr.Append(err, fmt.Errorf("peerstore provider not configured")) - } - if cfg.Handler == nil { err = multierr.Append(err, fmt.Errorf("handler not configured")) } @@ -111,11 +96,9 @@ func (cfg *UnicastRouterConfig) IsValid() (err error) { // IsValid implements the respective member of the `RouterConfig` interface. func (cfg *BackgroundConfig) IsValid() error { baseCfg := baseConfig{ - Host: cfg.Host, - Addr: cfg.Addr, - CurrentHeightProvider: cfg.CurrentHeightProvider, - PeerstoreProvider: cfg.PeerstoreProvider, - Handler: cfg.Handler, + Host: cfg.Host, + Addr: cfg.Addr, + Handler: cfg.Handler, } return baseCfg.IsValid() } @@ -123,11 +106,9 @@ func (cfg *BackgroundConfig) IsValid() error { // IsValid implements the respective member of the `RouterConfig` interface. func (cfg *RainTreeConfig) IsValid() error { baseCfg := baseConfig{ - Host: cfg.Host, - Addr: cfg.Addr, - CurrentHeightProvider: cfg.CurrentHeightProvider, - PeerstoreProvider: cfg.PeerstoreProvider, - Handler: cfg.Handler, + Host: cfg.Host, + Addr: cfg.Addr, + Handler: cfg.Handler, } return baseCfg.IsValid() } diff --git a/p2p/event_handler.go b/p2p/event_handler.go index ba0885839..b1184e860 100644 --- a/p2p/event_handler.go +++ b/p2p/event_handler.go @@ -31,7 +31,12 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error { } oldPeerList := m.stakedActorRouter.GetPeerstore().GetPeerList() - updatedPeerstore, err := m.pstoreProvider.GetStakedPeerstoreAtHeight(consensusNewHeightEvent.Height) + pstoreProvider, err := m.getPeerstoreProvider() + if err != nil { + return err + } + + updatedPeerstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(consensusNewHeightEvent.Height) if err != nil { return err } diff --git a/p2p/module.go b/p2p/module.go index e6263d1b4..6bb8f479a 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -16,7 +16,6 @@ import ( "github.com/pokt-network/pocket/p2p/background" "github.com/pokt-network/pocket/p2p/config" "github.com/pokt-network/pocket/p2p/providers" - "github.com/pokt-network/pocket/p2p/providers/current_height_provider" "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" persPSP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/persistence" "github.com/pokt-network/pocket/p2p/raintree" @@ -47,13 +46,8 @@ type p2pModule struct { identity libp2p.Option listenAddrs libp2p.Option - // TECHDEBT(#810): register the providers to the module registry instead of - // holding a reference in the module struct and passing via router config. - // // Assigned during creation via `#setupDependencies()`. - currentHeightProvider providers.CurrentHeightProvider - pstoreProvider providers.PeerstoreProvider - nonceDeduper *mempool.GenericFIFOSet[uint64, uint64] + nonceDeduper *mempool.GenericFIFOSet[uint64, uint64] // TECHDEBT(#810): register the routers to the module registry instead of // holding a reference in the module struct. This will improve testability. @@ -282,10 +276,6 @@ func (m *p2pModule) GetAddress() (cryptoPocket.Address, error) { // setupDependencies sets up the module's current height and peerstore providers. func (m *p2pModule) setupDependencies() error { - if err := m.setupCurrentHeightProvider(); err != nil { - return err - } - if err := m.setupPeerstoreProvider(); err != nil { return err } @@ -307,57 +297,19 @@ func (m *p2pModule) setupPeerstoreProvider() error { GetModulesRegistry(). GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) if err != nil { + // TECHDEBT: compare against `runtime.ErrModuleNotRegistered(...)`. m.logger.Debug().Msg("creating new persistence peerstore...") - pstoreProvider, err := persPSP.Create(m.GetBus()) - if err != nil { + // Ensure a peerstore provider exists by creating a `persistencePeerstoreProvider`. + if _, err := persPSP.Create(m.GetBus()); err != nil { return err } - - m.pstoreProvider = pstoreProvider return nil } - m.logger.Debug().Msg("loaded persistence peerstore...") - pstoreProvider, ok := pstoreProviderModule.(providers.PeerstoreProvider) - if !ok { + m.logger.Debug().Msg("loaded peerstore provider...") + if _, ok := pstoreProviderModule.(providers.PeerstoreProvider); !ok { return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule) } - - // TECHDEBT(#810): register the provider to the module registry instead of - // holding a reference in the module struct and passing via router config. - m.pstoreProvider = pstoreProvider - - return nil -} - -// setupCurrentHeightProvider attempts to retrieve the current height provider -// from the bus registry, falls back to the consensus module if none is registered. -func (m *p2pModule) setupCurrentHeightProvider() error { - // TECHDEBT(#810): simplify once submodules are more convenient to retrieve. - m.logger.Debug().Msg("setupCurrentHeightProvider") - currentHeightProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(current_height_provider.ModuleName) - if err != nil { - // TECHDEBT(#810): add a `consensusCurrentHeightProvider` submodule to wrap - // the consensus module usage (similar to how `persistencePeerstoreProvider` - // wraps persistence). - currentHeightProviderModule = m.GetBus().GetConsensusModule() - } - - if currentHeightProviderModule == nil { - return errors.New("no current height provider or consensus module registered") - } - - m.logger.Debug().Msg("loaded current height provider") - - currentHeightProvider, ok := currentHeightProviderModule.(providers.CurrentHeightProvider) - if !ok { - return fmt.Errorf("unexpected current height provider type: %T", currentHeightProviderModule) - } - - // TECHDEBT(#810): register the provider to the module registry instead of - // holding a reference in the module struct and passing via router config. - m.currentHeightProvider = currentHeightProvider - return nil } @@ -402,14 +354,12 @@ func (m *p2pModule) setupStakedRouter() (err error) { } m.logger.Debug().Msg("setting up staked actor router") - m.stakedActorRouter, err = raintree.NewRainTreeRouter( + m.stakedActorRouter, err = raintree.Create( m.GetBus(), &config.RainTreeConfig{ - Addr: m.address, - CurrentHeightProvider: m.currentHeightProvider, - PeerstoreProvider: m.pstoreProvider, - Host: m.host, - Handler: m.handlePocketEnvelope, + Addr: m.address, + Host: m.host, + Handler: m.handlePocketEnvelope, }, ) if err != nil { @@ -431,11 +381,9 @@ func (m *p2pModule) setupUnstakedRouter() (err error) { m.unstakedActorRouter, err = background.Create( m.GetBus(), &config.BackgroundConfig{ - Addr: m.address, - CurrentHeightProvider: m.currentHeightProvider, - PeerstoreProvider: m.pstoreProvider, - Host: m.host, - Handler: m.handlePocketEnvelope, + Addr: m.address, + Host: m.host, + Handler: m.handlePocketEnvelope, }, ) if err != nil { @@ -546,7 +494,7 @@ func (m *p2pModule) isNonceAlreadyObserved(nonce utils.Nonce) bool { } func (m *p2pModule) redundantNonceTelemetry(nonce utils.Nonce) { - blockHeight := m.currentHeightProvider.CurrentHeight() + blockHeight := m.GetBus().GetCurrentHeightProvider().CurrentHeight() m.GetBus(). GetTelemetryModule(). GetEventMetricsAgent(). @@ -569,11 +517,31 @@ func (m *p2pModule) getMultiaddr() (multiaddr.Multiaddr, error) { } func (m *p2pModule) getStakedPeerstore() (typesP2P.Peerstore, error) { - return m.pstoreProvider.GetStakedPeerstoreAtHeight( - m.currentHeightProvider.CurrentHeight(), + pstoreProvider, err := m.getPeerstoreProvider() + if err != nil { + return nil, err + } + + return pstoreProvider.GetStakedPeerstoreAtHeight( + m.GetBus().GetCurrentHeightProvider().CurrentHeight(), ) } +// TECHDEBT(#810, #811): replace with `bus.GetPeerstoreProvider()` once available. +func (m *p2pModule) getPeerstoreProvider() (peerstore_provider.PeerstoreProvider, error) { + pstoreProviderModule, err := m.GetBus(). + GetModulesRegistry(). + GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + if err != nil { + return nil, err + } + pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider) + if !ok { + return nil, fmt.Errorf("peerstore provider not available") + } + return pstoreProvider, nil +} + // isStakedActor returns whether the current node is a staked actor at the current height. // Return an error if a peerstore can't be provided. func (m *p2pModule) isStakedActor() (bool, error) { diff --git a/p2p/module_raintree_test.go b/p2p/module_raintree_test.go index 3dc98a988..a7fa6c46a 100644 --- a/p2p/module_raintree_test.go +++ b/p2p/module_raintree_test.go @@ -251,6 +251,14 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te persistenceMock := preparePersistenceMock(t, busMocks[i], genesisMock) consensusMock := prepareConsensusMock(t, busMocks[i]) + currentHeightProviderMock := prepareCurrentHeightProviderMock(t, busMocks[i]) + + busMocks[i].RegisterModule(currentHeightProviderMock) + busMocks[i].EXPECT(). + GetCurrentHeightProvider(). + Return(currentHeightProviderMock). + AnyTimes() + telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &readWriteWaitGroup, expectedWrites) prepareBusMock(busMocks[i], persistenceMock, consensusMock, telemetryMock) diff --git a/p2p/module_test.go b/p2p/module_test.go index 949a3a3b8..e8dde82fd 100644 --- a/p2p/module_test.go +++ b/p2p/module_test.go @@ -11,6 +11,7 @@ import ( libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" libp2pHost "github.com/libp2p/go-libp2p/core/host" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -22,7 +23,6 @@ import ( "github.com/pokt-network/pocket/shared/messaging" "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" - "github.com/stretchr/testify/require" ) // TECHDEBT(#609): move & de-dup. @@ -128,6 +128,14 @@ func Test_Create_configureBootstrapNodes(t *testing.T) { mockConsensusModule := mockModules.NewMockConsensusModule(ctrl) mockConsensusModule.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes() mockBus.EXPECT().GetConsensusModule().Return(mockConsensusModule).AnyTimes() + + currentHeightProviderMock := prepareCurrentHeightProviderMock(t, mockBus) + mockBus.RegisterModule(currentHeightProviderMock) + + pstore := new(typesP2P.PeerAddrMap) + pstoreProviderMock := preparePeerstoreProviderMock(t, mockBus, pstore) + mockBus.RegisterModule(pstoreProviderMock) + mockRuntimeMgr.EXPECT().GetConfig().Return(&configs.Config{ PrivateKey: privKey.String(), P2P: &configs.P2PConfig{ @@ -248,6 +256,17 @@ func newP2PModule(t *testing.T, privKey cryptoPocket.PrivateKey, opts ...modules consensusModuleMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes() mockBus.EXPECT().GetConsensusModule().Return(consensusModuleMock).AnyTimes() + currentHeightProviderMock := prepareCurrentHeightProviderMock(t, mockBus) + mockBus.RegisterModule(currentHeightProviderMock) + mockBus.EXPECT(). + GetCurrentHeightProvider(). + Return(currentHeightProviderMock). + AnyTimes() + + pstore := new(typesP2P.PeerAddrMap) + pstoreProviderMock := preparePeerstoreProviderMock(t, mockBus, pstore) + mockBus.RegisterModule(pstoreProviderMock) + telemetryModuleMock := baseTelemetryMock(t, nil) mockBus.EXPECT().GetTelemetryModule().Return(telemetryModuleMock).AnyTimes() diff --git a/p2p/providers/current_height_provider/consensus/provider.go b/p2p/providers/current_height_provider/consensus/provider.go new file mode 100644 index 000000000..76160e74b --- /dev/null +++ b/p2p/providers/current_height_provider/consensus/provider.go @@ -0,0 +1,30 @@ +package consensus + +import ( + "github.com/pokt-network/pocket/shared/modules" + "github.com/pokt-network/pocket/shared/modules/base_modules" +) + +var _ modules.CurrentHeightProvider = &consensusCurrentHeightProvider{} + +type consensusCurrentHeightProvider struct { + base_modules.IntegrableModule +} + +func Create(bus modules.Bus) (modules.CurrentHeightProvider, error) { + return new(consensusCurrentHeightProvider).Create(bus) +} + +func (*consensusCurrentHeightProvider) Create(bus modules.Bus) (modules.CurrentHeightProvider, error) { + consCHP := &consensusCurrentHeightProvider{} + bus.RegisterModule(consCHP) + return consCHP, nil +} + +func (consCHP *consensusCurrentHeightProvider) GetModuleName() string { + return modules.CurrentHeightProviderSubmoduleName +} + +func (consCHP *consensusCurrentHeightProvider) CurrentHeight() uint64 { + return consCHP.GetBus().GetConsensusModule().CurrentHeight() +} diff --git a/p2p/providers/current_height_provider/current_height_provider.go b/p2p/providers/current_height_provider/current_height_provider.go deleted file mode 100644 index 953577fa2..000000000 --- a/p2p/providers/current_height_provider/current_height_provider.go +++ /dev/null @@ -1,14 +0,0 @@ -package current_height_provider - -//go:generate mockgen -package=mock_types -destination=../../types/mocks/current_height_provider_mock.go github.com/pokt-network/pocket/p2p/providers/current_height_provider CurrentHeightProvider - -import "github.com/pokt-network/pocket/shared/modules" - -const ModuleName = "current_height_provider" - -type CurrentHeightProvider interface { - modules.IntegrableModule - modules.InterruptableModule - - CurrentHeight() uint64 -} diff --git a/p2p/providers/current_height_provider/rpc/provider.go b/p2p/providers/current_height_provider/rpc/provider.go index 9c569c988..fbbe39905 100644 --- a/p2p/providers/current_height_provider/rpc/provider.go +++ b/p2p/providers/current_height_provider/rpc/provider.go @@ -7,40 +7,55 @@ import ( "time" "github.com/pokt-network/pocket/app/client/cli/flags" - "github.com/pokt-network/pocket/p2p/providers/current_height_provider" "github.com/pokt-network/pocket/rpc" "github.com/pokt-network/pocket/shared/modules" "github.com/pokt-network/pocket/shared/modules/base_modules" ) -var _ current_height_provider.CurrentHeightProvider = &rpcCurrentHeightProvider{} +var _ modules.CurrentHeightProvider = &rpcCurrentHeightProvider{} type rpcCurrentHeightProvider struct { base_modules.IntegrableModule - base_modules.InterruptableModule rpcURL string rpcClient *rpc.ClientWithResponses } -func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) { +func Create( + bus modules.Bus, + options ...modules.CurrentHeightProviderOption, +) (modules.CurrentHeightProvider, error) { return new(rpcCurrentHeightProvider).Create(bus, options...) } // Create implements current_height_provider.CurrentHeightProvider -func (*rpcCurrentHeightProvider) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) { - return NewRPCCurrentHeightProvider(options...), nil +func (*rpcCurrentHeightProvider) Create( + bus modules.Bus, + options ...modules.CurrentHeightProviderOption, +) (modules.CurrentHeightProvider, error) { + rpcHeightProvider := &rpcCurrentHeightProvider{ + rpcURL: flags.RemoteCLIURL, + } + bus.RegisterModule(rpcHeightProvider) + + for _, o := range options { + o(rpcHeightProvider) + } + + rpcHeightProvider.initRPCClient() + + return rpcHeightProvider, nil } // GetModuleName implements current_height_provider.CurrentHeightProvider func (*rpcCurrentHeightProvider) GetModuleName() string { - return current_height_provider.ModuleName + return modules.CurrentHeightProviderSubmoduleName } -func (rchp *rpcCurrentHeightProvider) CurrentHeight() uint64 { +func (rpcCHP *rpcCurrentHeightProvider) CurrentHeight() uint64 { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - response, err := rchp.rpcClient.GetV1ConsensusStateWithResponse(ctx) + response, err := rpcCHP.rpcClient.GetV1ConsensusStateWithResponse(ctx) if err != nil { cancel() log.Fatalf("could not get consensus state from RPC: %v", err) @@ -54,33 +69,19 @@ func (rchp *rpcCurrentHeightProvider) CurrentHeight() uint64 { return uint64(response.JSONDefault.Height) } -func NewRPCCurrentHeightProvider(options ...modules.ModuleOption) *rpcCurrentHeightProvider { - rchp := &rpcCurrentHeightProvider{ - rpcURL: flags.RemoteCLIURL, - } - - for _, o := range options { - o(rchp) - } - - rchp.initRPCClient() - - return rchp -} - -func (rchp *rpcCurrentHeightProvider) initRPCClient() { - rpcClient, err := rpc.NewClientWithResponses(rchp.rpcURL) +func (rpcCHP *rpcCurrentHeightProvider) initRPCClient() { + rpcClient, err := rpc.NewClientWithResponses(rpcCHP.rpcURL) if err != nil { log.Fatalf("could not create RPC client: %v", err) } - rchp.rpcClient = rpcClient + rpcCHP.rpcClient = rpcClient } // options // WithCustomRPCURL allows to specify a custom RPC URL -func WithCustomRPCURL(rpcURL string) modules.ModuleOption { - return func(rabp modules.InjectableModule) { - rabp.(*rpcCurrentHeightProvider).rpcURL = rpcURL +func WithCustomRPCURL(rpcURL string) modules.CurrentHeightProviderOption { + return func(chp modules.CurrentHeightProvider) { + chp.(*rpcCurrentHeightProvider).rpcURL = rpcURL } } diff --git a/p2p/providers/providers.go b/p2p/providers/providers.go index 7d12a3295..18294eaa9 100644 --- a/p2p/providers/providers.go +++ b/p2p/providers/providers.go @@ -1,9 +1,7 @@ package providers import ( - "github.com/pokt-network/pocket/p2p/providers/current_height_provider" "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" ) type PeerstoreProvider = peerstore_provider.PeerstoreProvider -type CurrentHeightProvider = current_height_provider.CurrentHeightProvider diff --git a/p2p/raintree/peers_manager_test.go b/p2p/raintree/peers_manager_test.go index 9e3fa2502..37b39ad33 100644 --- a/p2p/raintree/peers_manager_test.go +++ b/p2p/raintree/peers_manager_test.go @@ -18,8 +18,10 @@ import ( "github.com/pokt-network/pocket/p2p/config" typesP2P "github.com/pokt-network/pocket/p2p/types" mocksP2P "github.com/pokt-network/pocket/p2p/types/mocks" + "github.com/pokt-network/pocket/runtime" "github.com/pokt-network/pocket/runtime/configs" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" + "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" ) @@ -93,22 +95,21 @@ func TestRainTree_Peerstore_HandleUpdate(t *testing.T) { }) require.NoError(t, err) - mockBus := mockBus(ctrl) - pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + mockBus := mockBus(ctrl, pstore) + mockBus.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { + m.SetBus(mockBus) + }).AnyTimes() libp2pMockNet, err := mocknet.WithNPeers(1) require.NoError(t, err) rtCfg := &config.RainTreeConfig{ - Host: libp2pMockNet.Hosts()[0], - Addr: pubKey.Address(), - PeerstoreProvider: pstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: libp2pMockNet.Hosts()[0], + Addr: pubKey.Address(), + Handler: noopHandler, } - router, err := NewRainTreeRouter(mockBus, rtCfg) + router, err := Create(mockBus, rtCfg) require.NoError(t, err) rainTree := router.(*rainTreeRouter) @@ -142,7 +143,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) { // {1000000000, 19}, } - // the test will add this arbitrary number of addresses after the initial initialization (done via NewRainTreeRouter) + // the test will add this arbitrary number of addresses after the initial initialization (done via Create) // this is to add extra subsequent work that -should- grow linearly and it's actually going to test AddressBook updates // not simply initializations. numAddressesToBeAdded := 1000 @@ -158,9 +159,7 @@ func BenchmarkPeerstoreUpdates(b *testing.B) { }) require.NoError(b, err) - mockBus := mockBus(ctrl) - pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + mockBus := mockBus(ctrl, pstore) libp2pPStore, err := pstoremem.NewPeerstore() require.NoError(b, err) @@ -169,14 +168,12 @@ func BenchmarkPeerstoreUpdates(b *testing.B) { hostMock.EXPECT().Peerstore().Return(libp2pPStore).AnyTimes() rtCfg := &config.RainTreeConfig{ - Host: hostMock, - Addr: pubKey.Address(), - PeerstoreProvider: pstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: hostMock, + Addr: pubKey.Address(), + Handler: noopHandler, } - router, err := NewRainTreeRouter(mockBus, rtCfg) + router, err := Create(mockBus, rtCfg) require.NoError(b, err) rainTree := router.(*rainTreeRouter) @@ -272,7 +269,13 @@ func TestRainTree_MessageTargets_TwentySevenNodes(t *testing.T) { func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeMessageProp) { ctrl := gomock.NewController(t) + modulesRegistry := runtime.NewModulesRegistry() busMock := mockModules.NewMockBus(ctrl) + busMock.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes() + busMock.EXPECT().RegisterModule(gomock.Any()).Do(func(m modules.Submodule) { + modulesRegistry.RegisterModule(m) + m.SetBus(busMock) + }).AnyTimes() consensusMock := mockModules.NewMockConsensusModule(ctrl) consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes() busMock.EXPECT().GetConsensusModule().Return(consensusMock).AnyTimes() @@ -283,10 +286,16 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM runtimeMgrMock.EXPECT().GetConfig().Return(configs.NewDefaultConfig()).AnyTimes() mockAlphabetValidatorServiceURLsDNS(t) + + // TECHDEBT(#810): simplify once `bus.GetPeerstoreProvider()` is available. pstore := getAlphabetPeerstore(t, expectedMsgProp.numNodes) pstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) + busMock.RegisterModule(pstoreProviderMock) + currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 1) + busMock.EXPECT().GetCurrentHeightProvider().Return(currentHeightProviderMock).AnyTimes() + libp2pPStore, err := pstoremem.NewPeerstore() require.NoError(t, err) @@ -296,14 +305,12 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM hostMock.EXPECT().ID().Return(libp2pPeer.ID("")).AnyTimes() rtCfg := &config.RainTreeConfig{ - Host: hostMock, - Addr: []byte{expectedMsgProp.orig}, - PeerstoreProvider: pstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: hostMock, + Addr: []byte{expectedMsgProp.orig}, + Handler: noopHandler, } - router, err := NewRainTreeRouter(busMock, rtCfg) + router, err := Create(busMock, rtCfg) require.NoError(t, err) rainTree := router.(*rainTreeRouter) diff --git a/p2p/raintree/peerstore_utils.go b/p2p/raintree/peerstore_utils.go index c8d10b6b1..aab31a5ba 100644 --- a/p2p/raintree/peerstore_utils.go +++ b/p2p/raintree/peerstore_utils.go @@ -1,8 +1,11 @@ package raintree import ( + "fmt" "math" "strconv" + + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" ) // Refer to the P2P specification for a formal description and proof of how the constants are selected @@ -17,9 +20,17 @@ const ( func (rtr *rainTreeRouter) getPeerstoreSize(level uint32, height uint64) int { peersView, maxNumLevels := rtr.peersManager.getPeersViewWithLevels() + // TECHDEBT(#810, 811): use `bus.GetPeerstoreProvider()` instead once available. + pstoreProvider, err := rtr.getPeerstoreProvider() + if err != nil { + // Should never happen; enforced by a `rtr.getPeerstoreProvider()` call + // & error handling in `rtr.broadcastAtLevel()`. + panic(fmt.Sprintf("Error retrieving peerstore provider: %s", err.Error())) + } + // if we are propagating a message from a previous height, we need to instantiate an ephemeral rainTreePeersManager (without add/remove) - if height < rtr.currentHeightProvider.CurrentHeight() { - peersMgr, err := newPeersManagerWithPeerstoreProvider(rtr.selfAddr, rtr.pstoreProvider, height) + if height < rtr.GetBus().GetCurrentHeightProvider().CurrentHeight() { + peersMgr, err := newPeersManagerWithPeerstoreProvider(rtr.selfAddr, pstoreProvider, height) if err != nil { rtr.logger.Fatal().Err(err).Msg("Error initializing rainTreeRouter rainTreePeersManager") } @@ -30,9 +41,24 @@ func (rtr *rainTreeRouter) getPeerstoreSize(level uint32, height uint64) int { return int(float64(len(peersView.GetAddrs())) * (shrinkageCoefficient)) } +// TECHDEBT(#810, 811): replace with `bus.GetPeerstoreProvider()` once available. +func (rtr *rainTreeRouter) getPeerstoreProvider() (peerstore_provider.PeerstoreProvider, error) { + pstoreProviderModule, err := rtr.GetBus().GetModulesRegistry(). + GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + if err != nil { + return nil, err + } + + pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider) + if !ok { + return nil, fmt.Errorf("unexpected peerstore provider module type: %T", pstoreProviderModule) + } + return pstoreProvider, nil +} + // getTargetsAtLevel returns the targets for a given level func (rtr *rainTreeRouter) getTargetsAtLevel(level uint32) []target { - height := rtr.currentHeightProvider.CurrentHeight() + height := rtr.GetBus().GetCurrentHeightProvider().CurrentHeight() pstoreSizeAtHeight := rtr.getPeerstoreSize(level, height) firstTarget := rtr.getTarget(firstMsgTargetPercentage, pstoreSizeAtHeight, level) secondTarget := rtr.getTarget(secondMsgTargetPercentage, pstoreSizeAtHeight, level) diff --git a/p2p/raintree/router.go b/p2p/raintree/router.go index a96c52665..90ff95bc4 100644 --- a/p2p/raintree/router.go +++ b/p2p/raintree/router.go @@ -9,8 +9,6 @@ import ( "github.com/pokt-network/pocket/logger" "github.com/pokt-network/pocket/p2p/config" "github.com/pokt-network/pocket/p2p/protocol" - "github.com/pokt-network/pocket/p2p/providers" - "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/p2p/unicast" "github.com/pokt-network/pocket/p2p/utils" @@ -43,34 +41,39 @@ type rainTreeRouter struct { // (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme) host libp2pHost.Host // selfAddr is the pocket address representing this host. - selfAddr cryptoPocket.Address - peersManager *rainTreePeersManager - pstoreProvider peerstore_provider.PeerstoreProvider - currentHeightProvider providers.CurrentHeightProvider + selfAddr cryptoPocket.Address + peersManager *rainTreePeersManager } -func NewRainTreeRouter(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) { +func Create(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) { return new(rainTreeRouter).Create(bus, cfg) } -func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (typesP2P.Router, error) { +func (*rainTreeRouter) Create( + bus modules.Bus, + cfg *config.RainTreeConfig, +) (typesP2P.Router, error) { rainTreeLogger := logger.Global.CreateLoggerForModule("rainTreeRouter") if err := cfg.IsValid(); err != nil { return nil, err } rtr := &rainTreeRouter{ - host: cfg.Host, - selfAddr: cfg.Addr, - pstoreProvider: cfg.PeerstoreProvider, - currentHeightProvider: cfg.CurrentHeightProvider, - logger: rainTreeLogger, - handler: cfg.Handler, + host: cfg.Host, + selfAddr: cfg.Addr, + logger: rainTreeLogger, + handler: cfg.Handler, } - rtr.SetBus(bus) + bus.RegisterModule(rtr) - height := rtr.currentHeightProvider.CurrentHeight() - pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(height) + currentHeightProvider := bus.GetCurrentHeightProvider() + pstoreProvider, err := rtr.getPeerstoreProvider() + if err != nil { + return nil, err + } + + height := currentHeightProvider.CurrentHeight() + pstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(height) if err != nil { return nil, fmt.Errorf("getting staked peerstore at height %d: %w", height, err) } @@ -82,7 +85,7 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type "peerstore_size": pstore.Size(), }).Msg("initializing raintree router") - if err := rtr.setupDependencies(); err != nil { + if err := rtr.setupDependencies(pstore); err != nil { return nil, err } @@ -93,6 +96,10 @@ func (rtr *rainTreeRouter) Close() error { return nil } +func (rtr *rainTreeRouter) GetModuleName() string { + return typesP2P.StakedActorRouterSubmoduleName +} + // NetworkBroadcast implements the respective member of `typesP2P.Router`. func (rtr *rainTreeRouter) Broadcast(data []byte) error { return rtr.broadcastAtLevel(data, rtr.peersManager.GetMaxNumLevels()) @@ -115,6 +122,13 @@ func (rtr *rainTreeRouter) broadcastAtLevel(data []byte, level uint32) error { return err } + // TECHDEBT(#810, #811): remove once `bus.GetPeerstoreProvider()` is available. + // Pre-handling the error from `rtr.getPeerstoreProvider()` before it is called + // downstream in a context without an error return value. + if _, err = rtr.getPeerstoreProvider(); err != nil { + return err + } + for _, target := range rtr.getTargetsAtLevel(level) { if shouldSendToTarget(target) { if err = rtr.sendInternal(msgBz, target.address); err != nil { @@ -318,16 +332,11 @@ func (rtr *rainTreeRouter) setupUnicastRouter() error { return nil } -func (rtr *rainTreeRouter) setupDependencies() error { +func (rtr *rainTreeRouter) setupDependencies(pstore typesP2P.Peerstore) error { if err := rtr.setupUnicastRouter(); err != nil { return err } - pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight()) - if err != nil { - return fmt.Errorf("getting staked peerstore: %w", err) - } - if err := rtr.setupPeerManager(pstore); err != nil { return fmt.Errorf("setting up peer manager: %w", err) } diff --git a/p2p/raintree/router_test.go b/p2p/raintree/router_test.go index 2865a584b..20567775b 100644 --- a/p2p/raintree/router_test.go +++ b/p2p/raintree/router_test.go @@ -9,12 +9,14 @@ import ( libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" libp2pHost "github.com/libp2p/go-libp2p/core/host" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" + "github.com/pokt-network/pocket/p2p/config" typesP2P "github.com/pokt-network/pocket/p2p/types" "github.com/pokt-network/pocket/p2p/utils" "github.com/pokt-network/pocket/runtime/defaults" cryptoPocket "github.com/pokt-network/pocket/shared/crypto" - "github.com/stretchr/testify/require" + "github.com/pokt-network/pocket/shared/modules" ) // TECHDEBT(#609): move & de-dup. @@ -48,19 +50,18 @@ func TestRainTreeRouter_AddPeer(t *testing.T) { require.NoError(t, err) expectedPStoreSize++ - busMock := mockBus(ctrl) - peerstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + busMock := mockBus(ctrl, pstore) + busMock.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { + m.SetBus(busMock) + }).AnyTimes() rtCfg := &config.RainTreeConfig{ - Host: host, - Addr: selfAddr, - PeerstoreProvider: peerstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: host, + Addr: selfAddr, + Handler: noopHandler, } - router, err := NewRainTreeRouter(busMock, rtCfg) + router, err := Create(busMock, rtCfg) require.NoError(t, err) rtRouter := router.(*rainTreeRouter) @@ -112,18 +113,14 @@ func TestRainTreeRouter_RemovePeer(t *testing.T) { require.NoError(t, err) expectedPStoreSize++ - busMock := mockBus(ctrl) - peerstoreProviderMock := mockPeerstoreProvider(ctrl, pstore) - currentHeightProviderMock := mockCurrentHeightProvider(ctrl, 0) + busMock := mockBus(ctrl, pstore) rtCfg := &config.RainTreeConfig{ - Host: host, - Addr: selfAddr, - PeerstoreProvider: peerstoreProviderMock, - CurrentHeightProvider: currentHeightProviderMock, - Handler: noopHandler, + Host: host, + Addr: selfAddr, + Handler: noopHandler, } - router, err := NewRainTreeRouter(busMock, rtCfg) + router, err := Create(busMock, rtCfg) require.NoError(t, err) rainTree := router.(*rainTreeRouter) diff --git a/p2p/raintree/utils_test.go b/p2p/raintree/utils_test.go index 82e99438a..f2502acfb 100644 --- a/p2p/raintree/utils_test.go +++ b/p2p/raintree/utils_test.go @@ -2,14 +2,24 @@ package raintree import ( "github.com/golang/mock/gomock" + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" mocksP2P "github.com/pokt-network/pocket/p2p/types/mocks" "github.com/pokt-network/pocket/runtime/configs" + "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" ) -func mockBus(ctrl *gomock.Controller) *mockModules.MockBus { +// TECHDEBT(#796): refactor/de-dup & separate definitions of mocks from one another. +func mockBus(ctrl *gomock.Controller, pstore typesP2P.Peerstore) *mockModules.MockBus { + if pstore == nil { + pstore = &typesP2P.PeerAddrMap{} + } + busMock := mockModules.NewMockBus(ctrl) + busMock.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { + m.SetBus(busMock) + }).AnyTimes() busMock.EXPECT().GetPersistenceModule().Return(nil).AnyTimes() consensusMock := mockModules.NewMockConsensusModule(ctrl) consensusMock.EXPECT().CurrentHeight().Return(uint64(0)).AnyTimes() @@ -17,17 +27,54 @@ func mockBus(ctrl *gomock.Controller) *mockModules.MockBus { runtimeMgrMock := mockModules.NewMockRuntimeMgr(ctrl) busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes() runtimeMgrMock.EXPECT().GetConfig().Return(configs.NewDefaultConfig()).AnyTimes() + + currentHeightProviderMock := mockModules.NewMockCurrentHeightProvider(ctrl) + currentHeightProviderMock.EXPECT().CurrentHeight().Return(uint64(0)).AnyTimes() + busMock.EXPECT(). + GetCurrentHeightProvider(). + Return(currentHeightProviderMock). + AnyTimes() + + peerstoreProviderMock := mocksP2P.NewMockPeerstoreProvider(ctrl) + peerstoreProviderMock.EXPECT(). + GetStakedPeerstoreAtHeight(gomock.Any()). + Return(pstore, nil). + AnyTimes() + + modulesRegistryMock := mockModules.NewMockModulesRegistry(ctrl) + modulesRegistryMock.EXPECT(). + GetModule(gomock.Eq(peerstore_provider.PeerstoreProviderSubmoduleName)). + Return(peerstoreProviderMock, nil). + AnyTimes() + modulesRegistryMock.EXPECT(). + GetModule(gomock.Eq(modules.CurrentHeightProviderSubmoduleName)). + Return(currentHeightProviderMock, nil). + AnyTimes() + busMock.EXPECT().GetModulesRegistry().Return(modulesRegistryMock).AnyTimes() + return busMock } -func mockPeerstoreProvider(ctrl *gomock.Controller, pstore typesP2P.Peerstore) *mocksP2P.MockPeerstoreProvider { +func mockPeerstoreProvider( + ctrl *gomock.Controller, + pstore typesP2P.Peerstore, +) *mocksP2P.MockPeerstoreProvider { peerstoreProviderMock := mocksP2P.NewMockPeerstoreProvider(ctrl) - peerstoreProviderMock.EXPECT().GetStakedPeerstoreAtHeight(gomock.Any()).Return(pstore, nil).AnyTimes() + peerstoreProviderMock.EXPECT().SetBus(gomock.Any()).AnyTimes() + peerstoreProviderMock.EXPECT().GetBus().AnyTimes() + peerstoreProviderMock.EXPECT(). + GetStakedPeerstoreAtHeight(gomock.Any()). + Return(pstore, nil). + AnyTimes() + peerstoreProviderMock.EXPECT(). + GetModuleName(). + Return(peerstore_provider.PeerstoreProviderSubmoduleName). + AnyTimes() return peerstoreProviderMock } -func mockCurrentHeightProvider(ctrl *gomock.Controller, height uint64) *mocksP2P.MockCurrentHeightProvider { - currentHeightProviderMock := mocksP2P.NewMockCurrentHeightProvider(ctrl) +func mockCurrentHeightProvider(ctrl *gomock.Controller, height uint64) *mockModules.MockCurrentHeightProvider { + currentHeightProviderMock := mockModules.NewMockCurrentHeightProvider(ctrl) currentHeightProviderMock.EXPECT().CurrentHeight().Return(height).AnyTimes() return currentHeightProviderMock } diff --git a/p2p/transport_encryption_test.go b/p2p/transport_encryption_test.go index 799340a17..0d88f7607 100644 --- a/p2p/transport_encryption_test.go +++ b/p2p/transport_encryption_test.go @@ -64,6 +64,14 @@ func TestP2pModule_RainTreeRouter_Insecure_Error(t *testing.T) { persistenceMock := preparePersistenceMock(t, busMock, genesisStateMock) busMock.EXPECT().GetPersistenceModule().Return(persistenceMock).AnyTimes() + currentHeightProviderMock := prepareCurrentHeightProviderMock(t, busMock) + busMock.RegisterModule(currentHeightProviderMock) + busMock.EXPECT().GetCurrentHeightProvider().Return(currentHeightProviderMock).AnyTimes() + + pstore := new(typesP2P.PeerAddrMap) + pstoreProviderMock := preparePeerstoreProviderMock(t, busMock, pstore) + busMock.RegisterModule(pstoreProviderMock) + telemetryMock.EXPECT().GetBus().Return(busMock).AnyTimes() telemetryMock.EXPECT().SetBus(busMock).AnyTimes() diff --git a/p2p/types/router.go b/p2p/types/router.go index c3776a114..1ac5ad39c 100644 --- a/p2p/types/router.go +++ b/p2p/types/router.go @@ -7,10 +7,25 @@ import ( "github.com/pokt-network/pocket/shared/modules" ) -// TECHDEBT(olshansky): When we delete `stdnetwork` and only go with `raintree`, this interface -// can be simplified greatly. +// TECHDEBT(#880, #811): move these definitions to /shared/modules & /shared/modules/types packages. +// `Peerstore` would have to be moved as well which may create an import cycle. + +// NOTE: this is the first case in the codebase where we need multiple +// instances of a submodule to be dependency-injectable. +// +// CONSIDERATION: this is inconsistent with existing submodule "name" naming +// conventions as the "name" doesn't match the name of the interface. This is +// implied by the fact above, as two distinct "names" are needed to disambiguate +// in the module registry. These names are also distinct from the names of the +// respective `Router` implementations; our reasoning is that these names better +// reflect the separation of concerns from the P2P module's perspective. +const ( + StakedActorRouterSubmoduleName = "staked_actor_router" + UnstakedActorRouterSubmoduleName = "unstaked_actor_router" +) + type Router interface { - modules.IntegrableModule + modules.Submodule Broadcast(data []byte) error Send(data []byte, address cryptoPocket.Address) error diff --git a/p2p/utils_test.go b/p2p/utils_test.go index 678f2f2b8..43222a0bb 100644 --- a/p2p/utils_test.go +++ b/p2p/utils_test.go @@ -17,7 +17,6 @@ import ( "github.com/stretchr/testify/require" "github.com/pokt-network/pocket/internal/testutil" - "github.com/pokt-network/pocket/p2p/providers/current_height_provider" "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" typesP2P "github.com/pokt-network/pocket/p2p/types" mock_types "github.com/pokt-network/pocket/p2p/types/mocks" @@ -212,13 +211,14 @@ func createMockBus( ctrl := gomock.NewController(t) mockBus := mockModules.NewMockBus(ctrl) mockBus.EXPECT().GetRuntimeMgr().Return(runtimeMgr).AnyTimes() - mockBus.EXPECT().RegisterModule(gomock.Any()).DoAndReturn(func(m modules.Submodule) { - m.SetBus(mockBus) - }).AnyTimes() - mockModulesRegistry := mockModules.NewMockModulesRegistry(ctrl) - mockModulesRegistry.EXPECT().GetModule(peerstore_provider.PeerstoreProviderSubmoduleName).Return(nil, runtime.ErrModuleNotRegistered(peerstore_provider.PeerstoreProviderSubmoduleName)).AnyTimes() - mockModulesRegistry.EXPECT().GetModule(current_height_provider.ModuleName).Return(nil, runtime.ErrModuleNotRegistered(current_height_provider.ModuleName)).AnyTimes() - mockBus.EXPECT().GetModulesRegistry().Return(mockModulesRegistry).AnyTimes() + modulesRegistry := runtime.NewModulesRegistry() + mockBus.EXPECT(). + RegisterModule(gomock.Any()). + DoAndReturn(func(m modules.Submodule) { + modulesRegistry.RegisterModule(m) + m.SetBus(mockBus) + }).AnyTimes() + mockBus.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes() mockBus.EXPECT().PublishEventToBus(gomock.AssignableToTypeOf(&messaging.PocketEnvelope{})). Do(func(envelope *messaging.PocketEnvelope) { fmt.Println("[valId: unknown] Read") @@ -279,6 +279,42 @@ func prepareConsensusMock(t *testing.T, busMock *mockModules.MockBus) *mockModul return consensusMock } +func prepareCurrentHeightProviderMock(t *testing.T, busMock *mockModules.MockBus) *mockModules.MockCurrentHeightProvider { + ctrl := gomock.NewController(t) + currentHeightProviderMock := mockModules.NewMockCurrentHeightProvider(ctrl) + currentHeightProviderMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes() + + currentHeightProviderMock.EXPECT().GetBus().Return(busMock).AnyTimes() + currentHeightProviderMock.EXPECT().SetBus(busMock).AnyTimes() + currentHeightProviderMock.EXPECT().GetModuleName(). + Return(modules.CurrentHeightProviderSubmoduleName). + AnyTimes() + busMock.RegisterModule(currentHeightProviderMock) + + return currentHeightProviderMock +} + +func preparePeerstoreProviderMock( + t *testing.T, + busMock *mockModules.MockBus, + pstore typesP2P.Peerstore, +) *mock_types.MockPeerstoreProvider { + ctrl := gomock.NewController(t) + peerstoreProviderMock := mock_types.NewMockPeerstoreProvider(ctrl) + peerstoreProviderMock.EXPECT(). + GetStakedPeerstoreAtHeight(gomock.Any()). + Return(pstore, nil). + AnyTimes() + + peerstoreProviderMock.EXPECT().GetBus().Return(busMock).AnyTimes() + peerstoreProviderMock.EXPECT().SetBus(busMock).AnyTimes() + peerstoreProviderMock.EXPECT().GetModuleName(). + Return(peerstore_provider.PeerstoreProviderSubmoduleName). + AnyTimes() + + return peerstoreProviderMock +} + // Persistence mock - only needed for validatorMap access func preparePersistenceMock(t *testing.T, busMock *mockModules.MockBus, genesisState *genesis.GenesisState) *mockModules.MockPersistenceModule { ctrl := gomock.NewController(t) diff --git a/runtime/bus.go b/runtime/bus.go index 12d95e007..9bf4fc3f2 100644 --- a/runtime/bus.go +++ b/runtime/bus.go @@ -135,6 +135,10 @@ func (m *bus) GetBulkStoreCacher() modules.BulkStoreCacher { return getModuleFromRegistry[modules.BulkStoreCacher](m, modules.BulkStoreCacherModuleName) } +func (m *bus) GetCurrentHeightProvider() modules.CurrentHeightProvider { + return getModuleFromRegistry[modules.CurrentHeightProvider](m, modules.CurrentHeightProviderSubmoduleName) +} + // getModuleFromRegistry is a helper function to get a module from the registry that handles errors and casting via generics func getModuleFromRegistry[T modules.Submodule](m *bus, moduleName string) T { mod, err := m.modulesRegistry.GetModule(moduleName) diff --git a/shared/modules/bus_module.go b/shared/modules/bus_module.go index 28d4bb252..50b2e23f5 100644 --- a/shared/modules/bus_module.go +++ b/shared/modules/bus_module.go @@ -34,6 +34,9 @@ type Bus interface { GetStateMachineModule() StateMachineModule GetIBCModule() IBCModule + // Pocket submodules + GetCurrentHeightProvider() CurrentHeightProvider + // Runtime GetRuntimeMgr() RuntimeMgr diff --git a/shared/modules/current_height_provider_submodule.go b/shared/modules/current_height_provider_submodule.go new file mode 100644 index 000000000..7f83408a5 --- /dev/null +++ b/shared/modules/current_height_provider_submodule.go @@ -0,0 +1,13 @@ +package modules + +//go:generate mockgen -package=mock_modules -destination=./mocks/current_height_provider_submodule_mock.go github.com/pokt-network/pocket/shared/modules CurrentHeightProvider + +const CurrentHeightProviderSubmoduleName = "current_height_provider" + +type CurrentHeightProvider interface { + Submodule + + CurrentHeight() uint64 +} + +type CurrentHeightProviderOption func(CurrentHeightProvider)