Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/p2p: add request chunking to FetchHeaders #9536

Merged
merged 15 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 187 additions & 31 deletions polygon/p2p/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package p2p

import (
"context"
"errors"
"fmt"
"time"

"github.com/ledgerwatch/log/v3"
"modernc.org/mathutil"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/rlp"
)

const responseTimeout = 5 * time.Second
const (
responseTimeout = 5 * time.Second
maxFetchHeadersRange = 16384
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reason for this limit? I feel that it should be up to the client to decide

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was so that we would have an upper bound on the number of requests sent to a peer at the same time. It is no longer applicable, since we have now changed to doing one request at a time when chunking, so I've removed it.

)

type RequestIdGenerator func() uint64

Expand Down Expand Up @@ -54,36 +60,53 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
}

amount := end - start
requestId := f.requestIdGenerator()
observer := make(ChanMessageObserver[*sentry.InboundMessage])
if amount > maxFetchHeadersRange {
return nil, &ErrInvalidFetchHeadersRange{
start: start,
end: end,
}
}

// Soft response limits are:
// 1. 2 MB size
// 2. 1024 headers
//
// A header is approximately 500 bytes, hence 1024 headers should be less than 2 MB.
// As a simplification we can only use MaxHeadersServe for chunking.
chunks := amount / eth.MaxHeadersServe
if amount%eth.MaxHeadersServe > 0 {
chunks++
}

observer := make(ChanMessageObserver[*sentry.InboundMessage], chunks)
f.messageListener.RegisterBlockHeadersObserver(observer)
defer f.messageListener.UnregisterBlockHeadersObserver(observer)

//
// TODO 1) chunk request into smaller ranges if needed to fit in the 2 MiB response size soft limit
// and also 1024 max headers server (check AnswerGetBlockHeadersQuery)
// 2) peer should return <= amount, check for > amount and penalize peer
//
err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{
RequestId: requestId,
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: start,
requestIds := make(map[uint64]uint64, chunks)
for i := uint64(0); i < chunks; i++ {
chunkStart := start + i*eth.MaxHeadersServe
chunkAmount := mathutil.MinUint64(end-chunkStart, eth.MaxHeadersServe)
requestId := f.requestIdGenerator()
requestIds[requestId] = i

if err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't spam a peer with requests. let's do it sequentially: send one, and wait for a response, then send a 2nd one. if the response doesn't come in time, we should retry the request a couple of times.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, this actually simplifies the code a lot — fixed.
Re: retrying — I'm doing this in a subsequent PR, #9537 (it needs to be updated with the changes I've just made here).

RequestId: requestId,
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: chunkStart,
},
Amount: chunkAmount,
},
Amount: amount,
},
})
if err != nil {
return nil, err
}); err != nil {
return nil, err
}
}

ctx, cancel := context.WithTimeout(ctx, responseTimeout)
ctx, cancel := context.WithTimeout(ctx, time.Duration(len(requestIds))*responseTimeout)
defer cancel()

var headers []*types.Header
var requestReceived bool
for !requestReceived {
headerChunks := make([][]*types.Header, chunks)
for len(requestIds) > 0 {
select {
case <-ctx.Done():
return nil, fmt.Errorf("interrupted while waiting for msg from peer: %w", ctx.Err())
Expand All @@ -106,23 +129,52 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
return nil, fmt.Errorf("failed to decode BlockHeadersPacket66: %w", err)
}

if pkt.RequestId != requestId {
requestIdIndex, ok := requestIds[pkt.RequestId]
if !ok {
continue
}

headers = pkt.BlockHeadersPacket
requestReceived = true
headerChunks[requestIdIndex] = pkt.BlockHeadersPacket
delete(requestIds, pkt.RequestId)
}
}

headers := make([]*types.Header, 0, amount)
for _, headerChunk := range headerChunks {
for _, header := range headerChunk {
headers = append(headers, header)
}
}

if uint64(len(headers)) != amount {
if err := f.validateHeadersResponse(headers, start, end, amount); err != nil {
shouldPenalize := errors.Is(err, &ErrIncorrectOriginHeader{}) ||
errors.Is(err, &ErrTooManyHeaders{}) ||
errors.Is(err, &ErrDisconnectedHeaders{})

if shouldPenalize {
f.logger.Debug("penalizing peer", "peerId", peerId, "err", err.Error())

penalizeErr := f.peerPenalizer.Penalize(ctx, peerId)
if penalizeErr != nil {
err = fmt.Errorf("%w: %w", penalizeErr, err)
}
}

return nil, err
}

return headers, nil
}

func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, end, amount uint64) error {
if uint64(len(headers)) < amount {
var first, last uint64
if len(headers) > 0 {
first = headers[0].Number.Uint64()
last = headers[len(headers)-1].Number.Uint64()
}

return nil, &ErrIncompleteFetchHeadersResponse{
return &ErrIncompleteHeaders{
requestStart: start,
requestEnd: end,
first: first,
Expand All @@ -131,7 +183,44 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
}
}

return headers, nil
if uint64(len(headers)) > amount {
return &ErrTooManyHeaders{
requested: int(amount),
received: len(headers),
}
}

if start != headers[0].Number.Uint64() {
return &ErrIncorrectOriginHeader{
requested: start,
received: headers[0].Number.Uint64(),
}
}

var parentHeader *types.Header
for _, header := range headers {
if parentHeader == nil {
parentHeader = header
continue
}

parentHeaderHash := parentHeader.Hash()
currentHeaderNum := header.Number.Uint64()
parentHeaderNum := parentHeader.Number.Uint64()
if header.ParentHash != parentHeaderHash || currentHeaderNum != parentHeaderNum+1 {
return &ErrDisconnectedHeaders{
currentHash: header.Hash(),
currentParentHash: header.ParentHash,
currentNum: currentHeaderNum,
parentHash: parentHeaderHash,
parentNum: parentHeaderNum,
}
}

parentHeader = header
}

return nil
}

type ErrInvalidFetchHeadersRange struct {
Expand All @@ -143,25 +232,92 @@ func (e ErrInvalidFetchHeadersRange) Error() string {
return fmt.Sprintf("invalid fetch headers range: start=%d, end=%d", e.start, e.end)
}

type ErrIncompleteFetchHeadersResponse struct {
// ErrIncompleteHeaders signals that a peer returned fewer headers than the
// requested [requestStart, requestEnd) range asked for.
type ErrIncompleteHeaders struct {
	requestStart uint64
	requestEnd   uint64
	first        uint64 // block number of the first header actually received (0 if none)
	last         uint64 // block number of the last header actually received (0 if none)
	amount       int    // number of headers actually received
}

// Error implements the error interface.
func (e ErrIncompleteHeaders) Error() string {
	return fmt.Sprintf(
		"incomplete fetch headers response: first=%d, last=%d, amount=%d, requested [%d, %d)",
		e.first, e.last, e.amount, e.requestStart, e.requestEnd,
	)
}

// LowestMissingBlockNum returns the smallest block number in the requested
// range for which no header was received, so the caller can retry from there.
func (e ErrIncompleteHeaders) LowestMissingBlockNum() uint64 {
	// Only when a non-empty response forms a prefix of the requested range
	// can we resume right after the last received header.
	haveContiguousPrefix := e.first != 0 && e.last != 0 && e.first == e.requestStart
	if haveContiguousPrefix {
		return e.last + 1
	}
	return e.requestStart
}

type ErrTooManyHeaders struct {
requested int
received int
}

func (e ErrTooManyHeaders) Error() string {
return fmt.Sprintf("too many headers in fetch headers response: requested=%d, received=%d", e.requested, e.received)
}

func (e ErrTooManyHeaders) Is(err error) bool {
var errTooManyHeaders *ErrTooManyHeaders
switch {
case errors.As(err, &errTooManyHeaders):
return true
default:
return false
}
}

// ErrDisconnectedHeaders signals that two consecutive headers in a response do
// not chain: either the child's ParentHash does not match the parent's hash or
// the block numbers are not sequential. This warrants peer penalization.
type ErrDisconnectedHeaders struct {
	currentHash       common.Hash
	currentParentHash common.Hash
	currentNum        uint64
	parentHash        common.Hash
	parentNum         uint64
}

// Error implements the error interface.
//
// Note: a single Sprintf replaces the previous five nested Sprintf calls —
// the output is identical but avoids the intermediate string allocations.
func (e ErrDisconnectedHeaders) Error() string {
	return fmt.Sprintf(
		"disconnected headers in fetch headers response: currentHash=%v, currentParentHash=%v, currentNum=%v, parentHash=%v, parentNum=%v",
		e.currentHash, e.currentParentHash, e.currentNum, e.parentHash, e.parentNum,
	)
}

// Is reports whether err is (or wraps) an *ErrDisconnectedHeaders, so callers
// can match this error type via errors.Is.
func (e ErrDisconnectedHeaders) Is(err error) bool {
	var target *ErrDisconnectedHeaders
	return errors.As(err, &target)
}

type ErrIncorrectOriginHeader struct {
requested uint64
received uint64
}

func (e ErrIncorrectOriginHeader) Error() string {
return fmt.Sprintf("incorrect origin header: requested=%d, received=%d", e.requested, e.received)
}

func (e ErrIncorrectOriginHeader) Is(err error) bool {
var errIncorrectOriginHeader *ErrIncorrectOriginHeader
switch {
case errors.As(err, &errIncorrectOriginHeader):
return true
default:
return false
}
}
2 changes: 1 addition & 1 deletion polygon/p2p/fetcher_tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type trackingFetcher struct {
func (tf *trackingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) {
res, err := tf.Fetcher.FetchHeaders(ctx, start, end, peerId)
if err != nil {
var errIncompleteResponse *ErrIncompleteFetchHeadersResponse
var errIncompleteResponse *ErrIncompleteHeaders
if errors.As(err, &errIncompleteResponse) {
tf.peerTracker.BlockNumMissing(peerId, errIncompleteResponse.LowestMissingBlockNum())
} else if errors.Is(err, context.DeadlineExceeded) {
Expand Down
Loading