diff --git a/polygon/p2p/fetcher_base.go b/polygon/p2p/fetcher_base.go index 5b678b9a8a1..142585148f8 100644 --- a/polygon/p2p/fetcher_base.go +++ b/polygon/p2p/fetcher_base.go @@ -21,50 +21,68 @@ import ( "errors" "fmt" "reflect" + "slices" "time" "github.com/cenkalti/backoff/v4" "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/generics" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/core/types" "github.com/erigontech/erigon/eth/protocols/eth" ) type RequestIdGenerator func() uint64 -type FetcherConfig struct { - responseTimeout time.Duration - retryBackOff time.Duration - maxRetries uint64 -} - type Fetcher interface { // FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received. - FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) + FetchHeaders( + ctx context.Context, + start uint64, + end uint64, + peerId *PeerId, + opts ...FetcherOption, + ) (FetcherResponse[[]*types.Header], error) + // FetchBodies fetches block bodies for the given headers from a peer. Blocks until data is received. - FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) - // FetchBlocks fetches headers and bodies for a given [start, end) range from a peer and - // assembles them into blocks. Blocks until data is received. - FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) + FetchBodies( + ctx context.Context, + headers []*types.Header, + peerId *PeerId, + opts ...FetcherOption, + ) (FetcherResponse[[]*types.Body], error) + + // FetchBlocksBackwardsByHash fetches a number of blocks backwards starting from a block hash. Max amount is 1024 + // blocks back. Blocks until data is received. 
+ FetchBlocksBackwardsByHash( + ctx context.Context, + hash common.Hash, + amount uint64, + peerId *PeerId, + opts ...FetcherOption, + ) (FetcherResponse[[]*types.Block], error) } func NewFetcher( + logger log.Logger, config FetcherConfig, messageListener MessageListener, messageSender MessageSender, requestIdGenerator RequestIdGenerator, ) Fetcher { - return newFetcher(config, messageListener, messageSender, requestIdGenerator) + return newFetcher(logger, config, messageListener, messageSender, requestIdGenerator) } func newFetcher( + logger log.Logger, config FetcherConfig, messageListener MessageListener, messageSender MessageSender, requestIdGenerator RequestIdGenerator, ) *fetcher { return &fetcher{ + logger: logger, config: config, messageListener: messageListener, messageSender: messageSender, @@ -73,6 +91,7 @@ func newFetcher( } type fetcher struct { + logger log.Logger config FetcherConfig messageListener MessageListener messageSender MessageSender @@ -84,7 +103,13 @@ type FetcherResponse[T any] struct { TotalSize int } -func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { +func (f *fetcher) FetchHeaders( + ctx context.Context, + start uint64, + end uint64, + peerId *PeerId, + opts ...FetcherOption, +) (FetcherResponse[[]*types.Header], error) { if start >= end { return FetcherResponse[[]*types.Header]{}, &ErrInvalidFetchHeadersRange{ start: start, @@ -103,8 +128,8 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe if amount%eth.MaxHeadersServe > 0 { numChunks++ } - totalHeadersSize := 0 + totalHeadersSize := 0 headers := make([]*types.Header, 0, amount) for chunkNum := uint64(0); chunkNum < numChunks; chunkNum++ { chunkStart := start + chunkNum*eth.MaxHeadersServe @@ -113,9 +138,14 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe // a node may not respond with all MaxHeadersServe in 1 response, // so we 
keep on consuming from last received number (akin to consuming a paging api) // until we have all headers of the chunk or the peer stopped returning headers - headersChunk, err := fetchWithRetry(f.config, func() (FetcherResponse[[]*types.Header], error) { - return f.fetchHeaders(ctx, chunkStart, chunkEnd, peerId) - }) + request := &eth.GetBlockHeadersPacket{ + Origin: eth.HashOrNumber{ + Number: chunkStart, + }, + Amount: chunkEnd - chunkStart, + } + + headersChunk, err := f.fetchHeadersWithRetry(ctx, request, peerId, f.config.CopyWithOptions(opts...)) if err != nil { return FetcherResponse[[]*types.Header]{}, err } @@ -139,7 +169,12 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe }, nil } -func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { +func (f *fetcher) FetchBodies( + ctx context.Context, + headers []*types.Header, + peerId *PeerId, + opts ...FetcherOption, +) (FetcherResponse[[]*types.Body], error) { var bodies []*types.Body totalBodiesSize := 0 @@ -155,9 +190,7 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer headersChunk = headers } - bodiesChunk, err := fetchWithRetry(f.config, func() (*FetcherResponse[[]*types.Body], error) { - return f.fetchBodies(ctx, headersChunk, peerId) - }) + bodiesChunk, err := f.fetchBodiesWithRetry(ctx, headersChunk, peerId, f.config.CopyWithOptions(opts...)) if err != nil { return FetcherResponse[[]*types.Body]{}, err } @@ -176,29 +209,97 @@ func (f *fetcher) FetchBodies(ctx context.Context, headers []*types.Header, peer }, nil } -func (f *fetcher) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) { - headers, err := f.FetchHeaders(ctx, start, end, peerId) +func (f *fetcher) FetchBlocksBackwardsByHash( + ctx context.Context, + hash common.Hash, + amount uint64, + peerId *PeerId, + opts ...FetcherOption, +) 
(FetcherResponse[[]*types.Block], error) { + if amount == 0 || amount > eth.MaxHeadersServe { + return FetcherResponse[[]*types.Block]{}, fmt.Errorf("%w: amount=%d", ErrInvalidFetchBlocksAmount, amount) + } + request := &eth.GetBlockHeadersPacket{ + Origin: eth.HashOrNumber{ + Hash: hash, + }, + Amount: amount, + Reverse: true, + } + + config := f.config.CopyWithOptions(opts...) + headersResponse, err := f.fetchHeadersWithRetry(ctx, request, peerId, config) if err != nil { return FetcherResponse[[]*types.Block]{}, err } - bodies, err := f.FetchBodies(ctx, headers.Data, peerId) + headers := headersResponse.Data + if len(headers) == 0 { + return FetcherResponse[[]*types.Block]{}, &ErrMissingHeaderHash{requested: hash} + } + + startHeader := headers[0] + if startHeader.Hash() != hash { + err := &ErrUnexpectedHeaderHash{requested: hash, received: startHeader.Hash()} + return FetcherResponse[[]*types.Block]{}, err + } + + offset := amount - 1 // safe, we check that amount > 0 at function start + startNum := startHeader.Number.Uint64() + if startNum > offset { + startNum = startNum - offset + } else { + startNum = 0 + } + + slices.Reverse(headers) + + if err := f.validateHeadersResponse(headers, startNum, amount); err != nil { + return FetcherResponse[[]*types.Block]{}, err + } + + bodiesResponse, err := f.FetchBodies(ctx, headers, peerId, opts...) 
if err != nil { return FetcherResponse[[]*types.Block]{}, err } - blocks := make([]*types.Block, len(headers.Data)) - for i, header := range headers.Data { - blocks[i] = types.NewBlockFromNetwork(header, bodies.Data[i]) + bodies := bodiesResponse.Data + blocks := make([]*types.Block, len(headers)) + for i, header := range headers { + blocks[i] = types.NewBlockFromNetwork(header, bodies[i]) } - return FetcherResponse[[]*types.Block]{ + response := FetcherResponse[[]*types.Block]{ Data: blocks, - TotalSize: headers.TotalSize + bodies.TotalSize, - }, nil + TotalSize: headersResponse.TotalSize + bodiesResponse.TotalSize, + } + + return response, nil } -func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { +func (f *fetcher) fetchHeadersWithRetry( + ctx context.Context, + request *eth.GetBlockHeadersPacket, + peerId *PeerId, + config FetcherConfig, +) (FetcherResponse[[]*types.Header], error) { + attempt := 1 + return fetchWithRetry(config, func() (FetcherResponse[[]*types.Header], error) { + response, err := f.fetchHeaders(ctx, request, peerId, config.responseTimeout) + if err != nil { + f.logger.Debug("[p2p.fetcher] fetching headers failure", "attempt", attempt, "peerId", peerId, "err", err) + attempt++ + } + return response, err + }) +} + +func (f *fetcher) fetchHeaders( + ctx context.Context, + request *eth.GetBlockHeadersPacket, + peerId *PeerId, + responseTimeout time.Duration, +) (FetcherResponse[[]*types.Header], error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -217,19 +318,14 @@ func (f *fetcher) fetchHeaders(ctx context.Context, start, end uint64, peerId *P requestId := f.requestIdGenerator() err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{ - RequestId: requestId, - GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{ - Origin: eth.HashOrNumber{ - Number: start, - }, - Amount: end - start, - }, + RequestId: requestId, + 
GetBlockHeadersPacket: request, }) if err != nil { return FetcherResponse[[]*types.Header]{}, err } - message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockHeaders(peerId, requestId)) + message, messageSize, err := awaitResponse(ctx, responseTimeout, messages, filterBlockHeaders(peerId, requestId)) if err != nil { return FetcherResponse[[]*types.Header]{}, err } @@ -249,6 +345,7 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount } } + var prevHash common.Hash for i, header := range headers { expectedHeaderNum := start + uint64(i) currentHeaderNumber := header.Number.Uint64() @@ -258,6 +355,21 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount expected: expectedHeaderNum, } } + + if i == 0 { + prevHash = header.Hash() + continue + } + + if prevHash != header.ParentHash { + return &ErrNonSequentialHeaderHashes{ + hash: header.Hash(), + parentHash: header.ParentHash, + prevHash: prevHash, + } + } + + prevHash = header.Hash() } if headersLen < amount { @@ -271,7 +383,29 @@ func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, amount return nil } -func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (*FetcherResponse[[]*types.Body], error) { +func (f *fetcher) fetchBodiesWithRetry( + ctx context.Context, + headers []*types.Header, + peerId *PeerId, + config FetcherConfig, +) (*FetcherResponse[[]*types.Body], error) { + attempt := 1 + return fetchWithRetry(config, func() (*FetcherResponse[[]*types.Body], error) { + response, err := f.fetchBodies(ctx, headers, peerId, config.responseTimeout) + if err != nil { + f.logger.Debug("[p2p.fetcher] fetching bodies failure", "attempt", attempt, "peerId", peerId, "err", err) + attempt++ + } + return response, err + }) +} + +func (f *fetcher) fetchBodies( + ctx context.Context, + headers []*types.Header, + peerId *PeerId, + responseTimeout time.Duration, +) 
(*FetcherResponse[[]*types.Body], error) { // cleanup for the chan message observer ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -303,7 +437,7 @@ func (f *fetcher) fetchBodies(ctx context.Context, headers []*types.Header, peer return nil, err } - message, messageSize, err := awaitResponse(ctx, f.config.responseTimeout, messages, filterBlockBodies(peerId, requestId)) + message, messageSize, err := awaitResponse(ctx, responseTimeout, messages, filterBlockBodies(peerId, requestId)) if err != nil { return nil, err } diff --git a/polygon/p2p/fetcher_base_test.go b/polygon/p2p/fetcher_base_test.go index 810d29921c3..7fe41620bc2 100644 --- a/polygon/p2p/fetcher_base_test.go +++ b/polygon/p2p/fetcher_base_test.go @@ -525,6 +525,244 @@ func TestFetcherFetchBodiesErrMissingBodies(t *testing.T) { }) } +func TestFetcherFetchBlockByHash(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + mockHeader := newMockBlockHeaders(1)[0] + mockHash := mockHeader.Hash() + requestId1 := uint64(1234) + mockInboundMessages1 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, []*types.Header{mockHeader}), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: mockHash, + wantRequestAmount: 1, + wantReverse: true, + } + requestId2 := uint64(1235) + mockInboundMessages2 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_BODIES_66, + PeerId: peerId.H512(), + Data: newMockBlockBodiesPacketBytes(t, requestId2, &types.Body{}), + }, + } + mockRequestResponse2 := requestResponseMock{ + requestId: requestId2, + mockResponseInboundMessages: mockInboundMessages2, + wantRequestPeerId: peerId, + wantRequestHashes: []common.Hash{mockHash}, + } + + test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2)) + 
test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2) + test.run(func(ctx context.Context, t *testing.T) { + response, err := test.fetcher.FetchBlocksBackwardsByHash(ctx, mockHash, 1, peerId) + require.NoError(t, err) + require.Len(t, response.Data, 1) + require.Equal(t, mockHash, response.Data[0].Header().Hash()) + }) +} + +func TestFetcherFetchBlockByHashErrMissingHeaderHash(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + mockHeader := newMockBlockHeaders(1)[0] + mockHash := mockHeader.Hash() + requestId1 := uint64(1234) + mockInboundMessages1 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, []*types.Header{}), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: mockHash, + wantRequestAmount: 1, + wantReverse: true, + } + + test := newFetcherTest(t, newMockRequestGenerator(requestId1)) + test.mockSentryStreams(mockRequestResponse1) + test.run(func(ctx context.Context, t *testing.T) { + response, err := test.fetcher.FetchBlocksBackwardsByHash(ctx, mockHash, 1, peerId) + require.ErrorIs(t, err, &ErrMissingHeaderHash{}) + require.Nil(t, response.Data, response) + }) +} + +func TestFetcherFetchBlockByHashErrTooManyHeaders(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + mockHeaders := newMockBlockHeaders(3) + mockHeader := mockHeaders[0] + mockHash := mockHeader.Hash() + requestId1 := uint64(1234) + mockInboundMessages1 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: 
mockHash, + wantRequestAmount: 1, + wantReverse: true, + } + + test := newFetcherTest(t, newMockRequestGenerator(requestId1)) + test.mockSentryStreams(mockRequestResponse1) + test.run(func(ctx context.Context, t *testing.T) { + response, err := test.fetcher.FetchBlocksBackwardsByHash(ctx, mockHash, 1, peerId) + require.ErrorIs(t, err, &ErrTooManyHeaders{}) + require.Nil(t, response.Data, response) + }) +} + +func TestFetcherFetchBlockByHashErrUnexpectedHeaderHash(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + mockHeaders := newMockBlockHeaders(2) + mockHeader := mockHeaders[0] + mockHash := mockHeader.Hash() + incorrectHeaderResponse := mockHeaders[1] + requestId1 := uint64(1234) + mockInboundMessages1 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, []*types.Header{incorrectHeaderResponse}), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: mockHash, + wantRequestAmount: 1, + wantReverse: true, + } + + test := newFetcherTest(t, newMockRequestGenerator(requestId1)) + test.mockSentryStreams(mockRequestResponse1) + test.run(func(ctx context.Context, t *testing.T) { + response, err := test.fetcher.FetchBlocksBackwardsByHash(ctx, mockHash, 1, peerId) + require.ErrorIs(t, err, &ErrUnexpectedHeaderHash{}) + require.Nil(t, response.Data, response) + }) +} + +func TestFetcherFetchBlockByHashErrTooManyBodies(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + mockHeader := newMockBlockHeaders(1)[0] + mockHash := mockHeader.Hash() + requestId1 := uint64(1234) + mockInboundMessages1 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, []*types.Header{mockHeader}), + }, + } + mockRequestResponse1 
:= requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: mockHash, + wantRequestAmount: 1, + wantReverse: true, + } + requestId2 := uint64(1235) + mockInboundMessages2 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_BODIES_66, + PeerId: peerId.H512(), + Data: newMockBlockBodiesPacketBytes(t, requestId2, &types.Body{}, &types.Body{}), + }, + } + mockRequestResponse2 := requestResponseMock{ + requestId: requestId2, + mockResponseInboundMessages: mockInboundMessages2, + wantRequestPeerId: peerId, + wantRequestHashes: []common.Hash{mockHash}, + } + + test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2)) + test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2) + test.run(func(ctx context.Context, t *testing.T) { + response, err := test.fetcher.FetchBlocksBackwardsByHash(ctx, mockHash, 1, peerId) + require.ErrorIs(t, err, &ErrTooManyBodies{}) + require.Nil(t, response.Data) + }) +} + +func TestFetcherFetchBlockByHashErrMissingBodies(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + mockHeader := newMockBlockHeaders(1)[0] + mockHash := mockHeader.Hash() + requestId1 := uint64(1234) + mockInboundMessages1 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, []*types.Header{mockHeader}), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: mockHash, + wantRequestAmount: 1, + wantReverse: true, + } + requestId2 := uint64(1235) + mockInboundMessages2 := []*sentryproto.InboundMessage{ + { + Id: sentryproto.MessageId_BLOCK_BODIES_66, + PeerId: peerId.H512(), + Data: newMockBlockBodiesPacketBytes(t, requestId2), + }, + } + mockRequestResponse2 := requestResponseMock{ + 
requestId: requestId2, + mockResponseInboundMessages: mockInboundMessages2, + wantRequestPeerId: peerId, + wantRequestHashes: []common.Hash{mockHash}, + } + + test := newFetcherTest(t, newMockRequestGenerator(requestId1, requestId2)) + test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2) + test.run(func(ctx context.Context, t *testing.T) { + response, err := test.fetcher.FetchBlocksBackwardsByHash(ctx, mockHash, 1, peerId) + require.ErrorIs(t, err, &ErrMissingBodies{}) + require.Nil(t, response.Data) + }) +} + func newFetcherTest(t *testing.T, requestIdGenerator RequestIdGenerator) *fetcherTest { ctx, cancel := context.WithCancel(context.Background()) fetcherConfig := FetcherConfig{ @@ -541,7 +779,7 @@ func newFetcherTest(t *testing.T, requestIdGenerator RequestIdGenerator) *fetche peerPenalizer := NewPeerPenalizer(sentryClient) messageListener := NewMessageListener(logger, sentryClient, statusDataFactory, peerPenalizer) messageSender := NewMessageSender(sentryClient) - fetcher := newFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator) + fetcher := newFetcher(logger, fetcherConfig, messageListener, messageSender, requestIdGenerator) return &fetcherTest{ ctx: ctx, ctxCancel: cancel, @@ -682,10 +920,18 @@ func (ft *fetcherTest) mockSendMessageByIdForHeaders(req *sentryproto.SendMessag return requestResponseMock{}, fmt.Errorf("wantRequestOriginNumber != pkt.Origin.Number - %v vs %v", mock.wantRequestOriginNumber, pkt.Origin.Number) } + if mock.wantRequestOriginHash != pkt.Origin.Hash { + return requestResponseMock{}, fmt.Errorf("wantRequestOriginHash != pkt.Origin.Hash - %v vs %v", mock.wantRequestOriginHash, pkt.Origin.Hash) + } + if mock.wantRequestAmount != pkt.Amount { return requestResponseMock{}, fmt.Errorf("wantRequestAmount != pkt.Amount - %v vs %v", mock.wantRequestAmount, pkt.Amount) } + if mock.wantReverse != pkt.Reverse { + return requestResponseMock{}, fmt.Errorf("wantReverse != pkt.Reverse - %v vs %v", 
mock.wantReverse, pkt.Reverse) + } + return mock, nil } @@ -744,7 +990,9 @@ type requestResponseMock struct { // FetchHeaders only wantRequestOriginNumber uint64 + wantRequestOriginHash common.Hash wantRequestAmount uint64 + wantReverse bool // FetchBodies only wantRequestHashes []common.Hash diff --git a/polygon/p2p/fetcher_config.go b/polygon/p2p/fetcher_config.go new file mode 100644 index 00000000000..433de2b453d --- /dev/null +++ b/polygon/p2p/fetcher_config.go @@ -0,0 +1,55 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see <http://www.gnu.org/licenses/>. 
+ +package p2p + +import "time" + +var defaultFetcherConfig = FetcherConfig{ + responseTimeout: 5 * time.Second, + retryBackOff: time.Second, + maxRetries: 1, +} + +type FetcherConfig struct { + responseTimeout time.Duration + retryBackOff time.Duration + maxRetries uint64 +} + +func (fc FetcherConfig) CopyWithOptions(opts ...FetcherOption) FetcherConfig { + res := fc + for _, opt := range opts { + res = opt(res) + } + return res +} + +type FetcherOption func(FetcherConfig) FetcherConfig + +func WithResponseTimeout(responseTimeout time.Duration) FetcherOption { + return func(config FetcherConfig) FetcherConfig { + config.responseTimeout = responseTimeout + return config + } +} + +func WithMaxRetries(maxRetries uint64) FetcherOption { + return func(config FetcherConfig) FetcherConfig { + config.maxRetries = maxRetries + return config + } +} diff --git a/polygon/p2p/fetcher_errors.go b/polygon/p2p/fetcher_errors.go index d9ec918b876..76152c39134 100644 --- a/polygon/p2p/fetcher_errors.go +++ b/polygon/p2p/fetcher_errors.go @@ -21,9 +21,12 @@ import ( "fmt" "sort" + "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon/core/types" ) +var ErrInvalidFetchBlocksAmount = errors.New("invalid fetch blocks amount") + type ErrInvalidFetchHeadersRange struct { start uint64 end uint64 @@ -60,6 +63,33 @@ func (e ErrIncompleteHeaders) LowestMissingBlockNum() uint64 { return e.start + e.received } +type ErrMissingHeaderHash struct { + requested common.Hash +} + +func (e ErrMissingHeaderHash) Error() string { + return fmt.Sprintf("missing header hash: requested=%s", e.requested) +} + +func (e ErrMissingHeaderHash) Is(err error) bool { + var errMissingHeaderHash *ErrMissingHeaderHash + return errors.As(err, &errMissingHeaderHash) +} + +type ErrUnexpectedHeaderHash struct { + requested common.Hash + received common.Hash +} + +func (e ErrUnexpectedHeaderHash) Error() string { + return fmt.Sprintf("unexpected headers hash: requested=%s, received=%s", e.requested, 
e.received) +} + +func (e ErrUnexpectedHeaderHash) Is(err error) bool { + var errUnexpectedHeaderHash *ErrUnexpectedHeaderHash + return errors.As(err, &errUnexpectedHeaderHash) +} + type ErrTooManyHeaders struct { requested int received int @@ -91,6 +121,24 @@ func (e ErrNonSequentialHeaderNumbers) Is(err error) bool { return errors.As(err, &errDisconnectedHeaders) } +type ErrNonSequentialHeaderHashes struct { + hash common.Hash + parentHash common.Hash + prevHash common.Hash +} + +func (e ErrNonSequentialHeaderHashes) Error() string { + return fmt.Sprintf( + "non sequential header hashes in fetch headers response: hash=%s parentHash=%s, prevHash=%s", + e.hash, e.parentHash, e.prevHash, + ) +} + +func (e ErrNonSequentialHeaderHashes) Is(err error) bool { + var errNonSequentialHeaderHashes *ErrNonSequentialHeaderHashes + return errors.As(err, &errNonSequentialHeaderHashes) +} + type ErrTooManyBodies struct { requested int received int diff --git a/polygon/p2p/fetcher_penalizing.go b/polygon/p2p/fetcher_penalizing.go index d9037c6453b..0753629432f 100644 --- a/polygon/p2p/fetcher_penalizing.go +++ b/polygon/p2p/fetcher_penalizing.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" + "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/core/types" @@ -34,37 +35,45 @@ func newPenalizingFetcher(logger log.Logger, fetcher Fetcher, peerPenalizer Peer fetchHeadersPenalizeErrs := []error{ &ErrTooManyHeaders{}, &ErrNonSequentialHeaderNumbers{}, + &ErrNonSequentialHeaderHashes{}, } fetchBodiesPenalizeErrs := []error{ &ErrTooManyBodies{}, + &ErrMissingBodies{}, } - fetchBlocksPenalizeErrs := make([]error, 0, len(fetchHeadersPenalizeErrs)+len(fetchBodiesPenalizeErrs)) - fetchBlocksPenalizeErrs = append(fetchBlocksPenalizeErrs, fetchHeadersPenalizeErrs...) - fetchBlocksPenalizeErrs = append(fetchBlocksPenalizeErrs, fetchBodiesPenalizeErrs...) 
+ fetchBlocksBackwardsByHashPenalizeErrs := append([]error{}, fetchHeadersPenalizeErrs...) + fetchBlocksBackwardsByHashPenalizeErrs = append(fetchBlocksBackwardsByHashPenalizeErrs, &ErrUnexpectedHeaderHash{}) + fetchBlocksBackwardsByHashPenalizeErrs = append(fetchBlocksBackwardsByHashPenalizeErrs, fetchBodiesPenalizeErrs...) return &penalizingFetcher{ - Fetcher: fetcher, - logger: logger, - peerPenalizer: peerPenalizer, - fetchHeadersPenalizeErrs: fetchHeadersPenalizeErrs, - fetchBodiesPenalizeErrs: fetchBodiesPenalizeErrs, - fetchBlocksPenalizeErrs: fetchBlocksPenalizeErrs, + Fetcher: fetcher, + logger: logger, + peerPenalizer: peerPenalizer, + fetchHeadersPenalizeErrs: fetchHeadersPenalizeErrs, + fetchBodiesPenalizeErrs: fetchBodiesPenalizeErrs, + fetchBlocksBackwardsByHashPenalizeErrs: fetchBlocksBackwardsByHashPenalizeErrs, } } type penalizingFetcher struct { Fetcher - logger log.Logger - peerPenalizer PeerPenalizer - fetchHeadersPenalizeErrs []error - fetchBodiesPenalizeErrs []error - fetchBlocksPenalizeErrs []error + logger log.Logger + peerPenalizer PeerPenalizer + fetchHeadersPenalizeErrs []error + fetchBodiesPenalizeErrs []error + fetchBlocksBackwardsByHashPenalizeErrs []error } -func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { - headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId) +func (pf *penalizingFetcher) FetchHeaders( + ctx context.Context, + start uint64, + end uint64, + peerId *PeerId, + opts ...FetcherOption, +) (FetcherResponse[[]*types.Header], error) { + headers, err := pf.Fetcher.FetchHeaders(ctx, start, end, peerId, opts...) if err != nil { return FetcherResponse[[]*types.Header]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchHeadersPenalizeErrs...) 
} @@ -72,8 +81,13 @@ func (pf *penalizingFetcher) FetchHeaders(ctx context.Context, start uint64, end return headers, nil } -func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { - bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId) +func (pf *penalizingFetcher) FetchBodies( + ctx context.Context, + headers []*types.Header, + peerId *PeerId, + opts ...FetcherOption, +) (FetcherResponse[[]*types.Body], error) { + bodies, err := pf.Fetcher.FetchBodies(ctx, headers, peerId, opts...) if err != nil { return FetcherResponse[[]*types.Body]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchBodiesPenalizeErrs...) } @@ -81,10 +95,17 @@ func (pf *penalizingFetcher) FetchBodies(ctx context.Context, headers []*types.H return bodies, nil } -func (pf *penalizingFetcher) FetchBlocks(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) { - blocks, err := pf.Fetcher.FetchBlocks(ctx, start, end, peerId) +func (pf *penalizingFetcher) FetchBlocksBackwardsByHash( + ctx context.Context, + hash common.Hash, + amount uint64, + peerId *PeerId, + opts ...FetcherOption, +) (FetcherResponse[[]*types.Block], error) { + blocks, err := pf.Fetcher.FetchBlocksBackwardsByHash(ctx, hash, amount, peerId, opts...) if err != nil { - return FetcherResponse[[]*types.Block]{}, pf.maybePenalize(ctx, peerId, err, pf.fetchBlocksPenalizeErrs...) + err = pf.maybePenalize(ctx, peerId, err, pf.fetchBlocksBackwardsByHashPenalizeErrs...) 
+ return FetcherResponse[[]*types.Block]{}, err } return blocks, nil diff --git a/polygon/p2p/fetcher_penalizing_test.go b/polygon/p2p/fetcher_penalizing_test.go index 161fc9d4334..38af2958e66 100644 --- a/polygon/p2p/fetcher_penalizing_test.go +++ b/polygon/p2p/fetcher_penalizing_test.go @@ -28,184 +28,155 @@ import ( "github.com/erigontech/erigon/core/types" ) -func TestPenalizingFetcherShouldPenalizePeerWhenErrTooManyHeaders(t *testing.T) { +func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrTooManyHeaders(t *testing.T) { t.Parallel() - for _, tc := range []struct { - name string - method func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error - }{ - { - name: "FetchHeaders", - method: func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error { - headers, err := test.penalizingFetcher.FetchHeaders(ctx, start, end, peerId) - require.Nil(test.t, headers.Data) - return err - }, - }, + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockInboundMessages := []*sentry.InboundMessage{ { - name: "FetchBlocks", - method: func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error { - blocks, err := test.penalizingFetcher.FetchBlocks(ctx, start, end, peerId) - require.Nil(test.t, blocks.Data) - return err - }, + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + // response should contain 2 headers instead we return 5 + Data: newMockBlockHeadersPacket66Bytes(t, requestId, 5), }, - } { - t.Run(tc.name, func(t *testing.T) { - peerId := PeerIdFromUint64(1) - requestId := uint64(1234) - mockInboundMessages := []*sentry.InboundMessage{ - { - Id: sentry.MessageId_BLOCK_HEADERS_66, - PeerId: peerId.H512(), - // response should contain 2 headers instead we return 5 - Data: newMockBlockHeadersPacket66Bytes(t, requestId, 5), - }, - } - mockRequestResponse := requestResponseMock{ - requestId: requestId, - mockResponseInboundMessages: 
mockInboundMessages, - wantRequestPeerId: peerId, - wantRequestOriginNumber: 1, - wantRequestAmount: 2, - } - - test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) - test.mockSentryStreams(mockRequestResponse) - // setup expectation that peer should be penalized - mockExpectPenalizePeer(t, test.sentryClient, peerId) - test.run(func(ctx context.Context, t *testing.T) { - var errTooManyHeaders *ErrTooManyHeaders - err := tc.method(ctx, 1, 3, peerId, test) - require.ErrorAs(t, err, &errTooManyHeaders) - require.Equal(t, 2, errTooManyHeaders.requested) - require.Equal(t, 5, errTooManyHeaders.received) - }) - }) } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 2, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errTooManyHeaders *ErrTooManyHeaders + headers, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 3, peerId) + require.ErrorAs(t, err, &errTooManyHeaders) + require.Equal(t, 2, errTooManyHeaders.requested) + require.Equal(t, 5, errTooManyHeaders.received) + require.Nil(t, headers.Data) + }) } -func TestPenalizingFetcherShouldPenalizePeerWhenErrNonSequentialHeaderNumbers(t *testing.T) { +func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrNonSequentialHeaderNumbers(t *testing.T) { t.Parallel() - for _, tc := range []struct { - name string - method func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error - }{ - { - name: "FetchHeaders", - method: func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error { - headers, err := test.penalizingFetcher.FetchHeaders(ctx, 
start, end, peerId) - require.Nil(test.t, headers.Data) - return err - }, - }, + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockBlockHeaders := newMockBlockHeaders(5) + disconnectedHeaders := make([]*types.Header, 3) + disconnectedHeaders[0] = mockBlockHeaders[0] + disconnectedHeaders[1] = mockBlockHeaders[2] + disconnectedHeaders[2] = mockBlockHeaders[4] + mockInboundMessages := []*sentry.InboundMessage{ { - name: "FetchBlocks", - method: func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error { - blocks, err := test.penalizingFetcher.FetchBlocks(ctx, start, end, peerId) - require.Nil(test.t, blocks.Data) - return err - }, + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId, disconnectedHeaders), }, - } { - t.Run(tc.name, func(t *testing.T) { - peerId := PeerIdFromUint64(1) - requestId := uint64(1234) - mockBlockHeaders := newMockBlockHeaders(5) - disconnectedHeaders := make([]*types.Header, 3) - disconnectedHeaders[0] = mockBlockHeaders[0] - disconnectedHeaders[1] = mockBlockHeaders[2] - disconnectedHeaders[2] = mockBlockHeaders[4] - mockInboundMessages := []*sentry.InboundMessage{ - { - Id: sentry.MessageId_BLOCK_HEADERS_66, - PeerId: peerId.H512(), - Data: blockHeadersPacket66Bytes(t, requestId, disconnectedHeaders), - }, - } - mockRequestResponse := requestResponseMock{ - requestId: requestId, - mockResponseInboundMessages: mockInboundMessages, - wantRequestPeerId: peerId, - wantRequestOriginNumber: 1, - wantRequestAmount: 3, - } - - test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) - test.mockSentryStreams(mockRequestResponse) - // setup expectation that peer should be penalized - mockExpectPenalizePeer(t, test.sentryClient, peerId) - test.run(func(ctx context.Context, t *testing.T) { - var errNonSequentialHeaderNumbers *ErrNonSequentialHeaderNumbers - err := tc.method(ctx, 1, 4, peerId, test) - require.ErrorAs(t, err, 
&errNonSequentialHeaderNumbers) - require.Equal(t, uint64(3), errNonSequentialHeaderNumbers.current) - require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.expected) - }) - }) } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 3, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errNonSequentialHeaderNumbers *ErrNonSequentialHeaderNumbers + headers, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 4, peerId) + require.ErrorAs(t, err, &errNonSequentialHeaderNumbers) + require.Equal(t, uint64(3), errNonSequentialHeaderNumbers.current) + require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.expected) + require.Nil(t, headers.Data) + }) } -func TestPenalizingFetcherShouldPenalizePeerWhenHeaderGtRequestedStart(t *testing.T) { +func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenErrNonSequentialHeaderHashes(t *testing.T) { t.Parallel() - for _, tc := range []struct { - name string - method func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error - }{ + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + disconnectedHeaders := newMockBlockHeaders(2) + disconnectedHeaders[0] = &types.Header{ + Number: big.NewInt(1), + GasLimit: 1234, // change a random value in order to change the header Hash + } + mockInboundMessages := []*sentry.InboundMessage{ { - name: "FetchHeaders", - method: func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error { - headers, err := test.penalizingFetcher.FetchHeaders(ctx, start, end, peerId) - require.Nil(test.t, headers.Data) - return err - }, + Id: 
sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId, disconnectedHeaders), }, + } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 2, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errNonSequentialHeaderHashes *ErrNonSequentialHeaderHashes + headers, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 3, peerId) + require.ErrorAs(t, err, &errNonSequentialHeaderHashes) + require.Equal(t, disconnectedHeaders[1].Hash(), errNonSequentialHeaderHashes.hash) + require.Equal(t, disconnectedHeaders[1].ParentHash, errNonSequentialHeaderHashes.parentHash) + require.Equal(t, disconnectedHeaders[0].Hash(), errNonSequentialHeaderHashes.prevHash) + require.Nil(t, headers.Data) + }) +} + +func TestPenalizingFetcherFetchHeadersShouldPenalizePeerWhenHeaderGtRequestedStart(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockBlockHeaders := newMockBlockHeaders(3) + incorrectOriginHeaders := mockBlockHeaders[1:] + mockInboundMessages := []*sentry.InboundMessage{ { - name: "FetchBlocks", - method: func(ctx context.Context, start, end uint64, peerId *PeerId, test *penalizingFetcherTest) error { - blocks, err := test.penalizingFetcher.FetchBlocks(ctx, start, end, peerId) - require.Nil(test.t, blocks.Data) - return err - }, + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + // response headers should be 2 and start at 1 - instead we start at 2 + Data: blockHeadersPacket66Bytes(t, requestId, incorrectOriginHeaders), }, - } { - t.Run(tc.name, func(t *testing.T) { - peerId := 
PeerIdFromUint64(1) - requestId := uint64(1234) - mockBlockHeaders := newMockBlockHeaders(3) - incorrectOriginHeaders := mockBlockHeaders[1:] - mockInboundMessages := []*sentry.InboundMessage{ - { - Id: sentry.MessageId_BLOCK_HEADERS_66, - PeerId: peerId.H512(), - // response headers should be 2 and start at 1 - instead we start at 2 - Data: blockHeadersPacket66Bytes(t, requestId, incorrectOriginHeaders), - }, - } - mockRequestResponse := requestResponseMock{ - requestId: requestId, - mockResponseInboundMessages: mockInboundMessages, - wantRequestPeerId: peerId, - wantRequestOriginNumber: 1, - wantRequestAmount: 2, - } - - test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) - test.mockSentryStreams(mockRequestResponse) - // setup expectation that peer should be penalized - mockExpectPenalizePeer(t, test.sentryClient, peerId) - test.run(func(ctx context.Context, t *testing.T) { - var errNonSequentialHeaderNumbers *ErrNonSequentialHeaderNumbers - err := tc.method(ctx, 1, 3, peerId, test) - require.ErrorAs(t, err, &errNonSequentialHeaderNumbers) - require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.current) - require.Equal(t, uint64(1), errNonSequentialHeaderNumbers.expected) - }) - }) } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 2, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errNonSequentialHeaderNumbers *ErrNonSequentialHeaderNumbers + headers, err := test.penalizingFetcher.FetchHeaders(ctx, 1, 3, peerId) + require.ErrorAs(t, err, &errNonSequentialHeaderNumbers) + require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.current) + 
require.Equal(t, uint64(1), errNonSequentialHeaderNumbers.expected) + require.Nil(t, headers.Data) + }) } func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *testing.T) { @@ -243,12 +214,49 @@ func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrTooManyBodies(t *t }) } -func TestPenalizingFetcherFetchBlocksShouldPenalizePeerWhenErrTooManyBodies(t *testing.T) { +func TestPenalizingFetcherFetchBodiesShouldPenalizePeerWhenErrMissingBodies(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + headers := []*types.Header{{Number: big.NewInt(1)}} + hashes := []common.Hash{headers[0].Hash()} + mockInboundMessages := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_BODIES_66, + PeerId: peerId.H512(), + Data: newMockBlockBodiesPacketBytes(t, requestId), + }, + } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestHashes: hashes, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errMissingBodies *ErrMissingBodies + bodies, err := test.penalizingFetcher.FetchBodies(ctx, headers, peerId) + require.ErrorAs(t, err, &errMissingBodies) + lowest, ok := errMissingBodies.LowestMissingBlockNum() + require.True(t, ok) + require.Equal(t, uint64(1), lowest) + require.Nil(t, bodies.Data) + }) +} + +func TestPenalizingFetcherFetchBlocksBackwardsByHashShouldPenalizePeerWhenErrTooManyBodies(t *testing.T) { t.Parallel() peerId := PeerIdFromUint64(1) requestId1 := uint64(1233) headers := newMockBlockHeaders(1) + hash := headers[0].Hash() mockInboundMessages1 := []*sentry.InboundMessage{ { Id: sentry.MessageId_BLOCK_HEADERS_66, @@ -260,8 +268,9 @@ func 
TestPenalizingFetcherFetchBlocksShouldPenalizePeerWhenErrTooManyBodies(t *t requestId: requestId1, mockResponseInboundMessages: mockInboundMessages1, wantRequestPeerId: peerId, - wantRequestOriginNumber: 1, + wantRequestOriginHash: hash, wantRequestAmount: 1, + wantReverse: true, } requestId2 := uint64(1234) mockInboundMessages2 := []*sentry.InboundMessage{ @@ -275,7 +284,7 @@ func TestPenalizingFetcherFetchBlocksShouldPenalizePeerWhenErrTooManyBodies(t *t requestId: requestId2, mockResponseInboundMessages: mockInboundMessages2, wantRequestPeerId: peerId, - wantRequestHashes: []common.Hash{headers[0].Hash()}, + wantRequestHashes: []common.Hash{hash}, } test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId1, requestId2)) @@ -284,11 +293,217 @@ func TestPenalizingFetcherFetchBlocksShouldPenalizePeerWhenErrTooManyBodies(t *t mockExpectPenalizePeer(t, test.sentryClient, peerId) test.run(func(ctx context.Context, t *testing.T) { var errTooManyBodies *ErrTooManyBodies - bodies, err := test.penalizingFetcher.FetchBlocks(ctx, 1, 2, peerId) + blocks, err := test.penalizingFetcher.FetchBlocksBackwardsByHash(ctx, hash, 1, peerId) require.ErrorAs(t, err, &errTooManyBodies) require.Equal(t, 1, errTooManyBodies.requested) require.Equal(t, 2, errTooManyBodies.received) - require.Nil(t, bodies.Data) + require.Nil(t, blocks.Data) + }) +} + +func TestPenalizingFetcherFetchBlocksBackwardsByHashShouldPenalizePeerWhenErrMissingBodies(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId1 := uint64(1233) + headers := newMockBlockHeaders(1) + hash := headers[0].Hash() + mockInboundMessages1 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, headers), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: hash, + 
wantRequestAmount: 1, + wantReverse: true, + } + requestId2 := uint64(1234) + mockInboundMessages2 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_BODIES_66, + PeerId: peerId.H512(), + Data: newMockBlockBodiesPacketBytes(t, requestId2), + }, + } + mockRequestResponse2 := requestResponseMock{ + requestId: requestId2, + mockResponseInboundMessages: mockInboundMessages2, + wantRequestPeerId: peerId, + wantRequestHashes: []common.Hash{hash}, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId1, requestId2)) + test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errMissingBodies *ErrMissingBodies + blocks, err := test.penalizingFetcher.FetchBlocksBackwardsByHash(ctx, hash, 1, peerId) + require.ErrorAs(t, err, &errMissingBodies) + lowest, ok := errMissingBodies.LowestMissingBlockNum() + require.True(t, ok) + require.Equal(t, uint64(1), lowest) + require.Nil(t, blocks.Data) + }) +} + +func TestPenalizingFetcherFetchBlocksBackwardsByHashShouldPenalizePeerWhenErrUnexpectedHeaderHash(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId1 := uint64(1233) + headers := newMockBlockHeaders(2) + hash := headers[0].Hash() + mockInboundMessages1 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, headers[1:]), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: hash, + wantRequestAmount: 1, + wantReverse: true, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId1)) + test.mockSentryStreams(mockRequestResponse1) + // setup expectation that peer should be penalized + 
mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errUnexpectedHeaderHash *ErrUnexpectedHeaderHash + blocks, err := test.penalizingFetcher.FetchBlocksBackwardsByHash(ctx, hash, 1, peerId) + require.ErrorAs(t, err, &errUnexpectedHeaderHash) + require.Equal(t, hash, errUnexpectedHeaderHash.requested) + require.Equal(t, headers[1].Hash(), errUnexpectedHeaderHash.received) + require.Nil(t, blocks.Data) + }) +} + +func TestPenalizingFetcherFetchBlocksBackwardsByHashShouldPenalizePeerWhenErrTooManyHeaders(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId1 := uint64(1233) + headers := newMockBlockHeaders(2) + hash := headers[0].Hash() + mockInboundMessages1 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, headers), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: hash, + wantRequestAmount: 1, + wantReverse: true, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId1)) + test.mockSentryStreams(mockRequestResponse1) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errTooManyHeaders *ErrTooManyHeaders + blocks, err := test.penalizingFetcher.FetchBlocksBackwardsByHash(ctx, hash, 1, peerId) + require.ErrorAs(t, err, &errTooManyHeaders) + require.Equal(t, 1, errTooManyHeaders.requested) + require.Equal(t, 2, errTooManyHeaders.received) + require.Nil(t, blocks.Data) + }) +} + +func TestPenalizingFetcherFetchBlocksBackwardsByHashShouldPenalizePeerWhenErrNonSequentialHeaderNumbers(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId1 := uint64(1233) + headers := newMockBlockHeaders(3) + hash := 
headers[2].Hash() + mockInboundMessages1 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, []*types.Header{headers[2], headers[0]}), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: hash, + wantRequestAmount: 2, + wantReverse: true, + } + + test := newPenalizingFetcherTest(t, newMockRequestGenerator(requestId1)) + test.mockSentryStreams(mockRequestResponse1) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errNonSequentialHeaderNumbers *ErrNonSequentialHeaderNumbers + blocks, err := test.penalizingFetcher.FetchBlocksBackwardsByHash(ctx, hash, 2, peerId) + require.ErrorAs(t, err, &errNonSequentialHeaderNumbers) + require.Equal(t, uint64(1), errNonSequentialHeaderNumbers.current) + require.Equal(t, uint64(2), errNonSequentialHeaderNumbers.expected) + require.Nil(t, blocks.Data) + }) +} + +func TestPenalizingFetcherFetchBlocksBackwardsByHashShouldPenalizePeerWhenErrNonSequentialHeaderHashes(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId1 := uint64(1233) + headers := newMockBlockHeaders(2) + hash := headers[1].Hash() + incorrectHeader := &types.Header{ + Number: big.NewInt(1), + GasLimit: 1234, + } + incorrectHeader.Hash() + mockInboundMessages1 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId1, []*types.Header{headers[1], incorrectHeader}), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginHash: hash, + wantRequestAmount: 2, + wantReverse: true, + } + + test := 
newPenalizingFetcherTest(t, newMockRequestGenerator(requestId1)) + test.mockSentryStreams(mockRequestResponse1) + // setup expectation that peer should be penalized + mockExpectPenalizePeer(t, test.sentryClient, peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errNonSequentialHeaderHashes *ErrNonSequentialHeaderHashes + blocks, err := test.penalizingFetcher.FetchBlocksBackwardsByHash(ctx, hash, 2, peerId) + require.ErrorAs(t, err, &errNonSequentialHeaderHashes) + require.Equal(t, headers[1].Hash(), errNonSequentialHeaderHashes.hash) + require.Equal(t, headers[0].Hash(), errNonSequentialHeaderHashes.parentHash) + require.Equal(t, incorrectHeader.Hash(), errNonSequentialHeaderHashes.prevHash) + require.Nil(t, blocks.Data) }) } diff --git a/polygon/p2p/fetcher_tracking.go b/polygon/p2p/fetcher_tracking.go index bdf021e7d50..be0dda54346 100644 --- a/polygon/p2p/fetcher_tracking.go +++ b/polygon/p2p/fetcher_tracking.go @@ -39,8 +39,14 @@ type trackingFetcher struct { peerTracker PeerTracker } -func (tf *trackingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { - res, err := tf.Fetcher.FetchHeaders(ctx, start, end, peerId) +func (tf *trackingFetcher) FetchHeaders( + ctx context.Context, + start uint64, + end uint64, + peerId *PeerId, + opts ...FetcherOption, +) (FetcherResponse[[]*types.Header], error) { + res, err := tf.Fetcher.FetchHeaders(ctx, start, end, peerId, opts...) 
if err != nil { var errIncompleteHeaders *ErrIncompleteHeaders if errors.As(err, &errIncompleteHeaders) { @@ -56,8 +62,13 @@ func (tf *trackingFetcher) FetchHeaders(ctx context.Context, start uint64, end u return res, nil } -func (tf *trackingFetcher) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { - bodies, err := tf.Fetcher.FetchBodies(ctx, headers, peerId) +func (tf *trackingFetcher) FetchBodies( + ctx context.Context, + headers []*types.Header, + peerId *PeerId, + opts ...FetcherOption, +) (FetcherResponse[[]*types.Body], error) { + bodies, err := tf.Fetcher.FetchBodies(ctx, headers, peerId, opts...) if err != nil { var errMissingBodies *ErrMissingBodies if errors.As(err, &errMissingBodies) { diff --git a/polygon/p2p/service.go b/polygon/p2p/service.go index a68c79e9e51..add6f3915d0 100644 --- a/polygon/p2p/service.go +++ b/polygon/p2p/service.go @@ -19,7 +19,6 @@ package p2p import ( "context" "math/rand" - "time" "golang.org/x/sync/errgroup" @@ -44,13 +43,7 @@ func NewService( sentryClient sentryproto.SentryClient, statusDataFactory sentry.StatusDataFactory, ) Service { - fetcherConfig := FetcherConfig{ - responseTimeout: 5 * time.Second, - retryBackOff: 10 * time.Second, - maxRetries: 2, - } - - return newService(maxPeers, fetcherConfig, logger, sentryClient, statusDataFactory, rand.Uint64) + return newService(maxPeers, defaultFetcherConfig, logger, sentryClient, statusDataFactory, rand.Uint64) } func newService( @@ -65,7 +58,7 @@ func newService( messageListener := NewMessageListener(logger, sentryClient, statusDataFactory, peerPenalizer) peerTracker := NewPeerTracker(logger, sentryClient, messageListener) messageSender := NewMessageSender(sentryClient) - fetcher := NewFetcher(fetcherConfig, messageListener, messageSender, requestIdGenerator) + fetcher := NewFetcher(logger, fetcherConfig, messageListener, messageSender, requestIdGenerator) fetcher = NewPenalizingFetcher(logger, fetcher, 
peerPenalizer) fetcher = NewTrackingFetcher(fetcher, peerTracker) return &service{ diff --git a/polygon/p2p/service_mock.go b/polygon/p2p/service_mock.go index cf0dd00bbc5..39b6f7e7ff4 100644 --- a/polygon/p2p/service_mock.go +++ b/polygon/p2p/service_mock.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + common "github.com/erigontech/erigon-lib/common" sentryproto "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" types "github.com/erigontech/erigon/core/types" eth "github.com/erigontech/erigon/eth/protocols/eth" @@ -115,58 +116,68 @@ func (c *MockServiceBlockNumPresentCall) DoAndReturn(f func(*PeerId, uint64)) *M return c } -// FetchBlocks mocks base method. -func (m *MockService) FetchBlocks(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Block], error) { +// FetchBlocksBackwardsByHash mocks base method. +func (m *MockService) FetchBlocksBackwardsByHash(ctx context.Context, hash common.Hash, amount uint64, peerId *PeerId, opts ...FetcherOption) (FetcherResponse[[]*types.Block], error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchBlocks", ctx, start, end, peerId) + varargs := []any{ctx, hash, amount, peerId} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "FetchBlocksBackwardsByHash", varargs...) ret0, _ := ret[0].(FetcherResponse[[]*types.Block]) ret1, _ := ret[1].(error) return ret0, ret1 } -// FetchBlocks indicates an expected call of FetchBlocks. -func (mr *MockServiceMockRecorder) FetchBlocks(ctx, start, end, peerId any) *MockServiceFetchBlocksCall { +// FetchBlocksBackwardsByHash indicates an expected call of FetchBlocksBackwardsByHash. 
+func (mr *MockServiceMockRecorder) FetchBlocksBackwardsByHash(ctx, hash, amount, peerId any, opts ...any) *MockServiceFetchBlocksBackwardsByHashCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBlocks", reflect.TypeOf((*MockService)(nil).FetchBlocks), ctx, start, end, peerId) - return &MockServiceFetchBlocksCall{Call: call} + varargs := append([]any{ctx, hash, amount, peerId}, opts...) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBlocksBackwardsByHash", reflect.TypeOf((*MockService)(nil).FetchBlocksBackwardsByHash), varargs...) + return &MockServiceFetchBlocksBackwardsByHashCall{Call: call} } -// MockServiceFetchBlocksCall wrap *gomock.Call -type MockServiceFetchBlocksCall struct { +// MockServiceFetchBlocksBackwardsByHashCall wrap *gomock.Call +type MockServiceFetchBlocksBackwardsByHashCall struct { *gomock.Call } // Return rewrite *gomock.Call.Return -func (c *MockServiceFetchBlocksCall) Return(arg0 FetcherResponse[[]*types.Block], arg1 error) *MockServiceFetchBlocksCall { +func (c *MockServiceFetchBlocksBackwardsByHashCall) Return(arg0 FetcherResponse[[]*types.Block], arg1 error) *MockServiceFetchBlocksBackwardsByHashCall { c.Call = c.Call.Return(arg0, arg1) return c } // Do rewrite *gomock.Call.Do -func (c *MockServiceFetchBlocksCall) Do(f func(context.Context, uint64, uint64, *PeerId) (FetcherResponse[[]*types.Block], error)) *MockServiceFetchBlocksCall { +func (c *MockServiceFetchBlocksBackwardsByHashCall) Do(f func(context.Context, common.Hash, uint64, *PeerId, ...FetcherOption) (FetcherResponse[[]*types.Block], error)) *MockServiceFetchBlocksBackwardsByHashCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockServiceFetchBlocksCall) DoAndReturn(f func(context.Context, uint64, uint64, *PeerId) (FetcherResponse[[]*types.Block], error)) *MockServiceFetchBlocksCall { +func (c *MockServiceFetchBlocksBackwardsByHashCall) DoAndReturn(f 
func(context.Context, common.Hash, uint64, *PeerId, ...FetcherOption) (FetcherResponse[[]*types.Block], error)) *MockServiceFetchBlocksBackwardsByHashCall { c.Call = c.Call.DoAndReturn(f) return c } // FetchBodies mocks base method. -func (m *MockService) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId) (FetcherResponse[[]*types.Body], error) { +func (m *MockService) FetchBodies(ctx context.Context, headers []*types.Header, peerId *PeerId, opts ...FetcherOption) (FetcherResponse[[]*types.Body], error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchBodies", ctx, headers, peerId) + varargs := []any{ctx, headers, peerId} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "FetchBodies", varargs...) ret0, _ := ret[0].(FetcherResponse[[]*types.Body]) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchBodies indicates an expected call of FetchBodies. -func (mr *MockServiceMockRecorder) FetchBodies(ctx, headers, peerId any) *MockServiceFetchBodiesCall { +func (mr *MockServiceMockRecorder) FetchBodies(ctx, headers, peerId any, opts ...any) *MockServiceFetchBodiesCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBodies", reflect.TypeOf((*MockService)(nil).FetchBodies), ctx, headers, peerId) + varargs := append([]any{ctx, headers, peerId}, opts...) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBodies", reflect.TypeOf((*MockService)(nil).FetchBodies), varargs...) 
return &MockServiceFetchBodiesCall{Call: call} } @@ -182,30 +193,35 @@ func (c *MockServiceFetchBodiesCall) Return(arg0 FetcherResponse[[]*types.Body], } // Do rewrite *gomock.Call.Do -func (c *MockServiceFetchBodiesCall) Do(f func(context.Context, []*types.Header, *PeerId) (FetcherResponse[[]*types.Body], error)) *MockServiceFetchBodiesCall { +func (c *MockServiceFetchBodiesCall) Do(f func(context.Context, []*types.Header, *PeerId, ...FetcherOption) (FetcherResponse[[]*types.Body], error)) *MockServiceFetchBodiesCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockServiceFetchBodiesCall) DoAndReturn(f func(context.Context, []*types.Header, *PeerId) (FetcherResponse[[]*types.Body], error)) *MockServiceFetchBodiesCall { +func (c *MockServiceFetchBodiesCall) DoAndReturn(f func(context.Context, []*types.Header, *PeerId, ...FetcherOption) (FetcherResponse[[]*types.Body], error)) *MockServiceFetchBodiesCall { c.Call = c.Call.DoAndReturn(f) return c } // FetchHeaders mocks base method. -func (m *MockService) FetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId) (FetcherResponse[[]*types.Header], error) { +func (m *MockService) FetchHeaders(ctx context.Context, start, end uint64, peerId *PeerId, opts ...FetcherOption) (FetcherResponse[[]*types.Header], error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchHeaders", ctx, start, end, peerId) + varargs := []any{ctx, start, end, peerId} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "FetchHeaders", varargs...) ret0, _ := ret[0].(FetcherResponse[[]*types.Header]) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchHeaders indicates an expected call of FetchHeaders. 
-func (mr *MockServiceMockRecorder) FetchHeaders(ctx, start, end, peerId any) *MockServiceFetchHeadersCall { +func (mr *MockServiceMockRecorder) FetchHeaders(ctx, start, end, peerId any, opts ...any) *MockServiceFetchHeadersCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchHeaders", reflect.TypeOf((*MockService)(nil).FetchHeaders), ctx, start, end, peerId) + varargs := append([]any{ctx, start, end, peerId}, opts...) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchHeaders", reflect.TypeOf((*MockService)(nil).FetchHeaders), varargs...) return &MockServiceFetchHeadersCall{Call: call} } @@ -221,13 +237,13 @@ func (c *MockServiceFetchHeadersCall) Return(arg0 FetcherResponse[[]*types.Heade } // Do rewrite *gomock.Call.Do -func (c *MockServiceFetchHeadersCall) Do(f func(context.Context, uint64, uint64, *PeerId) (FetcherResponse[[]*types.Header], error)) *MockServiceFetchHeadersCall { +func (c *MockServiceFetchHeadersCall) Do(f func(context.Context, uint64, uint64, *PeerId, ...FetcherOption) (FetcherResponse[[]*types.Header], error)) *MockServiceFetchHeadersCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockServiceFetchHeadersCall) DoAndReturn(f func(context.Context, uint64, uint64, *PeerId) (FetcherResponse[[]*types.Header], error)) *MockServiceFetchHeadersCall { +func (c *MockServiceFetchHeadersCall) DoAndReturn(f func(context.Context, uint64, uint64, *PeerId, ...FetcherOption) (FetcherResponse[[]*types.Header], error)) *MockServiceFetchHeadersCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/polygon/sync/block_downloader_test.go b/polygon/sync/block_downloader_test.go index 4ebd7c327fc..83cb655f102 100644 --- a/polygon/sync/block_downloader_test.go +++ b/polygon/sync/block_downloader_test.go @@ -171,13 +171,26 @@ func (hdt blockDownloaderTest) fakeMilestones(count int) heimdall.Milestones { return milestones } -type fetchHeadersMock func(ctx 
context.Context, start uint64, end uint64, peerId *p2p.PeerId) (p2p.FetcherResponse[[]*types.Header], error) +type fetchHeadersMock func( + ctx context.Context, + start uint64, + end uint64, + peerId *p2p.PeerId, + opts ...p2p.FetcherOption, +) (p2p.FetcherResponse[[]*types.Header], error) func (hdt blockDownloaderTest) defaultFetchHeadersMock() fetchHeadersMock { // p2p.Service.FetchHeaders interface is using [start, end) so we stick to that - return func(ctx context.Context, start uint64, end uint64, _ *p2p.PeerId) (p2p.FetcherResponse[[]*types.Header], error) { + return func( + ctx context.Context, + start uint64, + end uint64, + peerId *p2p.PeerId, + opts ...p2p.FetcherOption, + ) (p2p.FetcherResponse[[]*types.Header], error) { if start >= end { - return p2p.FetcherResponse[[]*types.Header]{Data: nil, TotalSize: 0}, fmt.Errorf("unexpected start >= end in test: start=%d, end=%d", start, end) + err := fmt.Errorf("unexpected start >= end in test: start=%d, end=%d", start, end) + return p2p.FetcherResponse[[]*types.Header]{}, err } res := make([]*types.Header, end-start) @@ -194,10 +207,20 @@ func (hdt blockDownloaderTest) defaultFetchHeadersMock() fetchHeadersMock { } } -type fetchBodiesMock func(context.Context, []*types.Header, *p2p.PeerId) (p2p.FetcherResponse[[]*types.Body], error) +type fetchBodiesMock func( + ctx context.Context, + headers []*types.Header, + peerId *p2p.PeerId, + opts ...p2p.FetcherOption, +) (p2p.FetcherResponse[[]*types.Body], error) func (hdt blockDownloaderTest) defaultFetchBodiesMock() fetchBodiesMock { - return func(ctx context.Context, headers []*types.Header, _ *p2p.PeerId) (p2p.FetcherResponse[[]*types.Body], error) { + return func( + ctx context.Context, + headers []*types.Header, + peerId *p2p.PeerId, + opts ...p2p.FetcherOption, + ) (p2p.FetcherResponse[[]*types.Body], error) { bodies := make([]*types.Body, len(headers)) size := 0 @@ -558,13 +581,19 @@ func 
TestBlockDownloaderDownloadBlocksWhenMissingBodiesThenPenalizePeerAndReDown Times(1) test.p2pService.EXPECT(). FetchBodies(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(ctx context.Context, headers []*types.Header, peerId *p2p.PeerId) (p2p.FetcherResponse[[]*types.Body], error) { - if peerId.Equal(p2p.PeerIdFromUint64(2)) { - return p2p.FetcherResponse[[]*types.Body]{Data: nil, TotalSize: 0}, p2p.NewErrMissingBodies(headers) - } - - return test.defaultFetchBodiesMock()(ctx, headers, peerId) - }). + DoAndReturn( + func( + ctx context.Context, + headers []*types.Header, + peerId *p2p.PeerId, + opts ...p2p.FetcherOption, + ) (p2p.FetcherResponse[[]*types.Body], error) { + if peerId.Equal(p2p.PeerIdFromUint64(2)) { + return p2p.FetcherResponse[[]*types.Body]{}, p2p.NewErrMissingBodies(headers) + } + + return test.defaultFetchBodiesMock()(ctx, headers, peerId) + }). // request checkpoints 1,2,3 in parallel (we have 3 peers) // -> peer 2 returns missing bodies error, checkpoints 1 and 3 fetch succeeds // requests 2,3 in parallel (now we have only 2 peers) diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index 55e114c20df..38ce264b848 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -191,15 +191,17 @@ func (s *Sync) applyNewBlockOnTip( ) error { newBlockHeader := event.NewBlock.Header() newBlockHeaderNum := newBlockHeader.Number.Uint64() + newBlockHeaderHash := newBlockHeader.Hash() rootNum := ccBuilder.Root().Number.Uint64() - if newBlockHeaderNum <= rootNum || ccBuilder.ContainsHash(newBlockHeader.Hash()) { + if newBlockHeaderNum <= rootNum || ccBuilder.ContainsHash(newBlockHeaderHash) { return nil } s.logger.Debug( syncLogPrefix("applying new block event"), "blockNum", newBlockHeaderNum, - "blockHash", newBlockHeader.Hash(), + "blockHash", newBlockHeaderHash, + "source", event.Source, "parentBlockHash", newBlockHeader.ParentHash, ) @@ -207,7 +209,17 @@ func (s *Sync) applyNewBlockOnTip( if 
ccBuilder.ContainsHash(newBlockHeader.ParentHash) { blockChain = []*types.Block{event.NewBlock} } else { - blocks, err := s.p2pService.FetchBlocks(ctx, rootNum, newBlockHeaderNum+1, event.PeerId) + amount := newBlockHeaderNum - rootNum + 1 + s.logger.Debug( + syncLogPrefix("block parent hash not in ccb, fetching blocks backwards to root"), + "rootNum", rootNum, + "blockNum", newBlockHeaderNum, + "blockHash", newBlockHeaderHash, + "amount", amount, + ) + + opts := []p2p.FetcherOption{p2p.WithMaxRetries(0), p2p.WithResponseTimeout(time.Second)} + blocks, err := s.p2pService.FetchBlocksBackwardsByHash(ctx, newBlockHeaderHash, amount, event.PeerId, opts...) if err != nil { if s.ignoreFetchBlocksErrOnTipEvent(err) { s.logger.Debug( @@ -286,25 +298,26 @@ func (s *Sync) applyNewBlockHashesOnTip( event EventNewBlockHashes, ccBuilder CanonicalChainBuilder, ) error { - for _, headerHashNum := range event.NewBlockHashes { - if (headerHashNum.Number <= ccBuilder.Root().Number.Uint64()) || ccBuilder.ContainsHash(headerHashNum.Hash) { + for _, hashOrNum := range event.NewBlockHashes { + if (hashOrNum.Number <= ccBuilder.Root().Number.Uint64()) || ccBuilder.ContainsHash(hashOrNum.Hash) { continue } s.logger.Debug( - syncLogPrefix("applying new block hash event"), - "blockNum", headerHashNum.Number, - "blockHash", headerHashNum.Hash, + syncLogPrefix("applying new block hash"), + "blockNum", hashOrNum.Number, + "blockHash", hashOrNum.Hash, ) - newBlocks, err := s.p2pService.FetchBlocks(ctx, headerHashNum.Number, headerHashNum.Number+1, event.PeerId) + fetchOpts := []p2p.FetcherOption{p2p.WithMaxRetries(0), p2p.WithResponseTimeout(time.Second)} + newBlocks, err := s.p2pService.FetchBlocksBackwardsByHash(ctx, hashOrNum.Hash, 1, event.PeerId, fetchOpts...) 
if err != nil { if s.ignoreFetchBlocksErrOnTipEvent(err) { s.logger.Debug( syncLogPrefix("applyNewBlockHashesOnTip: failed to fetch complete blocks, ignoring event"), "err", err, "peerId", event.PeerId, - "lastBlockNum", headerHashNum.Number, + "lastBlockNum", hashOrNum.Number, ) continue @@ -316,6 +329,7 @@ func (s *Sync) applyNewBlockHashesOnTip( newBlockEvent := EventNewBlock{ NewBlock: newBlocks.Data[0], PeerId: event.PeerId, + Source: EventSourceP2PNewBlockHashes, } err = s.applyNewBlockOnTip(ctx, newBlockEvent, ccBuilder) @@ -439,6 +453,9 @@ func (s *Sync) sync(ctx context.Context, tip *types.Header, tipDownloader tipDow func (s *Sync) ignoreFetchBlocksErrOnTipEvent(err error) bool { return errors.Is(err, &p2p.ErrIncompleteHeaders{}) || errors.Is(err, &p2p.ErrNonSequentialHeaderNumbers{}) || + errors.Is(err, &p2p.ErrNonSequentialHeaderHashes{}) || + errors.Is(err, &p2p.ErrMissingHeaderHash{}) || + errors.Is(err, &p2p.ErrUnexpectedHeaderHash{}) || errors.Is(err, &p2p.ErrTooManyHeaders{}) || errors.Is(err, &p2p.ErrMissingBodies{}) || errors.Is(err, &p2p.ErrTooManyBodies{}) || diff --git a/polygon/sync/tip_events.go b/polygon/sync/tip_events.go index 4713d6dccd2..d27d94b8060 100644 --- a/polygon/sync/tip_events.go +++ b/polygon/sync/tip_events.go @@ -31,9 +31,15 @@ const EventTypeNewBlock = "new-block" const EventTypeNewBlockHashes = "new-block-hashes" const EventTypeNewMilestone = "new-milestone" +type EventSource string + +var EventSourceP2PNewBlockHashes EventSource = "p2p-new-block-hashes-source" +var EventSourceP2PNewBlock EventSource = "p2p-new-block-source" + type EventNewBlock struct { NewBlock *types.Block PeerId *p2p.PeerId + Source EventSource } type EventNewBlockHashes struct { @@ -116,6 +122,7 @@ func (te *TipEvents) Run(ctx context.Context) error { newBlock: EventNewBlock{ NewBlock: message.Decoded.Block, PeerId: message.PeerId, + Source: EventSourceP2PNewBlock, }, }) })