Skip to content

Commit

Permalink
wip: checkpoint - gossip/bootstrapping
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jun 1, 2023
1 parent 662d658 commit 66105a2
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 92 deletions.
2 changes: 1 addition & 1 deletion p2p/background/kad_discovery_baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestLibp2pKademliaPeerDiscovery(t *testing.T) {
expectedPeerIDs = append(expectedPeerIDs, host4.ID())

// TECHDEBT: consider using `host.ConnManager().Notifee()` to avoid sleeping here
time.Sleep(time.Millisecond * 500)
time.Sleep(dhtUpdateSleepDuration)

// new host discovers existing hosts...
host4DiscoveredHost2Addrs := host4.Peerstore().Addrs(host2.ID())
Expand Down
73 changes: 54 additions & 19 deletions p2p/background/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package background
import (
"context"
"fmt"
"sync"

dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
Expand All @@ -15,6 +13,7 @@ import (
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
Expand Down Expand Up @@ -52,7 +51,7 @@ type backgroundRouter struct {
kadDHT *dht.IpfsDHT
// TECHDEBT: `pstore` will likely be removed in future refactoring / simplification
// of the `Router` interface.
// pstore is the background router's peerstore.
// pstore is the background router's peerstore. Assigned in `backgroundRouter#setupPeerstore()`.
pstore typesP2P.Peerstore
}

Expand All @@ -69,14 +68,6 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2
return nil, err
}

// seed initial peerstore with current on-chain peer info (i.e. staked actors)
pstore, err := cfg.PeerstoreProvider.GetStakedPeerstoreAtHeight(
cfg.CurrentHeightProvider.CurrentHeight(),
)
if err != nil {
return nil, err
}

// CONSIDERATION: If switching to `NewRandomSub`, there will be a max size
gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host) //pubsub.WithFloodPublish(false),
//pubsub.WithMaxMessageSize(256),
Expand All @@ -96,6 +87,10 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2
return nil, fmt.Errorf("creating DHT: %w", err)
}

//if err := kadDHT.Bootstrap(ctx); err != nil {
// return nil, fmt.Errorf("bootstrapping DHT: %w", err)
//}

topic, err := gossipSub.Join(protocol.BackgroundTopicStr)
if err != nil {
return nil, fmt.Errorf("joining background topic: %w", err)
Expand All @@ -121,10 +116,16 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2
topic: topic,
subscription: subscription,
logger: networkLogger,
pstore: pstore,
handler: cfg.Handler,
}

if err := rtr.setupPeerstore(
cfg.PeerstoreProvider,
cfg.CurrentHeightProvider,
); err != nil {
return nil, err
}

go rtr.readSubscription(ctx)

return rtr, nil
Expand Down Expand Up @@ -188,18 +189,52 @@ func (rtr *backgroundRouter) RemovePeer(peer typesP2P.Peer) error {
return rtr.pstore.RemovePeer(peer.GetAddress())
}

var readCountMu sync.Mutex
func (rtr *backgroundRouter) setupPeerstore(
pstoreProvider providers.PeerstoreProvider,
currentHeightProvider providers.CurrentHeightProvider,
) (err error) {
// seed initial peerstore with current on-chain peer info (i.e. staked actors)
rtr.pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(
currentHeightProvider.CurrentHeight(),
)
if err != nil {
return err
}

// CONSIDERATION: add `GetPeers` method to `PeerstoreProvider` interface
// to avoid this loop.
for _, peer := range rtr.pstore.GetPeerList() {
if err := utils.AddPeerToLibp2pHost(rtr.host, peer); err != nil {
return err
}

// TODO: refactor: #bootstrap()
libp2pPeer, err := utils.Libp2pAddrInfoFromPeer(peer)
if err != nil {
return fmt.Errorf(
"converting peer info, pokt address: %s: %w",
peer.GetAddress(),
err,
)
}

// don't attempt to connect to self
if rtr.host.ID() == libp2pPeer.ID {
return nil
}

// TECHDEBT(#595): add ctx to interface methods and propagate down.
if err := rtr.host.Connect(context.TODO(), libp2pPeer); err != nil {
return fmt.Errorf("connecting to peer: %w", err)
}
}
return nil
}

func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
// TODO_THIS_COMMIT: look into "topic validaton"
// (see: https://github.com/libp2p/specs/tree/master/pubsub#topic-validation)
readCount := 0
for {
readCountMu.Lock()
readCount++
fmt.Printf("readCount: %d\n", readCount)
readCountMu.Unlock()

msg, err := rtr.subscription.Next(ctx)
if ctx.Err() != nil {
fmt.Printf("error: %s\n", ctx.Err())
Expand Down
27 changes: 25 additions & 2 deletions p2p/background/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
// map used as a set to collect IDs of peers which have received a message
seenMessages = make(map[string]struct{})
bootstrapWaitgroup = sync.WaitGroup{}
bootstrapPeerIDCh = make(chan string)
bootstrapPeerIDs = make(map[string]struct{})
broadcastWaitgroup = sync.WaitGroup{}
broadcastDone = make(chan struct{}, 1)
testTimeout = time.After(testTimeoutDuration)
Expand Down Expand Up @@ -170,6 +172,19 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
})
}

// concurrently update the set of bootstrapped peer IDs as they connect
go func() {
for {
peerIDStr := <-bootstrapPeerIDCh
if _, ok := bootstrapPeerIDs[peerIDStr]; ok {
// already connected to this peer during bootstrapping
continue
}
bootstrapPeerIDs[peerIDStr] = struct{}{}
bootstrapWaitgroup.Done()
}
}()

// bootstrap off of arbitrary testHost
privKey, selfPeer := newTestPeer(t)

Expand All @@ -185,9 +200,14 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {

// setup notifee/notify BEFORE bootstrapping
notifee := &libp2pNetwork.NotifyBundle{
ConnectedF: func(_ libp2pNetwork.Network, _ libp2pNetwork.Conn) {
ConnectedF: func(_ libp2pNetwork.Network, conn libp2pNetwork.Conn) {
t.Logf("connected!")
bootstrapWaitgroup.Done()
t.Logf("local PeerID %s; remote PeerID: %s",
conn.LocalPeer().String(),
conn.RemotePeer().String(),
)
bootstrapPeerIDCh <- conn.RemotePeer().String()
//bootstrapWaitgroup.Done()
},
}
testRouter.host.Network().Notify(notifee)
Expand Down Expand Up @@ -229,6 +249,9 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
case <-broadcastDone:
}

seenMessagesMutex.Lock()
defer seenMessagesMutex.Unlock()

actualPeerIDs = generics_testutil.GetKeys[string](seenMessages)
require.ElementsMatchf(t, expectedPeerIDs, actualPeerIDs, "peerIDs don't match")
}
Expand Down
Loading

0 comments on commit 66105a2

Please sign in to comment.