Skip to content

Commit

Permalink
polygon/p2p: service composition (erigontech#9602)
Browse files Browse the repository at this point in the history
  • Loading branch information
taratorio authored and mriccobene committed Mar 13, 2024
1 parent af0ab02 commit d5c8c02
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 82 deletions.
63 changes: 14 additions & 49 deletions polygon/p2p/service.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
package p2p

import (
"context"
"math/rand"
"sync"
"time"

"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon/core/types"
)

//go:generate mockgen -destination=./service_mock.go -package=p2p . Service
//go:generate mockgen -source=./service.go -destination=./service_mock.go -package=p2p . Service
type Service interface {
Start(ctx context.Context)
Stop()
Fetcher
MessageListener
PeerTracker
PeerPenalizer
MaxPeers() int
ListPeersMayHaveBlockNum(blockNum uint64) []*PeerId
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error)
Penalize(ctx context.Context, peerId *PeerId) error
GetMessageListener() MessageListener
}

func NewService(maxPeers int, logger log.Logger, sentryClient direct.SentryClient) Service {
Expand Down Expand Up @@ -50,51 +44,22 @@ func newService(
fetcher = NewPenalizingFetcher(logger, fetcher, peerPenalizer)
fetcher = NewTrackingFetcher(fetcher, peerTracker)
return &service{
Fetcher: fetcher,
MessageListener: messageListener,
PeerPenalizer: peerPenalizer,
PeerTracker: peerTracker,
maxPeers: maxPeers,
fetcher: fetcher,
messageListener: messageListener,
peerPenalizer: peerPenalizer,
peerTracker: peerTracker,
logger: logger,
}
}

type service struct {
once sync.Once
maxPeers int
fetcher Fetcher
messageListener MessageListener
peerPenalizer PeerPenalizer
peerTracker PeerTracker
logger log.Logger
}

func (s *service) Start(ctx context.Context) {
s.once.Do(func() {
s.messageListener.Start(ctx)
})
}

func (s *service) Stop() {
s.messageListener.Stop()
Fetcher
MessageListener
PeerPenalizer
PeerTracker
maxPeers int
}

func (s *service) MaxPeers() int {
return s.maxPeers
}

func (s *service) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
return s.fetcher.FetchHeaders(ctx, start, end, peerId)
}

func (s *service) Penalize(ctx context.Context, peerId *PeerId) error {
return s.peerPenalizer.Penalize(ctx, peerId)
}

func (s *service) ListPeersMayHaveBlockNum(blockNum uint64) []*PeerId {
return s.peerTracker.ListPeersMayHaveBlockNum(blockNum)
}

func (s *service) GetMessageListener() MessageListener {
return s.messageListener
}
154 changes: 123 additions & 31 deletions polygon/p2p/service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions polygon/sync/tip_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (te *TipEvents) Events() <-chan Event {
}

func (te *TipEvents) Run(ctx context.Context) error {
newBlockObserverCancel := te.p2pService.GetMessageListener().RegisterNewBlockObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockPacket]) {
newBlockObserverCancel := te.p2pService.RegisterNewBlockObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockPacket]) {
block := message.Decoded.Block
te.events.PushEvent(Event{
Type: EventTypeNewHeader,
Expand All @@ -104,7 +104,7 @@ func (te *TipEvents) Run(ctx context.Context) error {
})
defer newBlockObserverCancel()

newBlockHashesObserverCancel := te.p2pService.GetMessageListener().RegisterNewBlockHashesObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockHashesPacket]) {
newBlockHashesObserverCancel := te.p2pService.RegisterNewBlockHashesObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockHashesPacket]) {
te.events.PushEvent(Event{
Type: EventTypeNewHeaderHashes,
newHeaderHashes: EventNewHeaderHashes{
Expand Down

0 comments on commit d5c8c02

Please sign in to comment.