Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] refactor: message handling #763

Merged
merged 14 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -339,102 +339,102 @@ generate_node_state_machine_diagram: ## (Re)generates the Node State Machine dia

.PHONY: test_all
test_all: ## Run all go unit tests
go test -p=1 -count=1 ./...
go test -p=1 -count=1 -tags=test ./...

.PHONY: test_e2e
test_e2e: kubectl_check ## Run all E2E tests
echo "IMPROVE(#759): Make sure you ran 'make localnet_up' in case this fails with infrastructure related errors."
go test ${VERBOSE_TEST} -count=1 ./e2e/tests/... -tags=e2e
go test ${VERBOSE_TEST} -count=1 -tags=test,e2e ./e2e/tests/...

.PHONY: test_all_with_json_coverage
test_all_with_json_coverage: generate_rpc_openapi ## Run all go unit tests, output results & coverage into json & coverage files
go test -p=1 -count=1 -json ./... -covermode=count -coverprofile=coverage.out | tee test_results.json | jq
go test -p=1 -count=1 -tags=test -json ./... -covermode=count -coverprofile=coverage.out | tee test_results.json | jq

.PHONY: test_race
test_race: ## Identify all unit tests that may result in race conditions
go test ${VERBOSE_TEST} -race -count=1 ./...
go test ${VERBOSE_TEST} -race -count=1 -tags=test ./...

.PHONY: test_app
test_app: ## Run all go app module unit tests
go test ${VERBOSE_TEST} -p=1 -count=1 ./app/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test ./app/...

.PHONY: test_utility
test_utility: ## Run all go utility module unit tests
go test ${VERBOSE_TEST} -p=1 -count=1 ./utility/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test./utility/...
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved

.PHONY: test_shared
test_shared: ## Run all go unit tests in the shared module
go test ${VERBOSE_TEST} -p=1 -count=1 ./shared/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test ./shared/...

.PHONY: test_consensus
test_consensus: ## Run all go unit tests in the consensus module
go test ${VERBOSE_TEST} -p=1 -count=1 ./consensus/...
go test ${VERBOSE_TEST} -p=1 -count=1 -tags=test ./consensus/...

# These tests are isolated to a single package which enables logs to be streamed in realtime. More details here: https://stackoverflow.com/a/74903989/768439
.PHONY: test_consensus_e2e
test_consensus_e2e: ## Run all go t2e unit tests in the consensus module w/ log streaming
go test ${VERBOSE_TEST} -count=1 ./consensus/e2e_tests/...
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/e2e_tests/...

.PHONY: test_consensus_concurrent_tests
test_consensus_concurrent_tests: ## Run unit tests in the consensus module that could be prone to race conditions (#192)
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -race -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -race -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -race -run ^TestPacemakerTimeoutIncreasesRound$ ./consensus/e2e_tests; done;
for i in $$(seq 1 100); do go test -timeout 2s -count=1 -tags=test -race -run ^TestHotstuff4Nodes1BlockHappyPath$ ./consensus/e2e_tests; done;

.PHONY: test_hotstuff
test_hotstuff: ## Run all go unit tests related to hotstuff consensus
go test ${VERBOSE_TEST} -count=1 ./consensus/e2e_tests -run Hotstuff
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/e2e_tests -run Hotstuff

.PHONY: test_pacemaker
test_pacemaker: ## Run all go unit tests related to hotstuff pacemaker
go test ${VERBOSE_TEST} -count=1 ./consensus/e2e_tests -run Pacemaker
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/e2e_tests -run Pacemaker

.PHONY: test_statesync
test_statesync: ## Run all go unit tests related to hotstuff statesync
go test -v ${VERBOSE_TEST} -count=1 -run StateSync ./consensus/e2e_tests
go test -v ${VERBOSE_TEST} -count=1 -tags=test -run StateSync ./consensus/e2e_tests

.PHONY: test_vrf
test_vrf: ## Run all go unit tests in the VRF library
go test ${VERBOSE_TEST} -count=1 ./consensus/leader_election/vrf
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/leader_election/vrf

.PHONY: test_sortition
test_sortition: ## Run all go unit tests in the Sortition library
go test ${VERBOSE_TEST} -count=1 ./consensus/leader_election/sortition
go test ${VERBOSE_TEST} -count=1 -tags=test ./consensus/leader_election/sortition

.PHONY: test_persistence
test_persistence: ## Run all go unit tests in the Persistence module
go test ${VERBOSE_TEST} -count=1 -p=1 ./persistence/...
go test ${VERBOSE_TEST} -count=1 -tags=test -p=1 ./persistence/...

.PHONY: test_persistence_state_hash
test_persistence_state_hash: ## Run all go unit tests in the Persistence module related to the state hash
go test ${VERBOSE_TEST} -count=1 -run TestStateHash ./persistence/...
go test ${VERBOSE_TEST} -count=1 -tags=test -run TestStateHash ./persistence/...

.PHONY: test_p2p
test_p2p: ## Run all p2p related tests
go test ${VERBOSE_TEST} -count=1 ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test ./p2p/...

.PHONY: test_p2p_raintree
test_p2p_raintree: ## Run all p2p raintree related tests
go test ${VERBOSE_TEST} -count=1 -run RainTreeNetwork -count=1 ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test -run RainTreeNetwork -count=1 ./p2p/...

.PHONY: test_p2p_raintree_addrbook
test_p2p_raintree_addrbook: ## Run all p2p raintree addr book related tests
go test ${VERBOSE_TEST} -count=1 -run RainTreeAddrBook -count=1 ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test -run RainTreeAddrBook -count=1 ./p2p/...

# TIP: For benchmarks, consider appending `-run=^#` to avoid running unit tests in the same package

.PHONY: benchmark_persistence_state_hash
benchmark_persistence_state_hash: ## Benchmark the state hash computation
go test ${VERBOSE_TEST} -count=1 -cpu 1,2 -benchtime=1s -benchmem -bench=. -run BenchmarkStateHash -count=1 ./persistence/...
go test ${VERBOSE_TEST} -count=1 -tags=test -cpu 1,2 -benchtime=1s -benchmem -bench=. -run BenchmarkStateHash -count=1 ./persistence/...

.PHONY: benchmark_sortition
benchmark_sortition: ## Benchmark the Sortition library
go test ${VERBOSE_TEST} -count=1 -bench=. -run ^# ./consensus/leader_election/sortition
go test ${VERBOSE_TEST} -count=1 -tags=test -bench=. -run ^# ./consensus/leader_election/sortition

.PHONY: benchmark_p2p_addrbook
benchmark_p2p_peerstore: ## Run P2P peerstore benchmarks
go test ${VERBOSE_TEST} -count=1 -bench=. -run BenchmarkPeerstore ./p2p/...
go test ${VERBOSE_TEST} -count=1 -tags=test -bench=. -run BenchmarkPeerstore ./p2p/...

### Inspired by @goldinguy_ in this post: https://goldin.io/blog/stop-using-todo ###
# TODO - General Purpose catch-all.
Expand Down
10 changes: 10 additions & 0 deletions p2p/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.51] - 2023-05-23

- Added `Handler` field to `RainTreeConfig` & `BackgroundConfig`
- Refactored `rainTreeRouter` logging methods
- Renamed `rainTreeRouter#HandleNetworkData()` to `#handleRainTreeMsg()`
- Renamed `p2pModule#handleNetworkData()` to `#handlePocketEnvelope()`
- Added -tags=test to all test make targets
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
- Fixed mockdns usage in `TestP2PModule_Insecure_Error` test
- Moved message handling from p2p module to router

## [0.0.0.50] - 2023-05-08

- Removed unused `Transport` interface

This comment was marked as outdated.

This comment was marked as outdated.

Expand Down
2 changes: 2 additions & 0 deletions p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type BackgroundConfig struct {
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
}

// RainTreeConfig implements `RouterConfig` for use with `RainTreeRouter`.
Expand All @@ -36,6 +37,7 @@ type RainTreeConfig struct {
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error

MaxNonces uint64
}
Expand Down
119 changes: 7 additions & 112 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@ package p2p
import (
"errors"
"fmt"
"io"
"time"

"github.com/libp2p/go-libp2p"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/multiformats/go-multiaddr"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
Expand All @@ -31,12 +26,6 @@ import (
"google.golang.org/protobuf/types/known/anypb"
)

// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions.
// TECHDEBT(#629): parameterize and expose via config.
// readStreamTimeout is the duration to wait for a read operation on a
// stream to complete, after which the stream is closed ("timed out").
const readStreamTimeout = time.Second * 10

var _ modules.P2PModule = &p2pModule{}

type p2pModule struct {
Expand Down Expand Up @@ -172,11 +161,6 @@ func (m *p2pModule) Start() (err error) {
return fmt.Errorf("setting up router: %w", err)
}

// Don't handle incoming streams in client debug mode.
if !m.isClientDebugMode() {
m.host.SetStreamHandler(protocol.PoktProtocolID, m.handleStream)
}

m.GetBus().
GetTelemetryModule().
GetTimeSeriesAgent().
Expand Down Expand Up @@ -291,6 +275,7 @@ func (m *p2pModule) setupRouter() (err error) {
CurrentHeightProvider: m.currentHeightProvider,
PeerstoreProvider: m.pstoreProvider,
Host: m.host,
Handler: m.handlePocketEnvelope,
MaxNonces: m.cfg.MaxNonces,
},
)
Expand Down Expand Up @@ -341,100 +326,16 @@ func (m *p2pModule) isClientDebugMode() bool {
return m.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode
}

// handleStream is called each time a peer establishes a new stream with this
// module's libp2p `host.Host`.
func (m *p2pModule) handleStream(stream libp2pNetwork.Stream) {
m.logger.Debug().Msg("handling incoming stream")
peer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
m.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("parsing remote peer identity")

if err = stream.Reset(); err != nil {
m.logger.Error().Err(err).Msg("resetting stream")
}
return
}

if err := m.router.AddPeer(peer); err != nil {
m.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("adding remote peer to router")
}

go m.readStream(stream)
}

// readStream is intended to be called in a goroutine. It continuously reads from
// the given stream for handling at the network level. Used for handling "direct"
// messages (i.e. one specific target node).
func (m *p2pModule) readStream(stream libp2pNetwork.Stream) {
// Time out if no data is sent to free resources.
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil {
// NB: tests using libp2p's `mocknet` rely on this not returning an error.
// `SetReadDeadline` not supported by `mocknet` streams.
m.logger.Debug().Err(err).Msg("setting stream read deadline")
}

// debug logging: stream scope stats
// (see: https://pkg.go.dev/github.com/libp2p/[email protected]/core/network#StreamScope)
if err := utils.LogScopeStatFactory(
&logger.Global.Logger,
"stream scope (read-side)",
)(stream.Scope()); err != nil {
m.logger.Debug().Err(err).Msg("logging stream scope stats")
}
// ---

data, err := io.ReadAll(stream)
if err != nil {
m.logger.Error().Err(err).Msg("reading from stream")
if err := stream.Reset(); err != nil {
m.logger.Debug().Err(err).Msg("resetting stream (read-side)")
}
return
}

if err := stream.Reset(); err != nil {
m.logger.Debug().Err(err).Msg("resetting stream (read-side)")
}

// debug logging
remotePeer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
m.logger.Debug().Err(err).Msg("getting remote remotePeer")
} else {
utils.LogIncomingMsg(m.logger, m.cfg.Hostname, remotePeer)
}
// ---

if err := m.handleNetworkData(data); err != nil {
m.logger.Error().Err(err).Msg("handling network data")
}
}

// handleNetworkData passes a network message to the configured
// `Router`implementation for routing.
func (m *p2pModule) handleNetworkData(data []byte) error {
appMsgData, err := m.router.HandleNetworkData(data)
if err != nil {
return err
}

// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if appMsgData == nil {
return nil
}

networkMessage := messaging.PocketEnvelope{}
if err := proto.Unmarshal(appMsgData, &networkMessage); err != nil {
// handlePocketEnvelope deserializes the received `PocketEnvelope` data and publishes
// a copy of its `Content` to the application event bus.
func (m *p2pModule) handlePocketEnvelope(pocketEnvelopeBz []byte) error {
poktEnvelope := messaging.PocketEnvelope{}
if err := proto.Unmarshal(pocketEnvelopeBz, &poktEnvelope); err != nil {
return fmt.Errorf("decoding network message: %w", err)
}

event := messaging.PocketEnvelope{
Content: networkMessage.Content,
Content: poktEnvelope.Content,
}
m.GetBus().PublishEventToBus(&event)
return nil
Expand All @@ -449,9 +350,3 @@ func (m *p2pModule) getMultiaddr() (multiaddr.Multiaddr, error) {
"%s:%d", m.cfg.Hostname, m.cfg.Port,
))
}

// newReadStreamDeadline returns a future deadline
// based on the read stream timeout duration.
func newReadStreamDeadline() time.Time {
return time.Now().Add(readStreamTimeout)
}
10 changes: 7 additions & 3 deletions p2p/module_raintree_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a linter for this? Dima recently added a custom one in build/linters/tests.go and it looks like we have access to the File.Name via the dsl matcher, but I don't know if there's any easy way to check for the first line.

Can be done in a separate commit, not a blocker.

type File struct {
	// Name is a file base name.
	Name String

	// PkgPath is a file package path.
	// Examples: "io/ioutil", "strings", "github.com/quasilyte/go-ruleguard/dsl".
	PkgPath String
}

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite May 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only necessary to add the test build tag to:

  1. non-test files in a non-internal package which exports a private member for testing purposes.
  2. test files which depend on something exported by a file which uses the build tag (assumes at least one instance of case 1).

Are you suggesting a linter which enforces the test build tag on all test files or something?

I don't suppose there would really be a prohibitive downside to adding the test tag in places where it's not strictly necessary. I can imagine an argument for either but am leaning towards "ubiquitous build tags" at the moment (unexpectedly):

  • ubiquitous build tags
    • pros
      • consistent
      • simpler
      • better justifies updates to the makefile (e.g. -tags test; necessary in either case)
    • cons
      • slightly unconventional
      • requires running tests via make targets OR adding tags manually (e.g. IDE run/test configs)
    • UX
      • go test runs no tests
  • only necessary build tags
    • pros
      • minimal impact on the codebase
    • cons
      • more mental overhead
      • less obvious when test(s) didn't run because of a missing tag
    • UX
      • go test runs some but not all tests


package p2p

import (
Expand All @@ -13,10 +15,12 @@ import (

libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/raintree"
)

// TODO(#314): Add the tooling and instructions on how to generate unit tests in this file.
Expand Down Expand Up @@ -272,7 +276,7 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te
mod := *p2pMod
p2pMod.host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) {
log.Printf("[valID: %s] Read\n", sURL)
(&mod).handleStream(stream)
(&mod).router.(*raintree.RainTreeRouter).HandleStream(stream)
wg.Done()
})
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code patch provided seems reasonable, but I have a few minor suggestions for improvement. Please consider the following:

  1. In line 19, add a comment to explain the purpose of moving the import statement and why it has been separated from the other import statements.

  2. Although not incorrect, you might want to change lines 272-276 to avoid using the pointer to 'mod'. Instead, directly use 'p2pMod'. This increases readability and reduces complexity:

    mod := p2pMod
    ...
    mod.router.(*raintree.RainTreeRouter).HandleStream(stream)
  3. Make sure that there are unit tests (or create new unit tests) that cover the changes introduced in this patch. The current tests should pass, and any new edge cases should also be considered when testing.

  4. Ensure that you follow the rest of the project's style and coding conventions when contributing your patch.

Overall, the code patch is generally fine, but addressing these points would lead to better quality code review.

Expand Down
31 changes: 31 additions & 0 deletions p2p/raintree/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package raintree

import (
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"

"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/utils"
)

// logStream logs the incoming stream and its scope stats
func (rtr *rainTreeRouter) logStream(stream libp2pNetwork.Stream) {
rtr.logStreamScopeStats(stream)

remotePeer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
rtr.logger.Debug().Err(err).Msg("getting remote remotePeer")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you go with Debug instead of Error here?

Ditto below

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite May 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see below)

In this case, my thinking was that this error only happens in the context of this logging helper function which is not a critical function. I didn't imagine that this logging helper producing an error would be useful to the end user (i.e. not actionable nor a useful signal). Perhaps I assume too much.

} else {
utils.LogIncomingMsg(rtr.logger, rtr.getHostname(), remotePeer)
}
}

// logStreamScopeStats logs the incoming stream's scope stats
// (see: https://pkg.go.dev/github.com/libp2p/[email protected]/core/network#StreamScope)
func (rtr *rainTreeRouter) logStreamScopeStats(stream libp2pNetwork.Stream) {
if err := utils.LogScopeStatFactory(
&logger.Global.Logger,
"stream scope (read-side)",
)(stream.Scope()); err != nil {
rtr.logger.Debug().Err(err).Msg("logging stream scope stats")
}
}
1 change: 1 addition & 0 deletions p2p/raintree/peers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func testRainTreeMessageTargets(t *testing.T, expectedMsgProp *ExpectedRainTreeM

hostMock := mocksP2P.NewMockHost(ctrl)
hostMock.EXPECT().Peerstore().Return(libp2pPStore).AnyTimes()
hostMock.EXPECT().SetStreamHandler(gomock.Any(), gomock.Any()).Times(1)

rtCfg := &config.RainTreeConfig{
Host: hostMock,
Expand Down
Loading