polygon/sync,p2p: chain tip block fetch improvements #12224

Merged: 14 commits, Oct 8, 2024

Changes from all commits
216 changes: 175 additions & 41 deletions polygon/p2p/fetcher_base.go
@@ -21,50 +21,68 @@ import (
"errors"
"fmt"
"reflect"
"slices"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/generics"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/eth/protocols/eth"
)

type RequestIdGenerator func() uint64

type FetcherConfig struct {
responseTimeout time.Duration
retryBackOff time.Duration
maxRetries uint64
}

type Fetcher interface {
// FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received.
FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error)
FetchHeaders(
ctx context.Context,
start uint64,
end uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Header], error)

// FetchBodies fetches block bodies for the given headers from a peer. Blocks until data is received.
FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error)
// FetchBlocks fetches headers and bodies for a given [start, end) range from a peer and
// assembles them into blocks. Blocks until data is received.
FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error)
FetchBodies(
ctx context.Context,
headers []*types.Header,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Body], error)

// FetchBlocksBackwardsByHash fetches a number of blocks backwards starting from a block hash. Max amount is 1024
// blocks back. Blocks until data is received.
FetchBlocksBackwardsByHash(
ctx context.Context,
hash common.Hash,
amount uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Block], error)
}
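// Illustrative usage sketch (not part of this diff): a chain-tip caller could
// fetch an announced block together with a few of its parents in one call:
//
//	blocks, err := fetcher.FetchBlocksBackwardsByHash(ctx, announcedHash, 5, peerId)
//
// announcedHash and peerId are assumed to come from the caller; no identifiers
// beyond the interface above are introduced here.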

func NewFetcher(
logger log.Logger,
config FetcherConfig,
messageListener MessageListener,
messageSender MessageSender,
requestIdGenerator RequestIdGenerator,
) Fetcher {
return newFetcher(config, messageListener, messageSender, requestIdGenerator)
return newFetcher(logger, config, messageListener, messageSender, requestIdGenerator)
}

func newFetcher(
logger log.Logger,
config FetcherConfig,
messageListener MessageListener,
messageSender MessageSender,
requestIdGenerator RequestIdGenerator,
) *fetcher {
return &fetcher{
logger: logger,
config: config,
messageListener: messageListener,
messageSender: messageSender,
@@ -73,6 +91,7 @@ func newFetcher(
}

type fetcher struct {
logger log.Logger
config FetcherConfig
messageListener MessageListener
messageSender MessageSender
@@ -84,7 +103,13 @@ type FetcherResponse[T any] struct {
TotalSize int
}

func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
func (f *fetcher) FetchHeaders(
ctx context.Context,
start uint64,
end uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Header], error) {
if start >= end {
return FetcherResponse[[]*types.Header]{}, &ErrInvalidFetchHeadersRange{
start: start,
@@ -103,8 +128,8 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
if amount%eth.MaxHeadersServe > 0 {
numChunks++
}
totalHeadersSize := 0

totalHeadersSize := 0
headers := make([]*types.Header, 0, amount)
for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ {
chunkStart := start + chunkNum*eth.MaxHeadersServe
@@ -113,9 +138,14 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
// a node may not respond with all MaxHeadersServe in 1 response,
// so we keep on consuming from last received number (akin to consuming a paging api)
// until we have all headers of the chunk or the peer stopped returning headers
headersChunk, err := fetchWithRetry(f.config, func() (FetcherResponse[[]*types.Header], error) {
return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId)
})
request := &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: chunkStart,
},
Amount: chunkEnd - chunkStart,
}

headersChunk, err := f.fetchHeadersWithRetry(ctx, request, peerId, f.config.CopyWithOptions(opts...))
if err != nil {
return FetcherResponse[[]*types.Header]{}, err
}
@@ -139,7 +169,12 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe
}, nil
}

func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) {
func (f *fetcher) FetchBodies(
ctx context.Context,
headers []*types.Header,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Body], error) {
var bodies []*types.Body
totalBodiesSize := 0

@@ -155,9 +190,7 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer
headersChunk = headers
}

bodiesChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[[]*types.Body], error) {
return f.fetchBodies(ctx, headersChunk, peerId)
})
bodiesChunk, err := f.fetchBodiesWithRetry(ctx, headersChunk, peerId, f.config.CopyWithOptions(opts...))
if err != nil {
return FetcherResponse[[]*types.Body]{}, err
}
@@ -176,29 +209,97 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer
}, nil
}

func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) {
headers, err := f.FetchHeaders(ctx, start, end, peerId)
func (f *fetcher) FetchBlocksBackwardsByHash(
ctx context.Context,
hash common.Hash,
amount uint64,
peerId *PeerId,
opts ...FetcherOption,
) (FetcherResponse[[]*types.Block], error) {
if amount == 0 || amount > eth.MaxHeadersServe {
return FetcherResponse[[]*types.Block]{}, fmt.Errorf("%w: amount=%d", ErrInvalidFetchBlocksAmount, amount)
}
request := &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Hash: hash,
},
Amount: amount,
Reverse: true,
}

config := f.config.CopyWithOptions(opts...)
headersResponse, err := f.fetchHeadersWithRetry(ctx, request, peerId, config)
if err != nil {
return FetcherResponse[[]*types.Block]{}, err
}

bodies, err := f.FetchBodies(ctx, headers.Data, peerId)
headers := headersResponse.Data
if len(headers) == 0 {
return FetcherResponse[[]*types.Block]{}, &ErrMissingHeaderHash{requested: hash}
}

startHeader := headers[0]
if startHeader.Hash() != hash {
err := &ErrUnexpectedHeaderHash{requested: hash, received: startHeader.Hash()}
return FetcherResponse[[]*types.Block]{}, err
}

offset := amount - 1 // safe, we check that amount > 0 at function start
startNum := startHeader.Number.Uint64()
if startNum > offset {
startNum = startNum - offset
} else {
startNum = 0
}

slices.Reverse(headers)

if err := f.validateHeadersResponse(headers, startNum, amount); err != nil {
return FetcherResponse[[]*types.Block]{}, err
}

bodiesResponse, err := f.FetchBodies(ctx, headers, peerId, opts...)
if err != nil {
return FetcherResponse[[]*types.Block]{}, err
}

blocks := make([]*types.Block, len(headers.Data))
for i, header := range headers.Data {
blocks[i] = types.NewBlockFromNetwork(header, bodies.Data[i])
bodies := bodiesResponse.Data
blocks := make([]*types.Block, len(headers))
for i, header := range headers {
blocks[i] = types.NewBlockFromNetwork(header, bodies[i])
}

return FetcherResponse[[]*types.Block]{
response := FetcherResponse[[]*types.Block]{
Data: blocks,
TotalSize: headers.TotalSize + bodies.TotalSize,
}, nil
TotalSize: headersResponse.TotalSize + bodiesResponse.TotalSize,
}

return response, nil
}
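// Worked example of the start-number arithmetic above (illustrative only):
// if the requested hash resolves to header number 1000 and amount=3, then
// offset=2 and startNum=998, so after slices.Reverse the chunk is validated
// as the ascending range [998, 1000]. Near genesis (e.g. header number 1 with
// amount=3) startNum clamps to 0 instead of underflowing.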

func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) {
func (f *fetcher) fetchHeadersWithRetry(
ctx context.Context,
request *eth.GetBlockHeadersPacket,
peerId *PeerId,
config FetcherConfig,
) (FetcherResponse[[]*types.Header], error) {
attempt := 1
return fetchWithRetry(config, func() (FetcherResponse[[]*types.Header], error) {
response, err := f.fetchHeaders(ctx, request, peerId, config.responseTimeout)
if err != nil {
f.logger.Debug("[p2p.fetcher] fetching headers failure", "attempt", attempt, "peerId", peerId, "err", err)
attempt++
}
return response, err
})
}

func (f *fetcher) fetchHeaders(
ctx context.Context,
request *eth.GetBlockHeadersPacket,
peerId *PeerId,
responseTimeout time.Duration,
) (FetcherResponse[[]*types.Header], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -217,19 +318,14 @@ func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *P

requestId := f.requestIdGenerator()
err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{
RequestId: requestId,
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{
Number: start,
},
Amount: end - start,
},
RequestId: requestId,
GetBlockHeadersPacket: request,
})
if err != nil {
return FetcherResponse[[]*types.Header]{}, err
}

message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, responseTimeout, messages, filterBlockHeaders(peerId, requestId))
if err != nil {
return FetcherResponse[[]*types.Header]{}, err
}
@@ -249,6 +345,7 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
}
}

var prevHash common.Hash
for i, header := range headers {
expectedHeaderNum := start + uint64(i)
currentHeaderNumber := header.Number.Uint64()
@@ -258,6 +355,21 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
expected: expectedHeaderNum,
}
}

if i == 0 {
prevHash = header.Hash()
continue
}

if prevHash != header.ParentHash {
return &ErrNonSequentialHeaderHashes{
hash: header.Hash(),
parentHash: header.ParentHash,
prevHash: prevHash,
}
}

prevHash = header.Hash()
}

if headersLen < amount {
@@ -271,7 +383,29 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount
return nil
}
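// Note on the loop above (descriptive only): besides checking the expected
// numbering, it now requires headers[i].ParentHash == headers[i-1].Hash() for
// every i > 0, so correctly numbered but unlinked headers (for example a
// response spliced from different fork segments) fail with
// ErrNonSequentialHeaderHashes before any bodies are fetched.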

func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (*FetcherResponse[[]*types.Body], error) {
func (f *fetcher) fetchBodiesWithRetry(
ctx context.Context,
headers []*types.Header,
peerId *PeerId,
config FetcherConfig,
) (*FetcherResponse[[]*types.Body], error) {
attempt := 1
return fetchWithRetry(config, func() (*FetcherResponse[[]*types.Body], error) {
response, err := f.fetchBodies(ctx, headers, peerId, config.responseTimeout)
if err != nil {
f.logger.Debug("[p2p.fetcher] fetching bodies failure", "attempt", attempt, "peerId", peerId, "err", err)
attempt++
}
return response, err
})
}

func (f *fetcher) fetchBodies(
ctx context.Context,
headers []*types.Header,
peerId *PeerId,
responseTimeout time.Duration,
) (*FetcherResponse[[]*types.Body], error) {
// cleanup for the chan message observer
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -303,7 +437,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer
return nil, err
}

message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId))
message, messageSize, err := awaitResponse(ctx, responseTimeout, messages, filterBlockBodies(peerId, requestId))
if err != nil {
return nil, err
}
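
The new opts ...FetcherOption parameters feed f.config.CopyWithOptions(opts...), letting callers override per-request settings (for example the response timeout or retry count) without changing the fetcher's defaults. Neither FetcherOption nor CopyWithOptions is shown in these hunks, so the following is only a plausible functional-options sketch, assuming options transform a copied FetcherConfig; the constructor names WithResponseTimeout and WithMaxRetries are illustrative, not taken from this PR.

// Sketch only, assumed to live in package p2p alongside the FetcherConfig shown above.
type FetcherOption func(FetcherConfig) FetcherConfig

func WithResponseTimeout(d time.Duration) FetcherOption {
	return func(c FetcherConfig) FetcherConfig {
		c.responseTimeout = d
		return c
	}
}

func WithMaxRetries(n uint64) FetcherOption {
	return func(c FetcherConfig) FetcherConfig {
		c.maxRetries = n
		return c
	}
}

// CopyWithOptions returns a copy of the config with all overrides applied,
// leaving the fetcher's default configuration untouched.
func (fc FetcherConfig) CopyWithOptions(opts ...FetcherOption) FetcherConfig {
	copied := fc
	for _, opt := range opts {
		copied = opt(copied)
	}
	return copied
}

Under that assumption, a caller could pass something like WithResponseTimeout(time.Second) as a trailing argument to FetchHeaders or FetchBlocksBackwardsByHash to tighten the deadline for a single chain-tip request.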