Skip to content

Commit

Permalink
[P2P] refactor: message handling (#763)
Browse files Browse the repository at this point in the history
## Description

### Before

The P2P module is responsible for handling incoming messages from the
router's host. When it receives a message, it calls into the router to
unpack the `RainTreeMessage` (and facilitate further broadcast
propagation). The resulting `PocketEnvelope` is returned to the P2P
module to be emitted over the application event bus.

```mermaid
classDiagram
    class RainTreeMessage {
        <<protobuf>>
        +Level uint32
        +Data []byte
        +Nonce uint64
    }
    
    class PocketEnvelope {
        <<protobuf>>
        +Content *anypb.Any   
    }

    RainTreeMessage --* PocketEnvelope : serialized as `Data`
    
      
    class P2PModule {
        -handleNetworkData([]byte) error
        -handleStream(stream libp2pNetwork.Stream)
        -readStream(stream libp2pNetwork.Stream)
    }

    class RainTreeRouter {
        +HandleNetworkData func([]byte) ([]byte, error)
    }

    RainTreeRouter --> P2PModule
    P2PModule --> RainTreeRouter
    RainTreeRouter --o RainTreeMessage
    P2PModule --o PocketEnvelope
```

### After

The router encapsulates handling incoming `RainTreeMessage`s, unpacking
them and passing them along to the P2P module as serialized
`PocketEnvelope`s. The P2P module then deserializes and emits them over
the application event bus.

```mermaid
classDiagram
    class RainTreeMessage {
        <<protobuf>>
        +Level uint32
        +Data []byte
        +Nonce uint64
    }
    
    class PocketEnvelope {
        <<protobuf>>
        +Content *anypb.Any
    }

    RainTreeMessage --* PocketEnvelope : serialized as `Data`
    
    class P2PModule {
        -handlePocketEnvelope([]byte) error
    }

    class RainTreeRouter {
        -handler RouterHandler
        -handleRainTreeMsg([]byte) ([]byte, error)
        -handleStream(stream libp2pNetwork.Stream)
        -readStream(stream libp2pNetwork.Stream)
    }

    RainTreeRouter --> P2PModule : `handler` == `handlePocketEnvelope`
    RainTreeRouter --o RainTreeMessage

    P2PModule --o PocketEnvelope
```

## Issue

Second deliverable in #762 

## Type of change

Please mark the relevant option(s):

- [ ] New feature, functionality or library
- [ ] Bug fix
- [x] Code health or cleanup
- [ ] Major breaking change
- [ ] Documentation
- [ ] Other <!-- add details here if it a different type of change -->

## List of changes

- Added `Handler` field to `RainTreeConfig` & `BackgroundConfig`
- Refactored `rainTreeRouter` logging methods
- Renamed `rainTreeRouter#HandleNetworkData()` to `#handleRainTreeMsg()`
- Renamed `p2pModule#handleNetworkData()` to `#handleAppData()`
- Added -tags=test to all test make targets
- Fixed mockdns usage in `TestP2PModule_Insecure_Error` test
- Moved message handling from p2p module to router

## Testing

- [ ] `make develop_test`; if any code changes were made
- [ ] `make test_e2e` on [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any code changes were made
- [ ] `e2e-devnet-test` passes tests on
[DevNet](https://pocketnetwork.notion.site/How-to-DevNet-ff1598f27efe44c09f34e2aa0051f0dd);
if any code was changed
- [x] [Docker Compose
LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md);
if any major functionality was changed or introduced
- [x] [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any infrastructure or configuration changes were made

## Required Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have added, or updated, [`godoc` format
comments](https://go.dev/blog/godoc) on touched members (see:
[tip.golang.org/doc/comment](https://tip.golang.org/doc/comment))
- [ ] I have tested my changes using the available tooling
- [ ] I have updated the corresponding CHANGELOG

### If Applicable Checklist

- [ ] I have updated the corresponding README(s); local and/or global
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added, or updated,
[mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding
README(s)
- [ ] I have added, or updated, documentation and
[mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*`
if I updated `shared/*`README(s)
  • Loading branch information
bryanchriswhite authored May 31, 2023
1 parent 5d1944c commit fba11c3
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 151 deletions.
52 changes: 26 additions & 26 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -339,102 +339,102 @@ generate_node_state_machine_diagram: ## (Re)generates the Node State Machine dia

.PHONY: test_all
test_all: ## Run all go unit tests
go test -p=1 -count=1 ./...
go test -p=1 -count=1 -tags=test ./...

.PHONY: test_e2e
test_e2e: kubectl_check ## Run all E2E tests
echo "IMPROVE(#759): Make sure you ran 'make localnet_up' in case this fails with infrastructure related errors."
go test ${VERBOSE_TEST} -count=1 ./e2e/tests/... -tags=e2e
go test ${VERBOSE_TEST} -count=1 -tags=test,e2e ./e2e/tests/...

.PHONY: test_all_with_json_coverage
test_all_with_json_coverage: generate_rpc_openapi ## Run all go unit tests, output results & coverage into json & coverage files
go test -p=1 -count=1 -json ./... -covermode=count -coverprofile=coverage.out | tee test_results.json | jq
go test -p=1 -count=1 -tags=test -json ./... -covermode=count -coverprofile=coverage.out | tee test_results.json | jq

.PHONY: test_race
test_race: ## Identify all unit tests that may result in race conditions
go test ${VERBOSE_TEST} -race -count=1 ./...
go test ${VERBOSE_TEST} -race -count=1 -tags=test ./...

.PHONY: test_app
test_app: ## Run all go app module unit tests
go test ${VERBOSE_TEST} -p=1 -count=1 ./app/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test ./app/...

.PHONY: test_utility
test_utility: ## Run all go utility module unit tests
go test ${VERBOSE_TEST} -p=1 -count=1 ./utility/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test ./utility/...

.PHONY: test_shared
test_shared: ## Run all go unit tests in the shared module
go test ${VERBOSE_TEST} -p=1 -count=1 ./shared/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test ./shared/...

.PHONY: test_consensus
test_consensus: ## Run all go unit tests in the consensus module
go test ${VERBOSE_TEST} -p=1 -count=1 ./consensus/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test ./consensus/...

# These tests are isolated to a single package which enables logs to be streamed in realtime. More details here: https://stackoverflow.com/a/74903989/768439
.PHONY: test_consensus_e2e
test_consensus_e2e: ## Run all go t2e unit tests in the consensus module w/ log streaming
go test ${VERBOSE_TEST} -count=1 ./consensus/e2e_tests/...
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/e2e_tests/...

.PHONY: test_consensus_concurrent_tests
test_consensus_concurrent_tests: ## Run unit tests in the consensus module that could be prone to race conditions (#192)
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -race -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -race -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -race -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -race -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;

.PHONY: test_hotstuff
test_hotstuff: ## Run all go unit tests related to hotstuff consensus
go test ${VERBOSE_TEST} -count=1 ./consensus/e2e_tests -run Hotstuff
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/e2e_tests -run Hotstuff

.PHONY: test_pacemaker
test_pacemaker: ## Run all go unit tests related to hotstuff pacemaker
go test ${VERBOSE_TEST} -count=1 ./consensus/e2e_tests -run Pacemaker
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/e2e_tests -run Pacemaker

.PHONY: test_statesync
test_statesync: ## Run all go unit tests related to hotstuff statesync
go test -v ${VERBOSE_TEST} -count=1 -run StateSync ./consensus/e2e_tests
go test -v ${VERBOSE_TEST} -count=1 -tags=test -run StateSync ./consensus/e2e_tests

.PHONY: test_vrf
test_vrf: ## Run all go unit tests in the VRF library
go test ${VERBOSE_TEST} -count=1 ./consensus/leader_election/vrf
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/leader_election/vrf

.PHONY: test_sortition
test_sortition: ## Run all go unit tests in the Sortition library
go test ${VERBOSE_TEST} -count=1 ./consensus/leader_election/sortition
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/leader_election/sortition

.PHONY: test_persistence
test_persistence: ## Run all go unit tests in the Persistence module
go test ${VERBOSE_TEST} -count=1 -p=1 ./persistence/...
go test ${VERBOSE_TEST} -count=1 -tags=test -p=1 ./persistence/...

.PHONY: test_persistence_state_hash
test_persistence_state_hash: ## Run all go unit tests in the Persistence module related to the state hash
go test ${VERBOSE_TEST} -count=1 -run TestStateHash ./persistence/...
go test ${VERBOSE_TEST} -count=1 -tags=test -run TestStateHash ./persistence/...

.PHONY: test_p2p
test_p2p: ## Run all p2p related tests
go test ${VERBOSE_TEST} -count=1 ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test ./p2p/...

.PHONY: test_p2p_raintree
test_p2p_raintree: ## Run all p2p raintree related tests
go test ${VERBOSE_TEST} -count=1 -run RainTreeNetwork -count=1 ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test -run RainTreeNetwork -count=1 ./p2p/...

.PHONY: test_p2p_raintree_addrbook
test_p2p_raintree_addrbook: ## Run all p2p raintree addr book related tests
go test ${VERBOSE_TEST} -count=1 -run RainTreeAddrBook -count=1 ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test -run RainTreeAddrBook -count=1 ./p2p/...

# TIP: For benchmarks, consider appending `-run=^#` to avoid running unit tests in the same package

.PHONY: benchmark_persistence_state_hash
benchmark_persistence_state_hash: ## Benchmark the state hash computation
go test ${VERBOSE_TEST} -count=1 -cpu 1,2 -benchtime=1s -benchmem -bench=. -run BenchmarkStateHash -count=1 ./persistence/...
go test ${VERBOSE_TEST} -count=1 -tags=test -cpu 1,2 -benchtime=1s -benchmem -bench=. -run BenchmarkStateHash -count=1 ./persistence/...

.PHONY: benchmark_sortition
benchmark_sortition: ## Benchmark the Sortition library
go test ${VERBOSE_TEST} -count=1 -bench=. -run ^# ./consensus/leader_election/sortition
go test ${VERBOSE_TEST} -count=1 -tags=test -bench=. -run ^# ./consensus/leader_election/sortition

.PHONY: benchmark_p2p_addrbook
benchmark_p2p_peerstore: ## Run P2P peerstore benchmarks
go test ${VERBOSE_TEST} -count=1 -bench=. -run BenchmarkPeerstore ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test -bench=. -run BenchmarkPeerstore ./p2p/...

### Inspired by @goldinguy_ in this post: https://goldin.io/blog/stop-using-todo ###
# TODO - General Purpose catch-all.
Expand Down
10 changes: 10 additions & 0 deletions p2p/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.52] - 2023-05-26

- Added `Handler` field to `RainTreeConfig` & `BackgroundConfig`
- Refactored `rainTreeRouter` logging methods
- Renamed `rainTreeRouter#HandleNetworkData()` to `#handleRainTreeMsg()`
- Renamed `p2pModule#handleNetworkData()` to `#handlePocketEnvelope()`
- Added -tags=test to all test make targets
- Fixed mockdns usage in `TestP2PModule_Insecure_Error` test
- Moved message handling from p2p module to router

## [0.0.0.51] - 2023-05-23

- Use the shared codec module when marshaling the data sent over the wire
Expand Down
2 changes: 2 additions & 0 deletions p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type BackgroundConfig struct {
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
}

// RainTreeConfig implements `RouterConfig` for use with `RainTreeRouter`.
Expand All @@ -36,6 +37,7 @@ type RainTreeConfig struct {
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error

MaxNonces uint64
}
Expand Down
119 changes: 7 additions & 112 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@ package p2p
import (
"errors"
"fmt"
"io"
"time"

"github.com/libp2p/go-libp2p"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/multiformats/go-multiaddr"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
Expand All @@ -32,12 +27,6 @@ import (
"google.golang.org/protobuf/types/known/anypb"
)

// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions.
// TECHDEBT(#629): parameterize and expose via config.
// readStreamTimeout is the duration to wait for a read operation on a
// stream to complete, after which the stream is closed ("timed out").
const readStreamTimeout = time.Second * 10

var _ modules.P2PModule = &p2pModule{}

type p2pModule struct {
Expand Down Expand Up @@ -173,11 +162,6 @@ func (m *p2pModule) Start() (err error) {
return fmt.Errorf("setting up router: %w", err)
}

// Don't handle incoming streams in client debug mode.
if !m.isClientDebugMode() {
m.host.SetStreamHandler(protocol.PoktProtocolID, m.handleStream)
}

m.GetBus().
GetTelemetryModule().
GetTimeSeriesAgent().
Expand Down Expand Up @@ -290,6 +274,7 @@ func (m *p2pModule) setupRouter() (err error) {
CurrentHeightProvider: m.currentHeightProvider,
PeerstoreProvider: m.pstoreProvider,
Host: m.host,
Handler: m.handlePocketEnvelope,
MaxNonces: m.cfg.MaxNonces,
},
)
Expand Down Expand Up @@ -340,100 +325,16 @@ func (m *p2pModule) isClientDebugMode() bool {
return m.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode
}

// handleStream is called each time a peer establishes a new stream with this
// module's libp2p `host.Host`.
func (m *p2pModule) handleStream(stream libp2pNetwork.Stream) {
m.logger.Debug().Msg("handling incoming stream")
peer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
m.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("parsing remote peer identity")

if err = stream.Reset(); err != nil {
m.logger.Error().Err(err).Msg("resetting stream")
}
return
}

if err := m.router.AddPeer(peer); err != nil {
m.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("adding remote peer to router")
}

go m.readStream(stream)
}

// readStream is intended to be called in a goroutine. It continuously reads from
// the given stream for handling at the network level. Used for handling "direct"
// messages (i.e. one specific target node).
func (m *p2pModule) readStream(stream libp2pNetwork.Stream) {
// Time out if no data is sent to free resources.
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil {
// NB: tests using libp2p's `mocknet` rely on this not returning an error.
// `SetReadDeadline` not supported by `mocknet` streams.
m.logger.Debug().Err(err).Msg("setting stream read deadline")
}

// debug logging: stream scope stats
// (see: https://pkg.go.dev/github.com/libp2p/[email protected]/core/network#StreamScope)
if err := utils.LogScopeStatFactory(
&logger.Global.Logger,
"stream scope (read-side)",
)(stream.Scope()); err != nil {
m.logger.Debug().Err(err).Msg("logging stream scope stats")
}
// ---

data, err := io.ReadAll(stream)
if err != nil {
m.logger.Error().Err(err).Msg("reading from stream")
if err := stream.Reset(); err != nil {
m.logger.Debug().Err(err).Msg("resetting stream (read-side)")
}
return
}

if err := stream.Reset(); err != nil {
m.logger.Debug().Err(err).Msg("resetting stream (read-side)")
}

// debug logging
remotePeer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
m.logger.Debug().Err(err).Msg("getting remote remotePeer")
} else {
utils.LogIncomingMsg(m.logger, m.cfg.Hostname, remotePeer)
}
// ---

if err := m.handleNetworkData(data); err != nil {
m.logger.Error().Err(err).Msg("handling network data")
}
}

// handleNetworkData passes a network message to the configured
// `Router`implementation for routing.
func (m *p2pModule) handleNetworkData(data []byte) error {
appMsgData, err := m.router.HandleNetworkData(data)
if err != nil {
return err
}

// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if appMsgData == nil {
return nil
}

networkMessage := messaging.PocketEnvelope{}
if err := proto.Unmarshal(appMsgData, &networkMessage); err != nil {
// handlePocketEnvelope deserializes the received `PocketEnvelope` data and publishes
// a copy of its `Content` to the application event bus.
func (m *p2pModule) handlePocketEnvelope(pocketEnvelopeBz []byte) error {
poktEnvelope := messaging.PocketEnvelope{}
if err := proto.Unmarshal(pocketEnvelopeBz, &poktEnvelope); err != nil {
return fmt.Errorf("decoding network message: %w", err)
}

event := messaging.PocketEnvelope{
Content: networkMessage.Content,
Content: poktEnvelope.Content,
}
m.GetBus().PublishEventToBus(&event)
return nil
Expand All @@ -448,9 +349,3 @@ func (m *p2pModule) getMultiaddr() (multiaddr.Multiaddr, error) {
"%s:%d", m.cfg.Hostname, m.cfg.Port,
))
}

// newReadStreamDeadline returns a future deadline
// based on the read stream timeout duration.
func newReadStreamDeadline() time.Time {
return time.Now().Add(readStreamTimeout)
}
10 changes: 7 additions & 3 deletions p2p/module_raintree_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build test

package p2p

import (
Expand All @@ -13,10 +15,12 @@ import (

libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/raintree"
)

// TODO(#314): Add the tooling and instructions on how to generate unit tests in this file.
Expand Down Expand Up @@ -272,7 +276,7 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te
mod := *p2pMod
p2pMod.host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) {
log.Printf("[valID: %s] Read\n", sURL)
(&mod).handleStream(stream)
(&mod).router.(*raintree.RainTreeRouter).HandleStream(stream)
wg.Done()
})
}
Expand Down
31 changes: 31 additions & 0 deletions p2p/raintree/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package raintree

import (
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"

"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/utils"
)

// logStream logs the incoming stream and its scope stats
func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) {
rtr.logStreamScopeStats(stream)

remotePeer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
rtr.logger.Debug().Err(err).Msg("getting remote remotePeer")
} else {
utils.LogIncomingMsg(rtr.logger, rtr.getHostname(), remotePeer)
}
}

// logStreamScopeStats logs the incoming stream's scope stats
// (see: https://pkg.go.dev/github.com/libp2p/[email protected]/core/network#StreamScope)
func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) {
if err := utils.LogScopeStatFactory(
&logger.Global.Logger,
"stream scope (read-side)",
)(stream.Scope()); err != nil {
rtr.logger.Debug().Err(err).Msg("logging stream scope stats")
}
}
1 change: 1 addition & 0 deletions p2p/raintree/peers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM

hostMock := mocksP2P.NewMockHost(ctrl)
hostMock.EXPECT().Peerstore().Return(libp2pPStore).AnyTimes()
hostMock.EXPECT().SetStreamHandler(gomock.Any(), gomock.Any()).Times(1)

rtCfg := &config.RainTreeConfig{
Host: hostMock,
Expand Down
Loading

0 comments on commit fba11c3

Please sign in to comment.