Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] feat: add background router #707

Merged
merged 37 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
000d592
chore: remove unused `Transport` interface
bryanchriswhite Apr 28, 2023
505ede9
refactor: move & rename `RainTreeConfig` to `util.RouterConfig`
bryanchriswhite Apr 28, 2023
e4752cd
refactor: rename `protocol.DefaultTopicStr` to `BackgroundTopicStr`
bryanchriswhite Apr 28, 2023
22e9098
chore: add `protocol.PeerDiscoveryNamespace`
bryanchriswhite Apr 28, 2023
f9f67a2
test: add kademlia peer discovery baseline
bryanchriswhite Apr 28, 2023
be2c3a8
feat: add background router
bryanchriswhite Apr 28, 2023
e4d3aa9
chore: update go.mod
bryanchriswhite Apr 28, 2023
66c8b06
chore: update dockerfile go version to 1.19
bryanchriswhite May 1, 2023
6befd4b
chore: add godoc comments
bryanchriswhite May 1, 2023
976135f
docs: update P2P README code organization section
bryanchriswhite Apr 28, 2023
1c6ae3d
docs: add definitions to P2P readme
bryanchriswhite May 1, 2023
c46120e
docs: update P2P module architecture diagram
bryanchriswhite May 1, 2023
0b7ab59
docs: rename raintree router architecture section
bryanchriswhite May 1, 2023
662fe8d
docs: add message propagation diagram
bryanchriswhite May 1, 2023
cf2e1be
chore: update changelog
bryanchriswhite Apr 28, 2023
d6642e0
test: reduce required sleep time
bryanchriswhite May 1, 2023
4159d7a
test: upgrade comment to INVESTIGATE
bryanchriswhite May 1, 2023
3d2c0f9
refactor: `GetKeys()` testutil
bryanchriswhite May 4, 2023
075f35a
Apply suggestions from code review
bryanchriswhite May 4, 2023
2120da8
docs: message propagation diagram fixes:
bryanchriswhite May 4, 2023
3bac5b1
refactor: move router configs to config pkg
bryanchriswhite May 4, 2023
f576c82
refactor: `RouterConfig` as interface:
bryanchriswhite May 4, 2023
6de5a87
refactor: router config imports & usage types
bryanchriswhite May 4, 2023
69b2725
refactor: remove `Hostname` from `RainTreeConfig`
bryanchriswhite May 4, 2023
e3dff67
chore: get hostname from P2P config for logging
bryanchriswhite May 4, 2023
05df60a
refactor: rename `P2PConfig#MaxMempoolCount` to `P2PConfig#MaxNonces`
bryanchriswhite May 4, 2023
21ef130
refactor: rename `DefaultP2PMaxMempoolCount` to `DefaultP2PMaxNonces`
bryanchriswhite May 4, 2023
b13b897
test: add missing `MaxNonces` config parameter
bryanchriswhite May 4, 2023
7725804
chore: add comments to `backgroundRouter` fields
bryanchriswhite May 4, 2023
506a996
chore: update comment
bryanchriswhite May 4, 2023
c0266dd
refactor: `dhtUpdateSleepDuration` as constant
bryanchriswhite May 4, 2023
0d04ea3
test: use `require.Greaterf()` instead of `Lenf()`
bryanchriswhite May 4, 2023
14d80cb
test: improve naming
bryanchriswhite May 4, 2023
b36bc7e
test: introduce var for readability
bryanchriswhite May 4, 2023
e57bd7d
chore: update changelog
bryanchriswhite May 4, 2023
ee76dcd
test: add comment & consolidate go routines
bryanchriswhite May 4, 2023
648b75a
Merge remote-tracking branch 'pokt/main' into feat/full-node-gossip
bryanchriswhite May 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.40] - 2023-05-01
## [0.0.0.40] - 2023-05-04

- Updated Dockerfiles using outdated go version to 1.19

Expand Down
9 changes: 7 additions & 2 deletions internal/testutil/map.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package testutil

func GetKeys[K comparable, V any](keyMap map[K]V) (keys []K) {
func GetKeys[K comparable, V any](keyMap map[K]V) []K {
var (
idx = 0
keys = make([]K, len(keyMap))
)
for key := range keyMap {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
keys = append(keys, key)
keys[idx] = key
idx++
}
return keys
}
2 changes: 1 addition & 1 deletion p2p/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.50] - 2023-05-01
## [0.0.0.50] - 2023-05-04

- Removed unused `Transport` interface
- Moved and renamed `raintree.RainTreeConfig` to `util.RouterConfig`
Expand Down
42 changes: 19 additions & 23 deletions p2p/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ When used generally, shorthand for "message propogation"; **not to be confused w

### "gossipsub"

A specific ["pubsub"](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub) router implementation / protocol which uses information (referred to internally as 'gossip', **distinct from our definition**) about which peers have seen which messages to facilitate "gossip" (as per our definition).
A specific ["pubsub"](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub) router implementation/protocol which uses information (referred to internally as 'gossip' by LibP2P, **distinct from our definition**) about which peers have seen which messages to facilitate "gossip" (as per our definition).

### "raintree gossip"

A "gossip" protocol (and implementation) which uses the raintree algorithm for peer selection. Used between all staked actors to optimize for "gossip" speed.
A structured "gossip" protocol (and implementation) which uses the raintree algorithm for peer selection. Used between all staked actors to optimize for "gossip" speed.

### "background gossip"

Expand Down Expand Up @@ -78,21 +78,16 @@ _TODO(olshansky, BenVan): Link to RainTree visualizations once it is complete._

Given `Local P2P Module` has a message that it needs to propagate:

1a. `Raintree Router` selects targets from the `Pokt Peerstore`, **which only includes staked actors**

1b. `Background Router` selects targets from the libp2p `Peerstore`, **which includes all P2P participants**

2. Libp2p `Host` manages opens and closes streams to targeted peers on behalf of the routers

2. `Remote P2P module`'s (i.e. receiver’s) `handleStream` is called (having been registered via `setStreamHandler()`)

3a. `handleStream` propagates message via `Raintree Router`

3b. `handleStream` propagates message via `Background Router`

4a. Repeat step 1a from `Remote P2P Module`'s perspective targeting its next peers

4b. Repeat step 1b from `Remote P2P Module`'s perspective targeting its next peers
<ul style="list-style-type: none;">
<li>1a. <code>Raintree Router</code> selects targets from the <code>Pokt Peerstore</code>, <strong>which only includes staked actors</strong></li>
<li>1b. <code>Background Router</code> selects targets from the libp2p <code>Peerstore</code>, <strong>which includes all P2P participants</strong></li>
<li>2. Libp2p <code>Host</code> manages opening and closing streams to targeted peers</li>
<li>3. <code>Remote P2P module</code>'s (i.e. receiver's) <code>handleStream</code> is called (having been registered via <code>setStreamHandler()</code>)</li>
<li>4a. <code>handleStream</code> propagates message via <code>Raintree Router</code></li>
<li>4b. <code>handleStream</code> propagates message via <code>Background Router</code></li>
<li>5a. Repeat step 1a from <code>Remote P2P Module</code>'s perspective targeting its next peers</li>
<li>5b. Repeat step 1b from <code>Remote P2P Module</code>'s perspective targeting its next peers</li>
</ul>

```mermaid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an AMAZING diagram. It's easy to read but it also gives both a really good high level view and low level details at the same time. Impressive

flowchart TD
Expand Down Expand Up @@ -144,7 +139,7 @@ end

subgraph rBG[Background Router]
subgraph rBGPS[Background Peerstore]
rNetPS([arr P2P participants])
rNetPS([all P2P participants])
end

subgraph rGossipSub[GossipSub]
Expand All @@ -157,12 +152,13 @@ rGossipSub --> rBGPS
rDHT --> rBGPS
end

rHost -. "setStreamHandler()" .-> hs[[handleStream]]
hs --3a--> rRT
hs --3b--> rBG
rBG --"4a (cont. propagation)"--> rHost
rHost -. "3 (setStreamHandler())" .-> hs[[handleStream]]

hs --4a--> rRT
hs --4b--> rBG
rBG --"5a (cont. propagation)"--> rHost
linkStyle 11 stroke:#ff3
rRT --"4b (cont. propagation)"--> rHost
rRT --"5b (cont. propagation)"--> rHost
linkStyle 12 stroke:#ff3
end

Expand Down
17 changes: 10 additions & 7 deletions p2p/background/kad_discovery_baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/pokt-network/pocket/runtime/defaults"
)

const dhtUpdateSleepDuration = time.Millisecond * 500

func TestLibp2pKademliaPeerDiscovery(t *testing.T) {
ctx := context.Background()

Expand All @@ -33,17 +35,18 @@ func TestLibp2pKademliaPeerDiscovery(t *testing.T) {

// TECHDEBT: consider using `host.ConnManager().Notifee()` to avoid sleeping here
// delay assertions for 500ms
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(time.Millisecond * 500)
// NB: wait for peer discovery to complete
time.Sleep(dhtUpdateSleepDuration)

// assert that host2 has host3 in its peerstore
host2DiscoveredAddrs := host2.Peerstore().Addrs(host3.ID())
require.Lenf(t, host2DiscoveredAddrs, 1, "did not discover host3")
require.Greaterf(t, len(host2DiscoveredAddrs), 0, "did not discover host3")
require.Equalf(t, addr3.String(), host2DiscoveredAddrs[0].String(), "did not discover host3")
require.ElementsMatchf(t, expectedPeerIDs, host2.Peerstore().Peers(), "host2 peer IDs don't match")

// assert that host3 has host2 in its peerstore
host3DiscoveredHost2Addrs := host3.Peerstore().Addrs(host2.ID())
require.Lenf(t, host3DiscoveredHost2Addrs, 1, "host3 did not discover host2")
require.Greaterf(t, len(host3DiscoveredHost2Addrs), 0, "host3 did not discover host2")
require.Equalf(t, addr2.String(), host3DiscoveredHost2Addrs[0].String(), "host3 did not discover host2")
require.ElementsMatchf(t, expectedPeerIDs, host3.Peerstore().Peers(), "host3 peer IDs don't match")

Expand All @@ -56,20 +59,20 @@ func TestLibp2pKademliaPeerDiscovery(t *testing.T) {

// new host discovers existing hosts...
host4DiscoveredHost2Addrs := host4.Peerstore().Addrs(host2.ID())
require.Lenf(t, host4DiscoveredHost2Addrs, 1, "host4 did not discover host2")
require.Greaterf(t, len(host4DiscoveredHost2Addrs), 0, "host4 did not discover host2")
require.Equalf(t, addr2.String(), host4DiscoveredHost2Addrs[0].String(), "host4 did not discover host2")

host4DiscoveredHost3Addrs := host4.Peerstore().Addrs(host3.ID())
require.Lenf(t, host4DiscoveredHost3Addrs, 1, "host4 did not discover host3")
require.Greaterf(t, len(host4DiscoveredHost3Addrs), 0, "host4 did not discover host3")
require.Equalf(t, addr3.String(), host4DiscoveredHost3Addrs[0].String(), "host4 did not discover host3")

// existing hosts discovers host host...
host2DiscoveredHost4Addrs := host2.Peerstore().Addrs(host4.ID())
require.Lenf(t, host2DiscoveredHost4Addrs, 1, "host2 did not discover host4")
require.Greaterf(t, len(host2DiscoveredHost4Addrs), 0, "host2 did not discover host4")
require.Equalf(t, addr4.String(), host2DiscoveredHost4Addrs[0].String(), "host2 did not discover host4")

host3DiscoveredHost4Addrs := host3.Peerstore().Addrs(host4.ID())
require.Lenf(t, host3DiscoveredHost4Addrs, 1, "host3 did not discover host4")
require.Greaterf(t, len(host3DiscoveredHost4Addrs), 0, "host3 did not discover host4")
require.Equalf(t, addr4.String(), host3DiscoveredHost4Addrs[0].String(), "host3 did not discover host4")

require.ElementsMatchf(t, expectedPeerIDs, host4.Peerstore().Peers(), "host4 peer IDs don't match")
Expand Down
17 changes: 11 additions & 6 deletions p2p/background/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
libp2pHost "github.com/libp2p/go-libp2p/core/host"

"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
Expand Down Expand Up @@ -38,23 +39,27 @@ type backgroundRouter struct {
// (i.e. multiple, unidentified receivers)
// TECHDEBT: investigate diff between randomSub and gossipSub
gossipSub *pubsub.PubSub
// topic similar to pubsub but received messages are filtered by a "topic" string.
// topic is similar to pubsub but received messages are filtered by a "topic" string.
// Published messages are also given the respective topic before broadcast.
topic *pubsub.Topic
// subscription provides an interface to continuously read messages from.
subscription *pubsub.Subscription
kadDHT *dht.IpfsDHT
pstore typesP2P.Peerstore
// kadDHT is a kademlia distributed hash table used for routing and peer discovery.
kadDHT *dht.IpfsDHT
// TECHDEBT: `pstore` will likely be removed in future refactoring / simplification
// of the `Router` interface.
// pstore is the background router's peerstore.
pstore typesP2P.Peerstore
}

// NewBackgroundRouter returns a `backgroundRouter` as a `typesP2P.Router`
// interface using the given configuration.
func NewBackgroundRouter(bus modules.Bus, cfg *utils.RouterConfig) (typesP2P.Router, error) {
func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2P.Router, error) {
// TECHDEBT(#595): add ctx to interface methods and propagate down.
ctx := context.TODO()

networkLogger := logger.Global.CreateLoggerForModule("backgroundRouter")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud and outside the scope of this PR: I'm beginning to think we should move GetModuleName from InitilizableModule to IntegratableModule and be consistent with logger.Global.CreateLoggerForModule(u.GetModuleName())

networkLogger.Info().Msg("Initializing background")
networkLogger.Info().Msg("Initializing background router")

// seed initial peerstore with current on-chain peer info (i.e. staked actors)
pstore, err := cfg.PeerstoreProvider.GetStakedPeerstoreAtHeight(
Expand All @@ -64,7 +69,7 @@ func NewBackgroundRouter(bus modules.Bus, cfg *utils.RouterConfig) (typesP2P.Rou
return nil, err
}

// NOTE_TO_SELF: `pubsub.NewRandomSub` requires a `size` arg.
// CONSIDERATION: If switching to `NewRandomSub`, there will be a max size
gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host)
if err != nil {
return nil, fmt.Errorf("creating gossip pubsub: %w", err)
Expand Down
25 changes: 14 additions & 11 deletions p2p/background/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/config"
typesP2P "github.com/pokt-network/pocket/p2p/types"
mock_types "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/p2p/utils"
Expand Down Expand Up @@ -123,7 +124,8 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {

var (
ctx = context.Background()
mu sync.Mutex
// mutex preventing concurrent writes to `seenMessages`
seenMessagesMutext sync.Mutex
// map used as a set to collect IDs of peers which have received a message
seenMessages = make(map[string]struct{})
bootstrapWaitgroup = sync.WaitGroup{}
Expand All @@ -147,7 +149,7 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
testHosts = append(testHosts, host)
expectedPeerIDs[i] = host.ID().String()
rtr := newRouterWithSelfPeerAndHost(t, selfPeer, host)
go readSubscription(t, ctx, &broadcastWaitgroup, rtr, &mu, seenMessages)
go readSubscription(t, ctx, &broadcastWaitgroup, rtr, &seenMessagesMutext, seenMessages)
}

// bootstrap off of arbitrary testHost
Expand All @@ -174,6 +176,8 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {

bootstrap(t, ctx, testHosts)

// broadcasting in a go routine so that we can wait for bootstrapping to
// complete before broadcasting.
go func() {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// wait for hosts to listen and peer discovery
bootstrapWaitgroup.Wait()
Expand All @@ -188,10 +192,8 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
t.Log("broadcasting...")
err := testRouter.Broadcast([]byte(testMsg))
require.NoError(t, err)
}()

// wait concurrently
go func() {
// wait for broadcast to be received by all peers
broadcastWaitgroup.Wait()
broadcastDone <- struct{}{}
}()
Expand All @@ -200,7 +202,7 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
select {
case <-testTimeout:
t.Fatalf(
"timed out waiting for message: got %d; wanted %d",
"timed out waiting for all expected messages: got %d; wanted %d",
len(seenMessages),
numPeers,
)
Expand All @@ -217,6 +219,7 @@ func bootstrap(t *testing.T, ctx context.Context, testHosts []libp2pHost.Host) {

t.Log("bootstrapping...")
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
bootstrapHost := testHosts[0]
bootstrapAddr := bootstrapHost.Addrs()[0]
for _, h := range testHosts {
if h.ID() == bootstrapHost.ID() {
continue
Expand All @@ -228,7 +231,7 @@ func bootstrap(t *testing.T, ctx context.Context, testHosts []libp2pHost.Host) {
addrInfo := libp2pPeer.AddrInfo{
ID: bootstrapHost.ID(),
Addrs: []multiaddr.Multiaddr{
bootstrapHost.Addrs()[0].Encapsulate(p2pAddr),
bootstrapAddr.Encapsulate(p2pAddr),
},
}

Expand Down Expand Up @@ -282,7 +285,7 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib
err := pstore.AddPeer(selfPeer)
require.NoError(t, err)

router, err := NewBackgroundRouter(busMock, &utils.RouterConfig{
router, err := NewBackgroundRouter(busMock, &config.BackgroundConfig{
Addr: selfPeer.GetAddress(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: consensusMock,
Expand All @@ -296,7 +299,7 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib
return libp2pNet
}

// TECHDEBT: move & de-dup
// TECHDEBT(#609): move & de-duplicate
func newTestPeer(t *testing.T) (cryptoPocket.PrivateKey, *typesP2P.NetworkPeer) {
t.Helper()

Expand Down Expand Up @@ -347,7 +350,7 @@ func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.Pri
func readSubscription(
t *testing.T,
ctx context.Context,
wg *sync.WaitGroup,
broadcastWaitGroup *sync.WaitGroup,
rtr *backgroundRouter,
mu *sync.Mutex,
seenMsgs map[string]struct{},
Expand All @@ -366,7 +369,7 @@ func readSubscription(
require.NoError(t, err)

mu.Lock()
wg.Done()
broadcastWaitGroup.Done()
seenMsgs[rtr.host.ID().String()] = struct{}{}
mu.Unlock()
}
Expand Down
Loading