Skip to content

Commit

Permalink
[P2P] Refactor P2P submodules (#895)
Browse files Browse the repository at this point in the history
## @Reviewer
This PR may be more digestible / reviewable on a commit-by-commit basis.
Commits are organized logically and any given line is only modified in a
single commit, with few exceptions*.

*(In the interest of preserving the git-time-continuum
:police_officer::rotating_light:, this applies in batches of commits
between comments or reviews *by humans*, only once "in review")

---

## Description

Refactor P2P module dependencies as submodules, ultimately to support
usage P2P module usage in the CLI.

## Issue

Related:

- #730 

Dependant(s):

- #891 
- #801 
- #892 

## Type of change

Please mark the relevant option(s):

- [ ] New feature, functionality or library
- [ ] Bug fix
- [x] Code health or cleanup
- [ ] Major breaking change
- [ ] Documentation
- [ ] Other <!-- add details here if it a different type of change -->

## List of changes

- decoupled P2P module dependencies from Consensus module (disambiguates
modules registry names)
  - added "current_height_provider" module registry slot
  - promoted `CurrentHeightProvider` to a submodule interface type
  - added `consensusCurrentHeightProvider` implementation
- promoted `Router` interface to a submodule interface type
  - added "staked_actor_router" modules registry slot
  - added "unstaked_actor_router" modules registry slot
  - converted `backgroundRouter` implementation to submodule
  - converted `rainTreeRouter` implementation to submodule
  - simplified router base config

## Testing

- [x] `make develop_test`; if any code changes were made
- [x] `make test_e2e` on [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any code changes were made
- [x] `e2e-devnet-test` passes tests on
[DevNet](https://pocketnetwork.notion.site/How-to-DevNet-ff1598f27efe44c09f34e2aa0051f0dd);
if any code was changed
- [ ] [Docker Compose
LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md);
if any major functionality was changed or introduced
- [x] [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any infrastructure or configuration changes were made

<!-- REMOVE this comment block after following the instructions
 If you added additional tests or infrastructure, describe it here.
 Bonus points for images and videos or gifs.
-->

## Required Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have added, or updated, [`godoc` format
comments](https://go.dev/blog/godoc) on touched members (see:
[tip.golang.org/doc/comment](https://tip.golang.org/doc/comment))
- [x] I have tested my changes using the available tooling
- [ ] I have updated the corresponding CHANGELOG

### If Applicable Checklist

- [ ] I have updated the corresponding README(s); local and/or global
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added, or updated,
[mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding
README(s)
- [ ] I have added, or updated, documentation and
[mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*`
if I updated `shared/*`README(s)

---------

Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
bryanchriswhite and Olshansk authored Jul 13, 2023
1 parent a3a2b5c commit 41c3d32
Show file tree
Hide file tree
Showing 26 changed files with 483 additions and 279 deletions.
11 changes: 3 additions & 8 deletions app/client/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/pokt-network/pocket/app/client/cli/helpers"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/messaging"
Expand Down Expand Up @@ -227,19 +226,15 @@ func fetchPeerstore(cmd *cobra.Command) (typesP2P.Peerstore, error) {
if !ok || bus == nil {
return nil, errors.New("retrieving bus from CLI context")
}
modulesRegistry := bus.GetModulesRegistry()
// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
// is retrievable as a proper submodule
pstoreProvider, err := modulesRegistry.GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
pstoreProvider, err := bus.GetModulesRegistry().GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return nil, errors.New("retrieving peerstore provider")
}
currentHeightProvider, err := modulesRegistry.GetModule(current_height_provider.ModuleName)
if err != nil {
return nil, errors.New("retrieving currentHeightProvider")
}
currentHeightProvider := bus.GetCurrentHeightProvider()

height := currentHeightProvider.(current_height_provider.CurrentHeightProvider).CurrentHeight()
height := currentHeightProvider.CurrentHeight()
pstore, err := pstoreProvider.(peerstore_provider.PeerstoreProvider).GetStakedPeerstoreAtHeight(height)
if err != nil {
return nil, fmt.Errorf("retrieving peerstore at height %d", height)
Expand Down
23 changes: 15 additions & 8 deletions app/client/cli/helpers/setup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package helpers

import (
"fmt"

"github.com/spf13/cobra"

"github.com/pokt-network/pocket/app/client/cli/flags"
Expand Down Expand Up @@ -30,7 +32,11 @@ func P2PDependenciesPreRunE(cmd *cobra.Command, _ []string) error {
if err := setupPeerstoreProvider(*runtimeMgr, flags.RemoteCLIURL); err != nil {
return err
}
setupCurrentHeightProvider(*runtimeMgr, flags.RemoteCLIURL)

if err := setupRPCCurrentHeightProvider(*runtimeMgr, flags.RemoteCLIURL); err != nil {
return err
}

setupAndStartP2PModule(*runtimeMgr)

return nil
Expand All @@ -44,15 +50,16 @@ func setupPeerstoreProvider(rm runtime.Manager, rpcURL string) error {
return nil
}

func setupCurrentHeightProvider(rm runtime.Manager, rpcURL string) {
// TECHDEBT(#810): simplify after current height provider is refactored as
// a submodule.
bus := rm.GetBus()
modulesRegistry := bus.GetModulesRegistry()
currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(
func setupRPCCurrentHeightProvider(rm runtime.Manager, rpcURL string) error {
// Ensure `CurrentHeightProvider` exists in the modules registry.
_, err := rpcCHP.Create(
rm.GetBus(),
rpcCHP.WithCustomRPCURL(rpcURL),
)
modulesRegistry.RegisterModule(currentHeightProvider)
if err != nil {
return fmt.Errorf("setting up current height provider: %w", err)
}
return nil
}

func setupAndStartP2PModule(rm runtime.Manager) {
Expand Down
6 changes: 6 additions & 0 deletions consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
typesCons "github.com/pokt-network/pocket/consensus/types"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider/consensus"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/genesis"
"github.com/pokt-network/pocket/shared/codec"
Expand Down Expand Up @@ -132,6 +133,11 @@ func (*consensusModule) Create(bus modules.Bus, options ...modules.ModuleOption)

bus.RegisterModule(m)

// Ensure `CurrentHeightProvider` submodule is registered.
if _, err = consensus.Create(bus); err != nil {
return nil, fmt.Errorf("failed to create current height provider: %w", err)
}

runtimeMgr := bus.GetRuntimeMgr()

consensusCfg := runtimeMgr.GetConfig().Consensus
Expand Down
45 changes: 30 additions & 15 deletions p2p/background/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/unicast"
"github.com/pokt-network/pocket/p2p/utils"
Expand All @@ -27,9 +28,8 @@ import (
)

var (
_ typesP2P.Router = &backgroundRouter{}
_ modules.IntegrableModule = &backgroundRouter{}
_ backgroundRouterFactory = &backgroundRouter{}
_ typesP2P.Router = &backgroundRouter{}
_ backgroundRouterFactory = &backgroundRouter{}
)

type backgroundRouterFactory = modules.FactoryWithConfig[typesP2P.Router, *config.BackgroundConfig]
Expand Down Expand Up @@ -93,7 +93,7 @@ func (*backgroundRouter) Create(bus modules.Bus, cfg *config.BackgroundConfig) (
host: cfg.Host,
cancelReadSubscription: cancel,
}
rtr.SetBus(bus)
bus.RegisterModule(rtr)

bgRouterLogger.Info().Fields(map[string]any{
"host_id": cfg.Host.ID(),
Expand Down Expand Up @@ -127,6 +127,11 @@ func (rtr *backgroundRouter) Close() error {
)
}

// GetModuleName implements the respective `modules.Integrable` interface method.
func (rtr *backgroundRouter) GetModuleName() string {
return typesP2P.UnstakedActorRouterSubmoduleName
}

// Broadcast implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) Broadcast(pocketEnvelopeBz []byte) error {
backgroundMsg := &typesP2P.BackgroundMessage{
Expand Down Expand Up @@ -215,7 +220,8 @@ func (rtr *backgroundRouter) setupUnicastRouter() error {
return nil
}

func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config.BackgroundConfig) error {
// TECHBEDT(#810,#811): remove unused `BackgroundConfig`
func (rtr *backgroundRouter) setupDependencies(ctx context.Context, _ *config.BackgroundConfig) error {
// NB: The order in which the internal components are setup below is important
if err := rtr.setupUnicastRouter(); err != nil {
return err
Expand All @@ -237,21 +243,30 @@ func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config.
return fmt.Errorf("setting up subscription: %w", err)
}

if err := rtr.setupPeerstore(
ctx,
cfg.PeerstoreProvider,
cfg.CurrentHeightProvider,
); err != nil {
if err := rtr.setupPeerstore(ctx); err != nil {
return fmt.Errorf("setting up peerstore: %w", err)
}
return nil
}

func (rtr *backgroundRouter) setupPeerstore(
ctx context.Context,
pstoreProvider providers.PeerstoreProvider,
currentHeightProvider providers.CurrentHeightProvider,
) (err error) {
func (rtr *backgroundRouter) setupPeerstore(ctx context.Context) (err error) {
rtr.logger.Warn().Msg("setting up peerstore...")

// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
// is retrievable as a proper submodule
pstoreProviderModule, err := rtr.GetBus().GetModulesRegistry().
GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return fmt.Errorf("retrieving peerstore provider: %w", err)
}
pstoreProvider, ok := pstoreProviderModule.(providers.PeerstoreProvider)
if !ok {
return fmt.Errorf("unexpected peerstore provider type: %T", pstoreProviderModule)
}

rtr.logger.Debug().Msg("setupCurrentHeightProvider")
currentHeightProvider := rtr.GetBus().GetCurrentHeightProvider()

// seed initial peerstore with current on-chain peer info (i.e. staked actors)
rtr.pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(
currentHeightProvider.CurrentHeight(),
Expand Down
31 changes: 22 additions & 9 deletions p2p/background/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
mock_types "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/runtime"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/defaults"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
)

Expand Down Expand Up @@ -399,6 +402,7 @@ func newTestRouter(
return newRouterWithSelfPeerAndHost(t, selfPeer, host, handler)
}

// TECHDEBT(#796): de-dup & refactor
func newRouterWithSelfPeerAndHost(
t *testing.T,
selfPeer typesP2P.Peer,
Expand All @@ -415,16 +419,27 @@ func newRouterWithSelfPeerAndHost(
},
}).AnyTimes()

modulesRegistry := runtime.NewModulesRegistry()
busMock := mockModules.NewMockBus(ctrl)
busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()
busMock.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes()
busMock.EXPECT().RegisterModule(gomock.Any()).Do(func(m modules.Submodule) {
modulesRegistry.RegisterModule(m)
m.SetBus(busMock)
}).AnyTimes()

consensusMock := mockModules.NewMockConsensusModule(ctrl)
consensusMock.EXPECT().GetModuleName().Return(modules.ConsensusModuleName).AnyTimes()
consensusMock.EXPECT().GetBus().Return(busMock).AnyTimes()
consensusMock.EXPECT().SetBus(gomock.Any()).AnyTimes()
consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes()
busMock.EXPECT().GetCurrentHeightProvider().Return(consensusMock).AnyTimes()

pstore := make(typesP2P.PeerAddrMap)
pstoreProviderMock := mock_types.NewMockPeerstoreProvider(ctrl)
pstoreProviderMock.EXPECT().GetModuleName().Return(peerstore_provider.PeerstoreProviderSubmoduleName)
pstoreProviderMock.EXPECT().GetStakedPeerstoreAtHeight(gomock.Any()).Return(pstore, nil).AnyTimes()

busMock := mockModules.NewMockBus(ctrl)
busMock.EXPECT().GetConsensusModule().Return(consensusMock).AnyTimes()
busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()
modulesRegistry.RegisterModule(pstoreProviderMock)

err := pstore.AddPeer(selfPeer)
require.NoError(t, err)
Expand All @@ -434,11 +449,9 @@ func newRouterWithSelfPeerAndHost(
}

router, err := Create(busMock, &config.BackgroundConfig{
Addr: selfPeer.GetAddress(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: consensusMock,
Host: host,
Handler: handler,
Addr: selfPeer.GetAddress(),
Host: host,
Handler: handler,
})
require.NoError(t, err)

Expand Down
9 changes: 8 additions & 1 deletion p2p/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,14 @@ func (m *p2pModule) bootstrap() error {
return fmt.Errorf("creating RPC peerstore provider: %w", err)
}

currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(rpcCHP.WithCustomRPCURL(bootstrapNode))
currentHeightProvider, err := rpcCHP.Create(
m.GetBus(),
rpcCHP.WithCustomRPCURL(bootstrapNode),
)
if err != nil {
m.logger.Warn().Err(err).Str("endpoint", bootstrapNode).Msg("Error getting current height from bootstrap node")
continue
}

pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(currentHeightProvider.CurrentHeight())
if err != nil {
Expand Down
53 changes: 17 additions & 36 deletions p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/multierr"

"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules"
Expand All @@ -20,18 +19,16 @@ var (
_ typesP2P.RouterConfig = &RainTreeConfig{}
)

// baseConfig implements `RouterConfig` using the given libp2p host and current
// height and peerstore providers. Intended for internal use by other `RouterConfig`
// baseConfig implements `RouterConfig` using the given libp2p host, pokt address
// and handler function. Intended for internal use by other `RouterConfig`
// implementations with common config parameters.
//
// NB: intentionally *not* embedding `baseConfig` to improve readability of usages
// of would-be embedders (e.g. `BackgroundConfig`).
type baseConfig struct {
Host host.Host
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
Host host.Host
Addr crypto.Address
Handler func(data []byte) error
}

type UnicastRouterConfig struct {
Expand All @@ -44,20 +41,16 @@ type UnicastRouterConfig struct {

// BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`.
type BackgroundConfig struct {
Host host.Host
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
Host host.Host
Addr crypto.Address
Handler func(data []byte) error
}

// RainTreeConfig implements `RouterConfig` for use with `RainTreeRouter`.
type RainTreeConfig struct {
Host host.Host
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
Host host.Host
Addr crypto.Address
Handler func(data []byte) error
}

// IsValid implements the respective member of the `RouterConfig` interface.
Expand All @@ -66,18 +59,10 @@ func (cfg *baseConfig) IsValid() (err error) {
err = multierr.Append(err, fmt.Errorf("pokt address not configured"))
}

if cfg.CurrentHeightProvider == nil {
err = multierr.Append(err, fmt.Errorf("current height provider not configured"))
}

if cfg.Host == nil {
err = multierr.Append(err, fmt.Errorf("host not configured"))
}

if cfg.PeerstoreProvider == nil {
err = multierr.Append(err, fmt.Errorf("peerstore provider not configured"))
}

if cfg.Handler == nil {
err = multierr.Append(err, fmt.Errorf("handler not configured"))
}
Expand Down Expand Up @@ -111,23 +96,19 @@ func (cfg *UnicastRouterConfig) IsValid() (err error) {
// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *BackgroundConfig) IsValid() error {
baseCfg := baseConfig{
Host: cfg.Host,
Addr: cfg.Addr,
CurrentHeightProvider: cfg.CurrentHeightProvider,
PeerstoreProvider: cfg.PeerstoreProvider,
Handler: cfg.Handler,
Host: cfg.Host,
Addr: cfg.Addr,
Handler: cfg.Handler,
}
return baseCfg.IsValid()
}

// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *RainTreeConfig) IsValid() error {
baseCfg := baseConfig{
Host: cfg.Host,
Addr: cfg.Addr,
CurrentHeightProvider: cfg.CurrentHeightProvider,
PeerstoreProvider: cfg.PeerstoreProvider,
Handler: cfg.Handler,
Host: cfg.Host,
Addr: cfg.Addr,
Handler: cfg.Handler,
}
return baseCfg.IsValid()
}
7 changes: 6 additions & 1 deletion p2p/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {
}

oldPeerList := m.stakedActorRouter.GetPeerstore().GetPeerList()
updatedPeerstore, err := m.pstoreProvider.GetStakedPeerstoreAtHeight(consensusNewHeightEvent.Height)
pstoreProvider, err := m.getPeerstoreProvider()
if err != nil {
return err
}

updatedPeerstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(consensusNewHeightEvent.Height)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 41c3d32

Please sign in to comment.