Skip to content

Commit

Permalink
polygon/p2p: address follow ups (#9470)
Browse files Browse the repository at this point in the history
addresses follow up comments left on
#9399
  • Loading branch information
taratorio authored Feb 20, 2024
1 parent ddea712 commit 45512e3
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 170 deletions.
129 changes: 0 additions & 129 deletions polygon/p2p/downloader.go

This file was deleted.

125 changes: 125 additions & 0 deletions polygon/p2p/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package p2p

import (
"context"
"errors"
"fmt"
"time"

"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/rlp"
)

const responseTimeout = 5 * time.Second

var invalidFetchHeadersRangeErr = errors.New("invalid fetch headers range")

type RequestIdGenerator func() uint64

type Fetcher interface {
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error)
}

func NewFetcher(
logger log.Logger,
messageListener MessageListener,
messageSender MessageSender,
peerPenalizer PeerPenalizer,
requestIdGenerator RequestIdGenerator,
) Fetcher {
return &fetcher{
logger: logger,
messageListener: messageListener,
messageSender: messageSender,
peerPenalizer: peerPenalizer,
requestIdGenerator: requestIdGenerator,
}
}

type fetcher struct {
logger log.Logger
messageListener MessageListener
messageSender MessageSender
peerPenalizer PeerPenalizer
requestIdGenerator RequestIdGenerator
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) {
if start >= end {
return nil, fmt.Errorf("%w: start=%d, end=%d", invalidFetchHeadersRangeErr, start, end)
}

var headers []*types.Header
requestId := f.requestIdGenerator()

observer := make(ChanMessageObserver[*sentry.InboundMessage])
f.messageListener.RegisterBlockHeadersObserver(observer)
defer f.messageListener.UnregisterBlockHeadersObserver(observer)

//
// TODO 1) chunk request into smaller ranges if needed to fit in the 2 MiB response size soft limit
// and also 1024 max headers server (check AnswerGetBlockHeadersQuery)
// 2) peer should return <= amount, check for > amount and penalize peer
//
err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{
RequestId: requestId,
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: start,
},
Amount: end - start,
},
})
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(ctx, responseTimeout)
defer cancel()
wg, ctx := errgroup.WithContext(ctx)

wg.Go(func() error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("interrupted while waiting for msg from peer: %w", ctx.Err())
case msg := <-observer:
msgPeerId := PeerIdFromH512(msg.PeerId)
if msgPeerId != peerId {
continue
}

var pkt eth.BlockHeadersPacket66
if err := rlp.DecodeBytes(msg.Data, &pkt); err != nil {
if rlp.IsInvalidRLPError(err) {
f.logger.Debug("penalizing peer for invalid rlp response", "peerId", peerId)
penalizeErr := f.peerPenalizer.Penalize(ctx, peerId)
if penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
}
}

return fmt.Errorf("failed to decode BlockHeadersPacket66: %w", err)
}

if pkt.RequestId != requestId {
continue
}

headers = pkt.BlockHeadersPacket
return nil
}
}
})

if err := wg.Wait(); err != nil {
return nil, err
}

return headers, nil
}
8 changes: 4 additions & 4 deletions polygon/p2p/message_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
type MessageListener interface {
Start(ctx context.Context)
Stop()
RegisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage])
UnregisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage])
RegisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage])
UnregisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage])
}

func NewMessageListener(logger log.Logger, sentryClient direct.SentryClient) MessageListener {
Expand Down Expand Up @@ -50,11 +50,11 @@ func (ml *messageListener) Stop() {
ml.stopWg.Wait()
}

func (ml *messageListener) RegisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) {
func (ml *messageListener) RegisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage]) {
ml.registerInboundMessageObserver(observer, sentry.MessageId_BLOCK_HEADERS_66)
}

func (ml *messageListener) UnregisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) {
func (ml *messageListener) UnregisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage]) {
ml.unregisterInboundMessageObserver(observer, sentry.MessageId_BLOCK_HEADERS_66)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ import (
"github.com/ledgerwatch/erigon/rlp"
)

type MessageBroadcaster interface {
GetBlockHeaders66(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error
type MessageSender interface {
SendGetBlockHeaders(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error
}

func NewMessageBroadcaster(sentryClient direct.SentryClient) MessageBroadcaster {
return &messageBroadcaster{
func NewMessageSender(sentryClient direct.SentryClient) MessageSender {
return &messageSender{
sentryClient: sentryClient,
}
}

type messageBroadcaster struct {
type messageSender struct {
sentryClient direct.SentryClient
}

func (mb *messageBroadcaster) GetBlockHeaders66(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error {
func (ms *messageSender) SendGetBlockHeaders(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error {
data, err := rlp.EncodeToBytes(req)
if err != nil {
return err
}

_, err = mb.sentryClient.SendMessageById(ctx, &sentry.SendMessageByIdRequest{
_, err = ms.sentryClient.SendMessageById(ctx, &sentry.SendMessageByIdRequest{
PeerId: peerId.H512(),
Data: &sentry.OutboundMessageData{
Id: sentry.MessageId_GET_BLOCK_HEADERS_66,
Expand Down
15 changes: 8 additions & 7 deletions polygon/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type Service interface {
Stop()
MaxPeers() int
PeersSyncProgress() PeersSyncProgress
DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error)
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error)
Penalize(ctx context.Context, peerId PeerId) error
}

Expand All @@ -33,12 +34,12 @@ func newService(
requestIdGenerator RequestIdGenerator,
) Service {
messageListener := NewMessageListener(logger, sentryClient)
messageBroadcaster := NewMessageBroadcaster(sentryClient)
messageSender := NewMessageSender(sentryClient)
peerPenalizer := NewPeerPenalizer(sentryClient)
downloader := NewDownloader(logger, messageListener, messageBroadcaster, peerPenalizer, requestIdGenerator)
fetcher := NewFetcher(logger, messageListener, messageSender, peerPenalizer, requestIdGenerator)
return &service{
config: config,
downloader: downloader,
fetcher: fetcher,
messageListener: messageListener,
peerPenalizer: peerPenalizer,
}
Expand All @@ -47,7 +48,7 @@ func newService(
type service struct {
once sync.Once
config p2p.Config
downloader Downloader
fetcher Fetcher
messageListener MessageListener
peerPenalizer PeerPenalizer
}
Expand All @@ -66,8 +67,8 @@ func (s *service) MaxPeers() int {
return s.config.MaxPeers
}

func (s *service) DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) {
return s.downloader.DownloadHeaders(ctx, start, end, peerId)
func (s *service) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) {
return s.fetcher.FetchHeaders(ctx, start, end, peerId)
}

func (s *service) Penalize(ctx context.Context, peerId PeerId) error {
Expand Down
12 changes: 6 additions & 6 deletions polygon/p2p/service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 45512e3

Please sign in to comment.