Skip to content

Commit

Permalink
test: post-refactor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jun 22, 2023
1 parent 437afc8 commit 43cf671
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 97 deletions.
149 changes: 106 additions & 43 deletions p2p/background/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,41 @@ import (
"time"

"github.com/golang/mock/gomock"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
libp2pPeer "github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
typesP2P "github.com/pokt-network/pocket/p2p/types"
mock_types "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/defaults"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/messaging"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
"github.com/stretchr/testify/require"
)

// https://www.rfc-editor.org/rfc/rfc3986#section-3.2.2
const testIP6ServiceURL = "[2a00:1450:4005:802::2004]:8080"

// TECHDEBT(#609): move & de-dup.
var testLocalServiceURL = fmt.Sprintf("127.0.0.1:%d", defaults.DefaultP2PPort)
var (
testLocalServiceURL = fmt.Sprintf("127.0.0.1:%d", defaults.DefaultP2PPort)
noopHandler = func(data []byte) error { return nil }
)

func TestBackgroundRouter_AddPeer(t *testing.T) {
testRouter := newTestRouter(t, nil)
testRouter := newTestRouter(t, nil, noopHandler)
libp2pPStore := testRouter.host.Peerstore()

// NB: assert initial state
Expand Down Expand Up @@ -81,7 +90,7 @@ func TestBackgroundRouter_AddPeer(t *testing.T) {
}

func TestBackgroundRouter_RemovePeer(t *testing.T) {
testRouter := newTestRouter(t, nil)
testRouter := newTestRouter(t, nil, noopHandler)
peerstore := testRouter.host.Peerstore()

// NB: assert initial state
Expand Down Expand Up @@ -114,6 +123,73 @@ func TestBackgroundRouter_RemovePeer(t *testing.T) {
require.Len(t, existingPeerstoreAddrs, 1)
}

func TestBackgroundRouter_Validation(t *testing.T) {
ctx := context.Background()
libp2pMockNet := mocknet.New()

invalidWireFormatData := []byte("test message")
invalidPocketEnvelope := &anypb.Any{
TypeUrl: "/test",
Value: invalidWireFormatData,
}
invalidPocketEnvelopeBz, err := proto.Marshal(invalidPocketEnvelope)
require.NoError(t, err)

invalidMessages := [][]byte{
invalidWireFormatData,
invalidPocketEnvelopeBz,
}

receivedChan := make(chan struct{})

receiverPrivKey, receiverPeer := newTestPeer(t)
receiverHost := newTestHost(t, libp2pMockNet, receiverPrivKey)
receiverRouter := newRouterWithSelfPeerAndHost(t, receiverPeer, receiverHost, func(data []byte) error {
receivedChan <- struct{}{}
return nil
})

t.Cleanup(func() {
err := receiverRouter.Close()
require.NoError(t, err)
})

senderPrivKey, _ := newTestPeer(t)
senderHost := newTestHost(t, libp2pMockNet, senderPrivKey)
gossipPubsub, err := pubsub.NewGossipSub(ctx, senderHost)
require.NoError(t, err)

err = libp2pMockNet.LinkAll()
require.NoError(t, err)

receiverAddrInfo, err := utils.Libp2pAddrInfoFromPeer(receiverPeer)
require.NoError(t, err)

err = senderHost.Connect(ctx, receiverAddrInfo)
require.NoError(t, err)

topic, err := gossipPubsub.Join(protocol.BackgroundTopicStr)
require.NoError(t, err)

for _, invalidMessageBz := range invalidMessages {
err = topic.Publish(ctx, invalidMessageBz)
require.NoError(t, err)
}

select {
case <-time.After(time.Second * 2):
// TECHDEBT: find a better way to prove that pre-propagation validation
// works as expected. Ideally, we should be able to distinguish which
// invalid message was received in the event of failure.
//
// CONSIDERATION: we could use the telemetry module mock to set expectations
// for validation failure telemetry calls, which would probably be useful in
// their own right.
case <-receivedChan:
t.Fatal("expected message to not be received")
}
}

func TestBackgroundRouter_Broadcast(t *testing.T) {
const (
numPeers = 4
Expand All @@ -138,30 +214,44 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {
libp2pMockNet = mocknet.New()
)

// setup 4 libp2p hosts to listen for incoming streams from the test backgroundRouter
testPocketEnvelope, err := messaging.PackMessage(&anypb.Any{
TypeUrl: "/test",
Value: []byte(testMsg),
})
require.NoError(t, err)

testPocketEnvelopeBz, err := proto.Marshal(testPocketEnvelope)
require.NoError(t, err)

// setup 4 receiver routers to listen for incoming messages from the sender router
for i := 0; i < numPeers; i++ {
broadcastWaitgroup.Add(1)
bootstrapWaitgroup.Add(1)

privKey, selfPeer := newTestPeer(t)
privKey, peer := newTestPeer(t)
host := newTestHost(t, libp2pMockNet, privKey)
testHosts = append(testHosts, host)
expectedPeerIDs[i] = host.ID().String()
rtr := newRouterWithSelfPeerAndHost(t, selfPeer, host)
go readSubscription(t, ctx, &broadcastWaitgroup, rtr, &seenMessagesMutext, seenMessages)
newRouterWithSelfPeerAndHost(t, peer, host, func(data []byte) error {
seenMessagesMutext.Lock()
broadcastWaitgroup.Done()
seenMessages[host.ID().String()] = struct{}{}
seenMessagesMutext.Unlock()
return nil
})
}

// bootstrap off of arbitrary testHost
privKey, selfPeer := newTestPeer(t)

// set up a test backgroundRouter
testRouterHost := newTestHost(t, libp2pMockNet, privKey)
testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost)
testRouter := newRouterWithSelfPeerAndHost(t, selfPeer, testRouterHost, noopHandler)
testHosts = append(testHosts, testRouterHost)

// simulate network links between each to every other
// (i.e. fully-connected network)
err := libp2pMockNet.LinkAll()
err = libp2pMockNet.LinkAll()
require.NoError(t, err)

// setup notifee/notify BEFORE bootstrapping
Expand Down Expand Up @@ -189,7 +279,7 @@ func TestBackgroundRouter_Broadcast(t *testing.T) {

// broadcast message
t.Log("broadcasting...")
err := testRouter.Broadcast([]byte(testMsg))
err := testRouter.Broadcast(testPocketEnvelopeBz)
require.NoError(t, err)

// wait for broadcast to be received by all peers
Expand Down Expand Up @@ -241,7 +331,7 @@ func bootstrap(t *testing.T, ctx context.Context, testHosts []libp2pHost.Host) {
}

// TECHDEBT(#609): move & de-duplicate
func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet) *backgroundRouter {
func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet, handler typesP2P.MessageHandler) *backgroundRouter {
t.Helper()

privKey, selfPeer := newTestPeer(t)
Expand All @@ -256,10 +346,10 @@ func newTestRouter(t *testing.T, libp2pMockNet mocknet.Mocknet) *backgroundRoute
require.NoError(t, err)
})

return newRouterWithSelfPeerAndHost(t, selfPeer, host)
return newRouterWithSelfPeerAndHost(t, selfPeer, host, handler)
}

func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host libp2pHost.Host) *backgroundRouter {
func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host libp2pHost.Host, handler typesP2P.MessageHandler) *backgroundRouter {
t.Helper()

ctrl := gomock.NewController(t)
Expand All @@ -268,7 +358,7 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib
P2P: &configs.P2PConfig{
IsClientOnly: false,
},
})
}).AnyTimes()

consensusMock := mockModules.NewMockConsensusModule(ctrl)
consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes()
Expand All @@ -289,6 +379,7 @@ func newRouterWithSelfPeerAndHost(t *testing.T, selfPeer typesP2P.Peer, host lib
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: consensusMock,
Host: host,
Handler: handler,
})
require.NoError(t, err)

Expand Down Expand Up @@ -345,31 +436,3 @@ func newTestHost(t *testing.T, mockNet mocknet.Mocknet, privKey cryptoPocket.Pri
// construct mock host
return newMockNetHostFromPeer(t, mockNet, privKey, peer)
}

func readSubscription(
t *testing.T,
ctx context.Context,
broadcastWaitGroup *sync.WaitGroup,
rtr *backgroundRouter,
mu *sync.Mutex,
seenMsgs map[string]struct{},
) {
t.Helper()

for {
if err := ctx.Err(); err != nil {
if err != context.Canceled || err != context.DeadlineExceeded {
require.NoError(t, err)
}
return
}

_, err := rtr.subscription.Next(ctx)
require.NoError(t, err)

mu.Lock()
broadcastWaitGroup.Done()
seenMsgs[rtr.host.ID().String()] = struct{}{}
mu.Unlock()
}
}
16 changes: 1 addition & 15 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@ func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, e
return new(p2pModule).Create(bus, options...)
}

// TODO_THIS_COMMIT: rename (WithHost) & consider moving to testutil file
// WithHostOption associates an existing (i.e. "started") libp2p `host.Host`
// with this module, instead of creating a new one on `#Start()`.
// Primarily intended for testing.
func WithHostOption(host libp2pHost.Host) modules.ModuleOption {
return func(m modules.InitializableModule) {
mod, ok := m.(*p2pModule)
if ok {
mod.host = host
mod.logger.Debug().Msg("using host provided via `WithHostOption`")
}
}
}

func (m *p2pModule) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
logger.Global.Debug().Msg("Creating P2P module")
*m = p2pModule{
Expand Down Expand Up @@ -157,7 +143,7 @@ func (m *p2pModule) Start() (err error) {
telemetry.P2P_NODE_STARTED_TIMESERIES_METRIC_DESCRIPTION,
)

// Return early if host has already been started (e.g. via `WithHostOption`)
// Return early if host has already been started (e.g. via `WithHost`)
if m.host == nil {
// Libp2p hosts provided via `WithHost()` option are destroyed when
// `#Stop()`ing the module. Therefore, a new one must be created.
Expand Down
36 changes: 12 additions & 24 deletions p2p/module_raintree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@ import (
"regexp"
"sort"
"strconv"
"strings"
"sync"
"testing"

libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/raintree"
)

// TODO(#314): Add the tooling and instructions on how to generate unit tests in this file.
Expand Down Expand Up @@ -220,11 +216,13 @@ func TestRainTreeNetworkCompleteTwentySevenNodes(t *testing.T) {
// 1. It creates and configures a "real" P2P module where all the other components of the node are mocked.
// 2. It then triggers a single message and waits for all of the expected messages transmission to complete before announcing failure.
func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig TestNetworkSimulationConfig) {
var readWriteWaitGroup sync.WaitGroup

// Configure & prepare test module
numValidators := len(networkSimulationConfig)
runtimeConfigs := createMockRuntimeMgrs(t, numValidators)
genesisMock := runtimeConfigs[0].GetGenesis()
busMocks := createMockBuses(t, runtimeConfigs)
busMocks := createMockBuses(t, runtimeConfigs, &readWriteWaitGroup)

valIds := make([]string, 0, numValidators)
for valId := range networkSimulationConfig {
Expand All @@ -241,58 +239,48 @@ func testRainTreeCalls(t *testing.T, origNode string, networkSimulationConfig Te

// Create connection and bus mocks along with a shared WaitGroup to track the number of expected
// reads and writes throughout the mocked local network
var wg sync.WaitGroup
for i, valId := range valIds {
expectedCall := networkSimulationConfig[valId]
expectedReads := expectedCall.numNetworkReads
expectedWrites := expectedCall.numNetworkWrites

log.Printf("[valId: %s] expected reads: %d\n", valId, expectedReads)
log.Printf("[valId: %s] expected writes: %d\n", valId, expectedWrites)
wg.Add(expectedReads)
wg.Add(expectedWrites)
readWriteWaitGroup.Add(expectedReads)
readWriteWaitGroup.Add(expectedWrites)

persistenceMock := preparePersistenceMock(t, busMocks[i], genesisMock)
consensusMock := prepareConsensusMock(t, busMocks[i])
telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &wg, expectedWrites)
telemetryMock := prepareTelemetryMock(t, busMocks[i], valId, &readWriteWaitGroup, expectedWrites)

prepareBusMock(busMocks[i], persistenceMock, consensusMock, telemetryMock)
}

libp2pMockNet := mocknet.New()
defer func() {
err := libp2pMockNet.Close()
require.NoError(t, err)
}()

// Inject the connection and bus mocks into the P2P modules
p2pModules := createP2PModules(t, busMocks, libp2pMockNet)

for serviceURL, p2pMod := range p2pModules {
for _, p2pMod := range p2pModules {
err := p2pMod.Start()
require.NoError(t, err)

sURL := strings.Clone(serviceURL)
mod := *p2pMod
p2pMod.host.SetStreamHandler(protocol.PoktProtocolID, func(stream libp2pNetwork.Stream) {
log.Printf("[valID: %s] Read\n", sURL)
(&mod).router.(*raintree.RainTreeRouter).HandleStream(stream)
wg.Done()
})
}

// Wait for completion
defer waitForNetworkSimulationCompletion(t, &wg)
defer waitForNetworkSimulationCompletion(t, &readWriteWaitGroup)
t.Cleanup(func() {
// Stop all p2p modules
for _, p2pMod := range p2pModules {
err := p2pMod.Stop()
require.NoError(t, err)
}

err := libp2pMockNet.Close()
require.NoError(t, err)
})

// Send the first message (by the originator) to trigger a RainTree broadcast
p := &anypb.Any{}
p := &anypb.Any{TypeUrl: "test"}
p2pMod := p2pModules[origNode]
require.NoError(t, p2pMod.Broadcast(p))
}
Expand Down
Loading

0 comments on commit 43cf671

Please sign in to comment.