Skip to content

Commit

Permalink
wip: checkpoint - combined
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jun 1, 2023
1 parent 5898287 commit 662d658
Show file tree
Hide file tree
Showing 17 changed files with 335 additions and 124 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/foxcpp/go-mockdns v1.0.0
github.com/getkin/kin-openapi v0.107.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/vault/api v1.9.0
github.com/jackc/pgconn v1.13.0
github.com/jordanorelli/lexnum v0.0.0-20141216151731-460eeb125754
Expand Down Expand Up @@ -112,7 +113,6 @@ require (
github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.6.6 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect
Expand Down
30 changes: 30 additions & 0 deletions internal/testutil/bus/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package bus_testutil

import (
"github.com/regen-network/gocuke"

"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/internal/testutil/runtime"
"github.com/pokt-network/pocket/runtime/genesis"
"github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules/mocks"
)

func NewBus(
t gocuke.TestingT,
privKey crypto.PrivateKey,
serviceURL string,
genesisState *genesis.GenesisState,
busEventHandlerFactory testutil.BusEventHandlerFactory,
) *mock_modules.MockBus {
t.Helper()

runtimeMgrMock := runtime_testutil.BaseRuntimeManagerMock(
t, privKey,
serviceURL,
genesisState,
)
busMock := testutil.BusMockWithEventHandler(t, runtimeMgrMock, busEventHandlerFactory)
busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()
return busMock
}
25 changes: 3 additions & 22 deletions internal/testutil/constructors/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package constructors
import (
libp2pHost "github.com/libp2p/go-libp2p/core/host"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/pokt-network/pocket/internal/testutil/bus"
consensus_testutil "github.com/pokt-network/pocket/internal/testutil/consensus"
persistence_testutil "github.com/pokt-network/pocket/internal/testutil/persistence"
telemetry_testutil "github.com/pokt-network/pocket/internal/testutil/telemetry"
Expand All @@ -11,7 +12,6 @@ import (

"github.com/pokt-network/pocket/internal/testutil"
p2p_testutil "github.com/pokt-network/pocket/internal/testutil/p2p"
runtime_testutil "github.com/pokt-network/pocket/internal/testutil/runtime"
"github.com/pokt-network/pocket/p2p"
"github.com/pokt-network/pocket/runtime/genesis"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
Expand Down Expand Up @@ -57,7 +57,7 @@ func NewBusesMocknetAndP2PModules(
}

privKey := privKeys[i]
busMock := NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory)
busMock := bus_testutil.NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory)
buses[serviceURL] = busMock

// TODO_THIS_COMMIT: refactor
Expand Down Expand Up @@ -118,29 +118,10 @@ func NewBusesAndP2PModuleWithHost(
) (*mock_modules.MockBus, modules.P2PModule) {
t.Helper()

busMock := NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory)
busMock := bus_testutil.NewBus(t, privKey, serviceURL, genesisState, busEventHandlerFactory)
return busMock, NewP2PModuleWithHost(t, busMock, host)
}

func NewBus(
t gocuke.TestingT,
privKey cryptoPocket.PrivateKey,
serviceURL string,
genesisState *genesis.GenesisState,
busEventHandlerFactory testutil.BusEventHandlerFactory,
) *mock_modules.MockBus {
t.Helper()

runtimeMgrMock := runtime_testutil.BaseRuntimeManagerMock(
t, privKey,
serviceURL,
genesisState,
)
busMock := testutil.BusMockWithEventHandler(t, runtimeMgrMock, busEventHandlerFactory)
busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()
return busMock
}

func NewP2PModuleWithHost(
t gocuke.TestingT,
busMock *mock_modules.MockBus,
Expand Down
3 changes: 3 additions & 0 deletions internal/testutil/generics/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package generics_testutil

type ProxyFactory[T any] func(target T) (proxy T)
56 changes: 44 additions & 12 deletions p2p/background/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package background
import (
"context"
"fmt"
"sync"

dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
"github.com/pokt-network/pocket/shared/messaging"
"google.golang.org/protobuf/proto"

"github.com/pokt-network/pocket/logger"
Expand Down Expand Up @@ -65,6 +65,10 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2
networkLogger := logger.Global.CreateLoggerForModule("backgroundRouter")
networkLogger.Info().Msg("Initializing background router")

if err := cfg.IsValid(); err != nil {
return nil, err
}

// seed initial peerstore with current on-chain peer info (i.e. staked actors)
pstore, err := cfg.PeerstoreProvider.GetStakedPeerstoreAtHeight(
cfg.CurrentHeightProvider.CurrentHeight(),
Expand All @@ -74,7 +78,9 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2
}

// CONSIDERATION: If switching to `NewRandomSub`, there will be a max size
gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host)
gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host) //pubsub.WithFloodPublish(false),
//pubsub.WithMaxMessageSize(256),

if err != nil {
return nil, fmt.Errorf("creating gossip pubsub: %w", err)
}
Expand All @@ -100,7 +106,10 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2
// > output buffer. The default length is 32 but it can be configured to avoid
// > dropping messages if the consumer is not reading fast enough.
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithBufferSize)
subscription, err := topic.Subscribe()
subscription, err := topic.Subscribe(
//pubsub.WithBufferSize(10),
//pubsub.With
)
if err != nil {
return nil, fmt.Errorf("subscribing to background topic: %w", err)
}
Expand All @@ -113,6 +122,7 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2
subscription: subscription,
logger: networkLogger,
pstore: pstore,
handler: cfg.Handler,
}

go rtr.readSubscription(ctx)
Expand All @@ -122,8 +132,18 @@ func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2

// Broadcast implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) Broadcast(data []byte) error {
// CONSIDERATION: validate as PocketEnvelopeBz here (?)
// TODO_THIS_COMMIT: wrap in BackgroundMessage
backgroundMsg := &typesP2P.BackgroundMessage{
Data: data,
}
backgroundMsgBz, err := proto.Marshal(backgroundMsg)
if err != nil {
return err
}

// TECHDEBT(#595): add ctx to interface methods and propagate down.
return rtr.topic.Publish(context.TODO(), data)
return rtr.topic.Publish(context.TODO(), backgroundMsgBz)
}

// Send implements the respective `typesP2P.Router` interface method.
Expand Down Expand Up @@ -168,8 +188,24 @@ func (rtr *backgroundRouter) RemovePeer(peer typesP2P.Peer) error {
return rtr.pstore.RemovePeer(peer.GetAddress())
}

var readCountMu sync.Mutex

func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
for msg, err := rtr.subscription.Next(ctx); ctx.Err() == nil; {
// TODO_THIS_COMMIT: look into "topic validaton"
// (see: https://github.com/libp2p/specs/tree/master/pubsub#topic-validation)
readCount := 0
for {
readCountMu.Lock()
readCount++
fmt.Printf("readCount: %d\n", readCount)
readCountMu.Unlock()

msg, err := rtr.subscription.Next(ctx)
if ctx.Err() != nil {
fmt.Printf("error: %s\n", ctx.Err())
return
}

if err != nil {
rtr.logger.Error().Err(err).
Msg("error reading from background topic subscription")
Expand All @@ -184,17 +220,13 @@ func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
}
}

func (rtr *backgroundRouter) handleBackgroundMsg(data []byte) error {
func (rtr *backgroundRouter) handleBackgroundMsg(backgroundMsgBz []byte) error {
var backgroundMsg typesP2P.BackgroundMessage
if err := proto.Unmarshal(data, &backgroundMsg); err != nil {
if err := proto.Unmarshal(backgroundMsgBz, &backgroundMsg); err != nil {
return err
}

networkMessage := messaging.PocketEnvelope{}
if err := proto.Unmarshal(data, &networkMessage); err != nil {
return err
}
return nil
return rtr.handler(backgroundMsg.Data)
}

// isClientDebugMode returns the value of `ClientDebugMode` in the base config
Expand Down
Loading

0 comments on commit 662d658

Please sign in to comment.