Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/p2p: service composition #9602

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 14 additions & 49 deletions polygon/p2p/service.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
package p2p

import (
"context"
"math/rand"
"sync"
"time"

"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon/core/types"
)

//go:generate mockgen -destination=./service_mock.go -package=p2p . Service
//go:generate mockgen -source=./service.go -destination=./service_mock.go -package=p2p . Service
type Service interface {
Start(ctx context.Context)
Stop()
Fetcher
MessageListener
PeerTracker
PeerPenalizer
MaxPeers() int
ListPeersMayHaveBlockNum(blockNum uint64) []*PeerId
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error)
Penalize(ctx context.Context, peerId *PeerId) error
GetMessageListener() MessageListener
}

func NewService(maxPeers int, logger log.Logger, sentryClient direct.SentryClient) Service {
Expand Down Expand Up @@ -50,51 +44,22 @@ func newService(
fetcher = NewPenalizingFetcher(logger, fetcher, peerPenalizer)
fetcher = NewTrackingFetcher(fetcher, peerTracker)
return &service{
Fetcher: fetcher,
MessageListener: messageListener,
PeerPenalizer: peerPenalizer,
PeerTracker: peerTracker,
maxPeers: maxPeers,
fetcher: fetcher,
messageListener: messageListener,
peerPenalizer: peerPenalizer,
peerTracker: peerTracker,
logger: logger,
}
}

type service struct {
once sync.Once
maxPeers int
fetcher Fetcher
messageListener MessageListener
peerPenalizer PeerPenalizer
peerTracker PeerTracker
logger log.Logger
}

func (s *service) Start(ctx context.Context) {
s.once.Do(func() {
s.messageListener.Start(ctx)
})
}

func (s *service) Stop() {
s.messageListener.Stop()
Fetcher
MessageListener
PeerPenalizer
PeerTracker
maxPeers int
}

func (s *service) MaxPeers() int {
return s.maxPeers
}

func (s *service) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) ([]*types.Header, error) {
return s.fetcher.FetchHeaders(ctx, start, end, peerId)
}

func (s *service) Penalize(ctx context.Context, peerId *PeerId) error {
return s.peerPenalizer.Penalize(ctx, peerId)
}

func (s *service) ListPeersMayHaveBlockNum(blockNum uint64) []*PeerId {
return s.peerTracker.ListPeersMayHaveBlockNum(blockNum)
}

func (s *service) GetMessageListener() MessageListener {
return s.messageListener
}
154 changes: 123 additions & 31 deletions polygon/p2p/service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions polygon/sync/tip_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (te *TipEvents) Events() <-chan Event {
}

func (te *TipEvents) Run(ctx context.Context) error {
newBlockObserverCancel := te.p2pService.GetMessageListener().RegisterNewBlockObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockPacket]) {
newBlockObserverCancel := te.p2pService.RegisterNewBlockObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockPacket]) {
block := message.Decoded.Block
te.events.PushEvent(Event{
Type: EventTypeNewHeader,
Expand All @@ -104,7 +104,7 @@ func (te *TipEvents) Run(ctx context.Context) error {
})
defer newBlockObserverCancel()

newBlockHashesObserverCancel := te.p2pService.GetMessageListener().RegisterNewBlockHashesObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockHashesPacket]) {
newBlockHashesObserverCancel := te.p2pService.RegisterNewBlockHashesObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockHashesPacket]) {
te.events.PushEvent(Event{
Type: EventTypeNewHeaderHashes,
newHeaderHashes: EventNewHeaderHashes{
Expand Down
Loading