From 3f96726840460e3e885802ef29a44a53a9a4baa6 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Mon, 26 Feb 2024 23:07:01 +0530 Subject: [PATCH 01/10] polygon/sync: use header downloader interface --- polygon/sync/header_downloader.go | 19 ++++++++++++------- polygon/sync/header_downloader_test.go | 2 +- polygon/sync/sync.go | 9 ++++++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/polygon/sync/header_downloader.go b/polygon/sync/header_downloader.go index 8f980b80958..9996a7a7856 100644 --- a/polygon/sync/header_downloader.go +++ b/polygon/sync/header_downloader.go @@ -25,14 +25,19 @@ type HeadersWriter interface { PutHeaders(ctx context.Context, headers []*types.Header) error } +type HeaderDownloader interface { + DownloadUsingCheckpoints(ctx context.Context, start uint64) error + DownloadUsingMilestones(ctx context.Context, start uint64) error +} + func NewHeaderDownloader( logger log.Logger, p2pService p2p.Service, heimdall heimdall.HeimdallNoStore, verify AccumulatedHeadersVerifier, headersWriter HeadersWriter, -) *HeaderDownloader { - return &HeaderDownloader{ +) HeaderDownloader { + return &headerDownloader{ logger: logger, p2pService: p2pService, heimdall: heimdall, @@ -42,7 +47,7 @@ func NewHeaderDownloader( } } -type HeaderDownloader struct { +type headerDownloader struct { logger log.Logger p2pService p2p.Service heimdall heimdall.HeimdallNoStore @@ -51,7 +56,7 @@ type HeaderDownloader struct { headersWriter HeadersWriter } -func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, start uint64) error { +func (hd *headerDownloader) DownloadUsingCheckpoints(ctx context.Context, start uint64) error { waypoints, err := hd.heimdall.FetchCheckpointsFromBlock(ctx, start) if err != nil { return err @@ -65,7 +70,7 @@ func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, start return nil } -func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, start uint64) error { +func (hd *headerDownloader) DownloadUsingMilestones(ctx context.Context, start uint64) error { waypoints, err := hd.heimdall.FetchMilestonesFromBlock(ctx, start) if err != nil { return err @@ -79,7 +84,7 @@ func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, start u return nil } -func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, waypoints heimdall.Waypoints) error { +func (hd *headerDownloader) downloadUsingWaypoints(ctx context.Context, waypoints heimdall.Waypoints) error { // waypoint rootHash->[headers part of waypoint] waypointHeadersMemo, err := lru.New[common.Hash, []*types.Header](hd.p2pService.MaxPeers()) if err != nil { @@ -216,7 +221,7 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, waypoint } // choosePeers assumes peers are sorted in ascending order based on block num -func (hd *HeaderDownloader) choosePeers(peers p2p.PeersSyncProgress, waypoints heimdall.Waypoints) p2p.PeersSyncProgress { +func (hd *headerDownloader) choosePeers(peers p2p.PeersSyncProgress, waypoints heimdall.Waypoints) p2p.PeersSyncProgress { var peersIdx int chosenPeers := make(p2p.PeersSyncProgress, 0, len(peers)) for _, waypoint := range waypoints { diff --git a/polygon/sync/header_downloader_test.go b/polygon/sync/header_downloader_test.go index 3ec08940654..bb939c07a40 100644 --- a/polygon/sync/header_downloader_test.go +++ b/polygon/sync/header_downloader_test.go @@ -63,7 +63,7 @@ func (opts headerDownloaderTestOpts) getOrCreateDefaultHeaderVerifier() 
Accumula type headerDownloaderTest struct { heimdall *heimdall.MockHeimdallNoStore p2pService *p2p.MockService - headerDownloader *HeaderDownloader + headerDownloader HeaderDownloader headersWriter *MockHeadersWriter } diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index b5cc51612d6..f8e6d165a9f 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -14,7 +14,7 @@ type Sync struct { execution ExecutionClient verify AccumulatedHeadersVerifier p2pService p2p.Service - downloader *HeaderDownloader + downloader HeaderDownloader ccBuilderFactory func(root *types.Header) CanonicalChainBuilder events chan Event logger log.Logger @@ -25,7 +25,7 @@ func NewSync( execution ExecutionClient, verify AccumulatedHeadersVerifier, p2pService p2p.Service, - downloader *HeaderDownloader, + downloader HeaderDownloader, ccBuilderFactory func(root *types.Header) CanonicalChainBuilder, events chan Event, logger log.Logger, @@ -77,7 +77,10 @@ func (s *Sync) onMilestoneEvent( } } - s.logger.Debug("sync.Sync.onMilestoneEvent: local chain tip does not match the milestone, unwinding to the previous verified milestone", "err", err) + s.logger.Debug( + "sync.Sync.onMilestoneEvent: local chain tip does not match the milestone, unwinding to the previous verified milestone", + "err", err, + ) // the milestone doesn't correspond to the tip of the chain // unwind to the previous verified milestone From e529f2cd4f2da2563bbe004892a5635596b388a6 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Mon, 26 Feb 2024 23:07:01 +0530 Subject: [PATCH 02/10] polygon/sync: use header downloader interface --- polygon/sync/header_downloader.go | 19 ++++++++++++------- polygon/sync/header_downloader_test.go | 2 +- polygon/sync/sync.go | 9 ++++++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/polygon/sync/header_downloader.go b/polygon/sync/header_downloader.go index 4e959b170c2..93d0d70c8fc 100644 --- a/polygon/sync/header_downloader.go +++ b/polygon/sync/header_downloader.go @@ -27,13 +27,18 @@ type HeadersWriter interface { PutHeaders(ctx context.Context, headers []*types.Header) error } +type HeaderDownloader interface { + DownloadUsingCheckpoints(ctx context.Context, start uint64) error + DownloadUsingMilestones(ctx context.Context, start uint64) error +} + func NewHeaderDownloader( logger log.Logger, p2pService p2p.Service, heimdall heimdall.HeimdallNoStore, headersVerifier AccumulatedHeadersVerifier, headersWriter HeadersWriter, -) *HeaderDownloader { +) HeaderDownloader { return newHeaderDownloader( logger, p2pService, @@ -51,8 +56,8 @@ func newHeaderDownloader( headersVerifier AccumulatedHeadersVerifier, headersWriter HeadersWriter, notEnoughPeersBackOffDuration time.Duration, -) *HeaderDownloader { - return &HeaderDownloader{ +) *headerDownloader { + return &headerDownloader{ logger: logger, p2pService: p2pService, heimdall: heimdall, @@ -62,7 +67,7 @@ func newHeaderDownloader( } } -type HeaderDownloader struct { +type headerDownloader struct { logger log.Logger p2pService p2p.Service heimdall heimdall.HeimdallNoStore @@ -71,7 +76,7 @@ type HeaderDownloader struct { notEnoughPeersBackOffDuration time.Duration } -func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, start uint64) error { +func (hd *headerDownloader) DownloadUsingCheckpoints(ctx context.Context, start uint64) error { waypoints, err := hd.heimdall.FetchCheckpointsFromBlock(ctx, start) if err != nil { return err @@ -85,7 +90,7 @@ func (hd *HeaderDownloader) 
DownloadUsingCheckpoints(ctx context.Context, start return nil } -func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, start uint64) error { +func (hd *headerDownloader) DownloadUsingMilestones(ctx context.Context, start uint64) error { waypoints, err := hd.heimdall.FetchMilestonesFromBlock(ctx, start) if err != nil { return err @@ -99,7 +104,7 @@ func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, start u return nil } -func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, waypoints heimdall.Waypoints) error { +func (hd *headerDownloader) downloadUsingWaypoints(ctx context.Context, waypoints heimdall.Waypoints) error { // waypoint rootHash->[headers part of waypoint] waypointHeadersMemo, err := lru.New[common.Hash, []*types.Header](hd.p2pService.MaxPeers()) if err != nil { diff --git a/polygon/sync/header_downloader_test.go b/polygon/sync/header_downloader_test.go index 1013012fd8e..e5029a21e5b 100644 --- a/polygon/sync/header_downloader_test.go +++ b/polygon/sync/header_downloader_test.go @@ -64,7 +64,7 @@ func (opts headerDownloaderTestOpts) getOrCreateDefaultHeaderVerifier() Accumula type headerDownloaderTest struct { heimdall *heimdall.MockHeimdallNoStore p2pService *p2p.MockService - headerDownloader *HeaderDownloader + headerDownloader HeaderDownloader headersWriter *MockHeadersWriter } diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index b5cc51612d6..f8e6d165a9f 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -14,7 +14,7 @@ type Sync struct { execution ExecutionClient verify AccumulatedHeadersVerifier p2pService p2p.Service - downloader *HeaderDownloader + downloader HeaderDownloader ccBuilderFactory func(root *types.Header) CanonicalChainBuilder events chan Event logger log.Logger @@ -25,7 +25,7 @@ func NewSync( execution ExecutionClient, verify AccumulatedHeadersVerifier, p2pService p2p.Service, - downloader *HeaderDownloader, + downloader HeaderDownloader, ccBuilderFactory func(root *types.Header) CanonicalChainBuilder, events chan Event, logger log.Logger, @@ -77,7 +77,10 @@ func (s *Sync) onMilestoneEvent( } } - s.logger.Debug("sync.Sync.onMilestoneEvent: local chain tip does not match the milestone, unwinding to the previous verified milestone", "err", err) + s.logger.Debug( + "sync.Sync.onMilestoneEvent: local chain tip does not match the milestone, unwinding to the previous verified milestone", + "err", err, + ) // the milestone doesn't correspond to the tip of the chain // unwind to the previous verified milestone From c574e056e5baafc1e397d3933565ed1bde36e7ba Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Tue, 27 Feb 2024 15:53:26 +0530 Subject: [PATCH 03/10] fix windows test --- polygon/p2p/peer_sync_progress.go | 2 +- polygon/p2p/peer_sync_progress_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/polygon/p2p/peer_sync_progress.go b/polygon/p2p/peer_sync_progress.go index e1551c1b764..239b892e2c0 100644 --- a/polygon/p2p/peer_sync_progress.go +++ b/polygon/p2p/peer_sync_progress.go @@ -13,7 +13,7 @@ type peerSyncProgress struct { func (psp *peerSyncProgress) blockNumPresent(blockNum uint64) { if psp.minMissingBlockNum <= blockNum { psp.minMissingBlockNum = 0 - psp.minMissingBlockNumTs = time.Unix(0, 0) + psp.minMissingBlockNumTs = time.Time{} } } diff --git a/polygon/p2p/peer_sync_progress_test.go b/polygon/p2p/peer_sync_progress_test.go index c620779675f..2151c289d3d 100644 --- 
a/polygon/p2p/peer_sync_progress_test.go +++ b/polygon/p2p/peer_sync_progress_test.go @@ -2,6 +2,7 @@ package p2p import ( "testing" + "time" "github.com/stretchr/testify/require" ) @@ -25,7 +26,7 @@ func TestPeerMayHaveBlockNum(t *testing.T) { require.False(t, psp.peerMayHaveBlockNum(1_000)) // expired timestamp - psp.minMissingBlockNumTs = psp.minMissingBlockNumTs.Add(-missingBlockNumExpiry) + psp.minMissingBlockNumTs = psp.minMissingBlockNumTs.Add(-missingBlockNumExpiry).Add(-time.Second) require.True(t, psp.peerMayHaveBlockNum(0)) require.True(t, psp.peerMayHaveBlockNum(200)) require.True(t, psp.peerMayHaveBlockNum(500)) From 178a06968d4447ab9890b00d4f81d2b14672a4b7 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Tue, 27 Feb 2024 21:05:50 +0530 Subject: [PATCH 04/10] add more validations in fetch headers --- polygon/p2p/fetcher.go | 141 +++++++++++++++++++++++-- polygon/p2p/fetcher_tracking.go | 2 +- polygon/p2p/service_test.go | 182 ++++++++++++++++++++++++++++++-- 3 files changed, 306 insertions(+), 19 deletions(-) diff --git a/polygon/p2p/fetcher.go b/polygon/p2p/fetcher.go index e9806907094..55b08191d03 100644 --- a/polygon/p2p/fetcher.go +++ b/polygon/p2p/fetcher.go @@ -2,11 +2,13 @@ package p2p import ( "context" + "errors" "fmt" "time" "github.com/ledgerwatch/log/v3" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/protocols/eth" @@ -63,8 +65,6 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe // // TODO 1) chunk request into smaller ranges if needed to fit in the 2 MiB response size soft limit // and also 1024 max headers server (check AnswerGetBlockHeadersQuery) - // 2) peer should return <= amount, check for > amount and penalize peer - // err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{ RequestId: requestId, GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ @@ -115,14 +115,35 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe } } - if uint64(len(headers)) != amount { + if err = f.validateHeadersResponse(headers, start, end, amount); err != nil { + shouldPenalize := errors.Is(err, &ErrIncorrectOriginHeader{}) || + errors.Is(err, &ErrTooManyHeaders{}) || + errors.Is(err, &ErrDisconnectedHeaders{}) + + if shouldPenalize { + f.logger.Debug("penalizing peer", "peerId", peerId, "err", err.Error()) + + penalizeErr := f.peerPenalizer.Penalize(ctx, peerId) + if penalizeErr != nil { + err = fmt.Errorf("%w: %w", penalizeErr, err) + } + } + + return nil, err + } + + return headers, nil +} + +func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, end, amount uint64) error { + if uint64(len(headers)) < amount { var first, last uint64 if len(headers) > 0 { first = headers[0].Number.Uint64() last = headers[len(headers)-1].Number.Uint64() } - return nil, &ErrIncompleteFetchHeadersResponse{ + return &ErrIncompleteHeaders{ requestStart: start, requestEnd: end, first: first, @@ -131,7 +152,44 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe } } - return headers, nil + if uint64(len(headers)) > amount { + return &ErrTooManyHeaders{ + requested: int(amount), + received: len(headers), + } + } + + if start != headers[0].Number.Uint64() { + return &ErrIncorrectOriginHeader{ + requested: start, + received: headers[0].Number.Uint64(), + } + } + + var parentHeader 
*types.Header + for _, header := range headers { + if parentHeader == nil { + parentHeader = header + continue + } + + parentHeaderHash := parentHeader.Hash() + currentHeaderNum := header.Number.Uint64() + parentHeaderNum := parentHeader.Number.Uint64() + if header.ParentHash != parentHeaderHash || currentHeaderNum != parentHeaderNum+1 { + return &ErrDisconnectedHeaders{ + currentHash: header.Hash(), + currentParentHash: header.ParentHash, + currentNum: currentHeaderNum, + parentHash: parentHeaderHash, + parentNum: parentHeaderNum, + } + } + + parentHeader = header + } + + return nil } type ErrInvalidFetchHeadersRange struct { @@ -143,7 +201,7 @@ func (e ErrInvalidFetchHeadersRange) Error() string { return fmt.Sprintf("invalid fetch headers range: start=%d, end=%d", e.start, e.end) } -type ErrIncompleteFetchHeadersResponse struct { +type ErrIncompleteHeaders struct { requestStart uint64 requestEnd uint64 first uint64 @@ -151,17 +209,84 @@ type ErrIncompleteFetchHeadersResponse struct { amount int } -func (e ErrIncompleteFetchHeadersResponse) Error() string { +func (e ErrIncompleteHeaders) Error() string { return fmt.Sprintf( "incomplete fetch headers response: first=%d, last=%d, amount=%d, requested [%d, %d)", e.first, e.last, e.amount, e.requestStart, e.requestEnd, ) } -func (e ErrIncompleteFetchHeadersResponse) LowestMissingBlockNum() uint64 { +func (e ErrIncompleteHeaders) LowestMissingBlockNum() uint64 { if e.last == 0 || e.first == 0 || e.first != e.requestStart { return e.requestStart } return e.last + 1 } + +type ErrTooManyHeaders struct { + requested int + received int +} + +func (e ErrTooManyHeaders) Error() string { + return fmt.Sprintf("too many headers in fetch headers response: requested=%d, received=%d", e.requested, e.received) +} + +func (e ErrTooManyHeaders) Is(err error) bool { + var errTooManyHeaders *ErrTooManyHeaders + switch { + case errors.As(err, &errTooManyHeaders): + return true + default: + return false + } +} + +type ErrDisconnectedHeaders struct { + currentHash libcommon.Hash + currentParentHash libcommon.Hash + currentNum uint64 + parentHash libcommon.Hash + parentNum uint64 +} + +func (e ErrDisconnectedHeaders) Error() string { + return fmt.Sprintf( + "disconnected headers in fetch headers response: %s, %s, %s, %s, %s", + fmt.Sprintf("currentHash=%v", e.currentHash), + fmt.Sprintf("currentParentHash=%v", e.currentParentHash), + fmt.Sprintf("currentNum=%v", e.currentNum), + fmt.Sprintf("parentHash=%v", e.parentHash), + fmt.Sprintf("parentNum=%v", e.parentNum), + ) +} + +func (e ErrDisconnectedHeaders) Is(err error) bool { + var errDisconnectedHeaders *ErrDisconnectedHeaders + switch { + case errors.As(err, &errDisconnectedHeaders): + return true + default: + return false + } +} + +type ErrIncorrectOriginHeader struct { + requested uint64 + received uint64 +} + +func (e ErrIncorrectOriginHeader) Error() string { + return fmt.Sprintf("incorrect origin header: requested=%d, received=%d", e.requested, e.received) +} + +func (e ErrIncorrectOriginHeader) Is(err error) bool { + var errIncorrectOriginHeader *ErrIncorrectOriginHeader + switch { + case errors.As(err, &errIncorrectOriginHeader): + return true + default: + return false + } +} diff --git a/polygon/p2p/fetcher_tracking.go b/polygon/p2p/fetcher_tracking.go index 3a3f38ee36e..be7e0434df8 100644 --- a/polygon/p2p/fetcher_tracking.go +++ b/polygon/p2p/fetcher_tracking.go @@ -31,7 +31,7 @@ type trackingFetcher struct { func (tf *trackingFetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId 
PeerId) ([]*types.Header, error) { res, err := tf.Fetcher.FetchHeaders(ctx, start, end, peerId) if err != nil { - var errIncompleteResponse *ErrIncompleteFetchHeadersResponse + var errIncompleteResponse *ErrIncompleteHeaders if errors.As(err, &errIncompleteResponse) { tf.peerTracker.BlockNumMissing(peerId, errIncompleteResponse.LowestMissingBlockNum()) } else if errors.Is(err, context.DeadlineExceeded) { diff --git a/polygon/p2p/service_test.go b/polygon/p2p/service_test.go index f52fe322dee..e43ad3b3616 100644 --- a/polygon/p2p/service_test.go +++ b/polygon/p2p/service_test.go @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/emptypb" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon/core/types" @@ -291,13 +292,31 @@ func newMockRequestGenerator(requestIds ...uint64) RequestIdGenerator { } func newMockBlockHeadersPacket66Bytes(t *testing.T, requestId uint64, numHeaders int) []byte { + headers := newMockBlockHeaders(numHeaders) + return blockHeadersPacket66Bytes(t, requestId, headers) +} + +func newMockBlockHeaders(numHeaders int) []*types.Header { headers := make([]*types.Header, numHeaders) + var parentHeader *types.Header for i := range headers { + var parentHash libcommon.Hash + if parentHeader != nil { + parentHash = parentHeader.Hash() + } + headers[i] = &types.Header{ - Number: big.NewInt(int64(i) + 1), + Number: big.NewInt(int64(i) + 1), + ParentHash: parentHash, } + + parentHeader = headers[i] } + return headers +} + +func blockHeadersPacket66Bytes(t *testing.T, requestId uint64, headers []*types.Header) []byte { blockHeadersPacket66 := eth.BlockHeadersPacket66{ RequestId: requestId, BlockHeadersPacket: headers, @@ -367,7 +386,38 @@ func TestServiceErrInvalidFetchHeadersRange(t *testing.T) { }) } -func TestServiceFetchHeadersShouldPenalizePeerWhenErrInvalidRlpErr(t *testing.T) { +func TestServiceErrIncompleteHeaders(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockInboundMessages := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: newMockBlockHeadersPacket66Bytes(t, requestId, 2), + }, + } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 3, + } + + test := newServiceTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + test.run(func(ctx context.Context, t *testing.T) { + var errIncompleteHeaders *ErrIncompleteHeaders + headers, err := test.service.FetchHeaders(ctx, 1, 4, peerId) + require.ErrorAs(t, err, &errIncompleteHeaders) + require.Equal(t, uint64(3), errIncompleteHeaders.LowestMissingBlockNum()) + require.Nil(t, headers) + }) +} + +func TestServiceFetchHeadersShouldPenalizePeerWhenErrInvalidRlp(t *testing.T) { t.Parallel() peerId := PeerIdFromUint64(1) @@ -389,6 +439,7 @@ func TestServiceFetchHeadersShouldPenalizePeerWhenErrInvalidRlpErr(t *testing.T) test := newServiceTest(t, newMockRequestGenerator(requestId)) test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized test.mockExpectPenalizePeer(peerId) test.run(func(ctx context.Context, t *testing.T) { headers, err := test.service.FetchHeaders(ctx, 1, 3, peerId) @@ -397,6 +448,117 @@ func 
TestServiceFetchHeadersShouldPenalizePeerWhenErrInvalidRlpErr(t *testing.T) }) } +func TestServiceFetchHeadersShouldPenalizePeerWhenErrTooManyHeaders(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockInboundMessages := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + // response should contain 2 headers instead we return 5 + Data: newMockBlockHeadersPacket66Bytes(t, requestId, 5), + }, + } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 2, + } + + test := newServiceTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + test.mockExpectPenalizePeer(peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errTooManyHeaders *ErrTooManyHeaders + headers, err := test.service.FetchHeaders(ctx, 1, 3, peerId) + require.ErrorAs(t, err, &errTooManyHeaders) + require.Equal(t, 2, errTooManyHeaders.requested) + require.Equal(t, 5, errTooManyHeaders.received) + require.Nil(t, headers) + }) +} + +func TestServiceFetchHeadersShouldPenalizePeerWhenErrDisconnectedHeaders(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockBlockHeaders := newMockBlockHeaders(5) + disconnectedHeaders := make([]*types.Header, 3) + disconnectedHeaders[0] = mockBlockHeaders[0] + disconnectedHeaders[1] = mockBlockHeaders[2] + disconnectedHeaders[2] = mockBlockHeaders[4] + mockInboundMessages := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: blockHeadersPacket66Bytes(t, requestId, disconnectedHeaders), + }, + } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 3, + } + + test := newServiceTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + test.mockExpectPenalizePeer(peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errDisconnectedHeaders *ErrDisconnectedHeaders + headers, err := test.service.FetchHeaders(ctx, 1, 4, peerId) + require.ErrorAs(t, err, &errDisconnectedHeaders) + require.Equal(t, uint64(3), errDisconnectedHeaders.currentNum) + require.Equal(t, uint64(1), errDisconnectedHeaders.parentNum) + require.Nil(t, headers) + }) +} + +func TestServiceFetchHeadersShouldPenalizePeerWhenErrIncorrectOriginHeader(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockBlockHeaders := newMockBlockHeaders(3) + incorrectOriginHeaders := mockBlockHeaders[1:] + mockInboundMessages := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + // response should contain 2 headers instead we return 5 + Data: blockHeadersPacket66Bytes(t, requestId, incorrectOriginHeaders), + }, + } + mockRequestResponse := requestResponseMock{ + requestId: requestId, + mockResponseInboundMessages: mockInboundMessages, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 2, + } + + test := newServiceTest(t, newMockRequestGenerator(requestId)) + test.mockSentryStreams(mockRequestResponse) + // setup expectation that peer should be penalized + 
test.mockExpectPenalizePeer(peerId) + test.run(func(ctx context.Context, t *testing.T) { + var errIncorrectOriginHeader *ErrIncorrectOriginHeader + headers, err := test.service.FetchHeaders(ctx, 1, 3, peerId) + require.ErrorAs(t, err, &errIncorrectOriginHeader) + require.Equal(t, uint64(1), errIncorrectOriginHeader.requested) + require.Equal(t, uint64(2), errIncorrectOriginHeader.received) + require.Nil(t, headers) + }) +} + func TestListPeersMayHaveBlockNum(t *testing.T) { t.Parallel() @@ -451,15 +613,15 @@ func TestListPeersMayHaveBlockNum(t *testing.T) { peerIds = test.service.ListPeersMayHaveBlockNum(4) // peers which may have blocks 1,2,3,4 require.Len(t, peerIds, 2) - var errIncompleteFetchHeadersResponse *ErrIncompleteFetchHeadersResponse + var errIncompleteHeaders *ErrIncompleteHeaders headers, err = test.service.FetchHeaders(ctx, 3, 5, peerId1) // fetch headers 3 and 4 - require.ErrorAs(t, err, &errIncompleteFetchHeadersResponse) // peer 1 does not have headers 3 and 4 - require.Equal(t, uint64(3), errIncompleteFetchHeadersResponse.requestStart) - require.Equal(t, uint64(5), errIncompleteFetchHeadersResponse.requestEnd) - require.Equal(t, uint64(0), errIncompleteFetchHeadersResponse.first) - require.Equal(t, uint64(0), errIncompleteFetchHeadersResponse.last) - require.Equal(t, 0, errIncompleteFetchHeadersResponse.amount) - require.Equal(t, uint64(3), errIncompleteFetchHeadersResponse.LowestMissingBlockNum()) + require.ErrorAs(t, err, &errIncompleteHeaders) // peer 1 does not have headers 3 and 4 + require.Equal(t, uint64(3), errIncompleteHeaders.requestStart) + require.Equal(t, uint64(5), errIncompleteHeaders.requestEnd) + require.Equal(t, uint64(0), errIncompleteHeaders.first) + require.Equal(t, uint64(0), errIncompleteHeaders.last) + require.Equal(t, 0, errIncompleteHeaders.amount) + require.Equal(t, uint64(3), errIncompleteHeaders.LowestMissingBlockNum()) require.Nil(t, headers) // should be one peer less now given that we know that peer 1 does not have block num 4 From 0323b19ee6eadb190139848eef3172be68a82df3 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Wed, 28 Feb 2024 19:19:07 +0000 Subject: [PATCH 05/10] implement chunking in fetch headers --- polygon/p2p/fetcher.go | 87 +++++++++++++++++++++++++------------ polygon/p2p/service_test.go | 55 +++++++++++++++++++++++ 2 files changed, 114 insertions(+), 28 deletions(-) diff --git a/polygon/p2p/fetcher.go b/polygon/p2p/fetcher.go index 55b08191d03..772407ebd0a 100644 --- a/polygon/p2p/fetcher.go +++ b/polygon/p2p/fetcher.go @@ -7,15 +7,19 @@ import ( "time" "github.com/ledgerwatch/log/v3" + "modernc.org/mathutil" - libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/protocols/eth" "github.com/ledgerwatch/erigon/rlp" ) -const responseTimeout = 5 * time.Second +const ( + responseTimeout = 5 * time.Second + maxFetchHeadersRange = 16384 +) type RequestIdGenerator func() uint64 @@ -56,34 +60,53 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe } amount := end - start - requestId := f.requestIdGenerator() - observer := make(ChanMessageObserver[*sentry.InboundMessage]) + if amount > maxFetchHeadersRange { + return nil, &ErrInvalidFetchHeadersRange{ + start: start, + end: end, + } + } + // Soft response limits are: + // 1. 2 MB size + // 2. 
1024 headers + // + // A header is approximately 500 bytes, hence 1024 headers should be less than 2 MB. + // As a simplification we can only use MaxHeadersServe for chunking. + chunks := amount / eth.MaxHeadersServe + if amount%eth.MaxHeadersServe > 0 { + chunks++ + } + + observer := make(ChanMessageObserver[*sentry.InboundMessage], chunks) f.messageListener.RegisterBlockHeadersObserver(observer) defer f.messageListener.UnregisterBlockHeadersObserver(observer) - // - // TODO 1) chunk request into smaller ranges if needed to fit in the 2 MiB response size soft limit - // and also 1024 max headers server (check AnswerGetBlockHeadersQuery) - err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{ - RequestId: requestId, - GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ - Origin: eth.HashOrNumber{ - Number: start, + requestIds := make(map[uint64]uint64, chunks) + for i := uint64(0); i < chunks; i++ { + chunkStart := start + i*eth.MaxHeadersServe + chunkAmount := mathutil.MinUint64(end-chunkStart, eth.MaxHeadersServe) + requestId := f.requestIdGenerator() + requestIds[requestId] = i + + if err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{ + RequestId: requestId, + GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ + Origin: eth.HashOrNumber{ + Number: chunkStart, + }, + Amount: chunkAmount, }, - Amount: amount, - }, - }) - if err != nil { - return nil, err + }); err != nil { + return nil, err + } } - ctx, cancel := context.WithTimeout(ctx, responseTimeout) + ctx, cancel := context.WithTimeout(ctx, time.Duration(len(requestIds))*responseTimeout) defer cancel() - var headers []*types.Header - var requestReceived bool - for !requestReceived { + headerChunks := make([][]*types.Header, chunks) + for len(requestIds) > 0 { select { case <-ctx.Done(): return nil, fmt.Errorf("interrupted while waiting for msg from peer: %w", ctx.Err()) @@ -106,16 +129,24 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe return nil, fmt.Errorf("failed to decode BlockHeadersPacket66: %w", err) } - if pkt.RequestId != requestId { + requestIdIndex, ok := requestIds[pkt.RequestId] + if !ok { continue } - headers = pkt.BlockHeadersPacket - requestReceived = true + headerChunks[requestIdIndex] = pkt.BlockHeadersPacket + delete(requestIds, pkt.RequestId) + } + } + + headers := make([]*types.Header, 0, amount) + for _, headerChunk := range headerChunks { + for _, header := range headerChunk { + headers = append(headers, header) } } - if err = f.validateHeadersResponse(headers, start, end, amount); err != nil { + if err := f.validateHeadersResponse(headers, start, end, amount); err != nil { shouldPenalize := errors.Is(err, &ErrIncorrectOriginHeader{}) || errors.Is(err, &ErrTooManyHeaders{}) || errors.Is(err, &ErrDisconnectedHeaders{}) @@ -244,10 +275,10 @@ func (e ErrTooManyHeaders) Is(err error) bool { } type ErrDisconnectedHeaders struct { - currentHash libcommon.Hash - currentParentHash libcommon.Hash + currentHash common.Hash + currentParentHash common.Hash currentNum uint64 - parentHash libcommon.Hash + parentHash common.Hash parentNum uint64 } diff --git a/polygon/p2p/service_test.go b/polygon/p2p/service_test.go index e43ad3b3616..8639d75e970 100644 --- a/polygon/p2p/service_test.go +++ b/polygon/p2p/service_test.go @@ -371,6 +371,55 @@ func TestServiceFetchHeaders(t *testing.T) { }) } +func TestServiceFetchHeadersWithChunking(t *testing.T) { + t.Parallel() + + peerId := PeerIdFromUint64(1) + mockHeaders := 
newMockBlockHeaders(1999) + requestId1 := uint64(1234) + mockInboundMessages1 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + // 1024 headers in first response + Data: blockHeadersPacket66Bytes(t, requestId1, mockHeaders[:1025]), + }, + } + mockRequestResponse1 := requestResponseMock{ + requestId: requestId1, + mockResponseInboundMessages: mockInboundMessages1, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1, + wantRequestAmount: 1024, + } + requestId2 := uint64(1235) + mockInboundMessages2 := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + // remaining 975 headers in second response + Data: blockHeadersPacket66Bytes(t, requestId2, mockHeaders[1025:]), + }, + } + mockRequestResponse2 := requestResponseMock{ + requestId: requestId2, + mockResponseInboundMessages: mockInboundMessages2, + wantRequestPeerId: peerId, + wantRequestOriginNumber: 1025, + wantRequestAmount: 975, + } + + test := newServiceTest(t, newMockRequestGenerator(requestId1, requestId2)) + test.mockSentryStreams(mockRequestResponse1, mockRequestResponse2) + test.run(func(ctx context.Context, t *testing.T) { + headers, err := test.service.FetchHeaders(ctx, 1, 2000, peerId) + require.NoError(t, err) + require.Len(t, headers, 1999) + require.Equal(t, uint64(1), headers[0].Number.Uint64()) + require.Equal(t, uint64(1999), headers[len(headers)-1].Number.Uint64()) + }) +} + func TestServiceErrInvalidFetchHeadersRange(t *testing.T) { t.Parallel() @@ -383,6 +432,12 @@ func TestServiceErrInvalidFetchHeadersRange(t *testing.T) { require.Equal(t, uint64(3), errInvalidFetchHeadersRange.start) require.Equal(t, uint64(1), errInvalidFetchHeadersRange.end) require.Nil(t, headers) + + headers, err = test.service.FetchHeaders(ctx, 1, 1+maxFetchHeadersRange+1, PeerIdFromUint64(1)) + require.ErrorAs(t, err, &errInvalidFetchHeadersRange) + require.Equal(t, uint64(1), errInvalidFetchHeadersRange.start) + require.Equal(t, uint64(1+maxFetchHeadersRange+1), errInvalidFetchHeadersRange.end) + require.Nil(t, headers) }) } From c361a950e9077bf62659e6313d5256698d5da8bf Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Wed, 28 Feb 2024 19:24:13 +0000 Subject: [PATCH 06/10] tidy up imports --- polygon/sync/service.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/polygon/sync/service.go b/polygon/sync/service.go index 75ba1b8c6aa..26c6c3d25db 100644 --- a/polygon/sync/service.go +++ b/polygon/sync/service.go @@ -7,14 +7,14 @@ import ( "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon-lib/chain" - libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/polygon/bor/borcfg" "github.com/ledgerwatch/erigon/polygon/heimdall" - polygonP2P "github.com/ledgerwatch/erigon/polygon/p2p" + "github.com/ledgerwatch/erigon/polygon/p2p" ) type Service interface { @@ -24,7 +24,7 @@ type Service interface { type service struct { sync *Sync - p2pService polygonP2P.Service + p2pService p2p.Service } func NewService( @@ -39,7 +39,7 @@ func NewService( storage := NewStorage() execution := NewExecutionClient(engine) verify := VerifyAccumulatedHeaders - p2pService := polygonP2P.NewService(maxPeers, logger, sentryClient) + 
p2pService := p2p.NewService(maxPeers, logger, sentryClient) heimdallClient := heimdall.NewHeimdallClient(heimdallURL, logger) heimdallService := heimdall.NewHeimdallNoStore(heimdallClient, logger) downloader := NewHeaderDownloader( @@ -50,7 +50,7 @@ func NewService( storage, ) spansCache := NewSpansCache() - signaturesCache, err := lru.NewARC[libcommon.Hash, libcommon.Address](stagedsync.InMemorySignatures) + signaturesCache, err := lru.NewARC[common.Hash, common.Address](stagedsync.InMemorySignatures) if err != nil { panic(err) } From d3750b43cf7f77cb9376ca029f06e92cdf942c86 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Thu, 29 Feb 2024 11:48:19 +0000 Subject: [PATCH 07/10] go mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 7b324c9f913..c998a1c8dfc 100644 --- a/go.mod +++ b/go.mod @@ -104,6 +104,7 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 + modernc.org/mathutil v1.6.0 modernc.org/sqlite v1.28.0 pgregory.net/rapid v1.1.0 sigs.k8s.io/yaml v1.4.0 @@ -280,7 +281,6 @@ require ( modernc.org/cc/v3 v3.41.0 // indirect modernc.org/ccgo/v3 v3.16.15 // indirect modernc.org/libc v1.29.0 // indirect - modernc.org/mathutil v1.6.0 // indirect modernc.org/memory v1.7.2 // indirect modernc.org/opt v0.1.3 // indirect modernc.org/strutil v1.2.0 // indirect From b73eaee53366aa650416b8010954b4f46e1f6471 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:01:03 +0000 Subject: [PATCH 08/10] sequential requests sent to peer --- polygon/p2p/fetcher.go | 96 +++++++++++++++++-------------------- polygon/p2p/service_test.go | 6 --- 2 files changed, 45 insertions(+), 57 deletions(-) diff --git a/polygon/p2p/fetcher.go b/polygon/p2p/fetcher.go index 772407ebd0a..bda98a97457 100644 --- a/polygon/p2p/fetcher.go +++ b/polygon/p2p/fetcher.go @@ -16,10 +16,7 @@ import ( "github.com/ledgerwatch/erigon/rlp" ) -const ( - responseTimeout = 5 * time.Second - maxFetchHeadersRange = 16384 -) +const responseTimeout = 5 * time.Second type RequestIdGenerator func() uint64 @@ -59,37 +56,29 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe } } - amount := end - start - if amount > maxFetchHeadersRange { - return nil, &ErrInvalidFetchHeadersRange{ - start: start, - end: end, - } - } - // Soft response limits are: // 1. 2 MB size // 2. 1024 headers // // A header is approximately 500 bytes, hence 1024 headers should be less than 2 MB. // As a simplification we can only use MaxHeadersServe for chunking. 
+ amount := end - start chunks := amount / eth.MaxHeadersServe if amount%eth.MaxHeadersServe > 0 { chunks++ } - observer := make(ChanMessageObserver[*sentry.InboundMessage], chunks) + headers := make([]*types.Header, 0, amount) + observer := make(ChanMessageObserver[*sentry.InboundMessage]) f.messageListener.RegisterBlockHeadersObserver(observer) defer f.messageListener.UnregisterBlockHeadersObserver(observer) - requestIds := make(map[uint64]uint64, chunks) for i := uint64(0); i < chunks; i++ { chunkStart := start + i*eth.MaxHeadersServe chunkAmount := mathutil.MinUint64(end-chunkStart, eth.MaxHeadersServe) requestId := f.requestIdGenerator() - requestIds[requestId] = i - if err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{ + err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{ RequestId: requestId, GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ Origin: eth.HashOrNumber{ @@ -97,19 +86,52 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe }, Amount: chunkAmount, }, - }); err != nil { + }) + if err != nil { return nil, err } + + headerChunk, err := f.awaitHeadersResponse(ctx, requestId, peerId, observer) + if err != nil { + return nil, err + } + + headers = append(headers, headerChunk...) } - ctx, cancel := context.WithTimeout(ctx, time.Duration(len(requestIds))*responseTimeout) + if err := f.validateHeadersResponse(headers, start, end, amount); err != nil { + shouldPenalize := errors.Is(err, &ErrIncorrectOriginHeader{}) || + errors.Is(err, &ErrTooManyHeaders{}) || + errors.Is(err, &ErrDisconnectedHeaders{}) + + if shouldPenalize { + f.logger.Debug("penalizing peer", "peerId", peerId, "err", err.Error()) + + penalizeErr := f.peerPenalizer.Penalize(ctx, peerId) + if penalizeErr != nil { + err = fmt.Errorf("%w: %w", penalizeErr, err) + } + } + + return nil, err + } + + return headers, nil +} + +func (f *fetcher) awaitHeadersResponse( + ctx context.Context, + requestId uint64, + peerId PeerId, + observer ChanMessageObserver[*sentry.InboundMessage], +) ([]*types.Header, error) { + ctx, cancel := context.WithTimeout(ctx, responseTimeout) defer cancel() - headerChunks := make([][]*types.Header, chunks) - for len(requestIds) > 0 { + for { select { case <-ctx.Done(): - return nil, fmt.Errorf("interrupted while waiting for msg from peer: %w", ctx.Err()) + return nil, fmt.Errorf("headers response await interrupted: %w", ctx.Err()) case msg := <-observer: msgPeerId := PeerIdFromH512(msg.PeerId) if msgPeerId != peerId { @@ -129,41 +151,13 @@ func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, pe return nil, fmt.Errorf("failed to decode BlockHeadersPacket66: %w", err) } - requestIdIndex, ok := requestIds[pkt.RequestId] - if !ok { + if pkt.RequestId != requestId { continue } - headerChunks[requestIdIndex] = pkt.BlockHeadersPacket - delete(requestIds, pkt.RequestId) - } - } - - headers := make([]*types.Header, 0, amount) - for _, headerChunk := range headerChunks { - for _, header := range headerChunk { - headers = append(headers, header) - } - } - - if err := f.validateHeadersResponse(headers, start, end, amount); err != nil { - shouldPenalize := errors.Is(err, &ErrIncorrectOriginHeader{}) || - errors.Is(err, &ErrTooManyHeaders{}) || - errors.Is(err, &ErrDisconnectedHeaders{}) - - if shouldPenalize { - f.logger.Debug("penalizing peer", "peerId", peerId, "err", err.Error()) - - penalizeErr := f.peerPenalizer.Penalize(ctx, peerId) - if penalizeErr != nil { - err = fmt.Errorf("%w: 
%w", penalizeErr, err) - } + return pkt.BlockHeadersPacket, nil } - - return nil, err } - - return headers, nil } func (f *fetcher) validateHeadersResponse(headers []*types.Header, start, end, amount uint64) error { diff --git a/polygon/p2p/service_test.go b/polygon/p2p/service_test.go index 8639d75e970..2264511d9c3 100644 --- a/polygon/p2p/service_test.go +++ b/polygon/p2p/service_test.go @@ -432,12 +432,6 @@ func TestServiceErrInvalidFetchHeadersRange(t *testing.T) { require.Equal(t, uint64(3), errInvalidFetchHeadersRange.start) require.Equal(t, uint64(1), errInvalidFetchHeadersRange.end) require.Nil(t, headers) - - headers, err = test.service.FetchHeaders(ctx, 1, 1+maxFetchHeadersRange+1, PeerIdFromUint64(1)) - require.ErrorAs(t, err, &errInvalidFetchHeadersRange) - require.Equal(t, uint64(1), errInvalidFetchHeadersRange.start) - require.Equal(t, uint64(1+maxFetchHeadersRange+1), errInvalidFetchHeadersRange.end) - require.Nil(t, headers) }) } From 74ae8cee98d7cdbdcffbc21f54c242c6c97bca17 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:13:40 +0000 Subject: [PATCH 09/10] simplify some test code --- polygon/p2p/service_test.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/polygon/p2p/service_test.go b/polygon/p2p/service_test.go index 2264511d9c3..32484059b52 100644 --- a/polygon/p2p/service_test.go +++ b/polygon/p2p/service_test.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math/big" - "sync" "testing" "time" @@ -41,14 +40,13 @@ func newServiceTest(t *testing.T, requestIdGenerator RequestIdGenerator) *servic } type serviceTest struct { - ctx context.Context - ctxCancel context.CancelFunc - t *testing.T - sentryClient *direct.MockSentryClient - service Service - headersRequestResponseMocksMu sync.Mutex - headersRequestResponseMocks map[uint64]requestResponseMock - peerEvents chan *sentry.PeerEvent + ctx context.Context + ctxCancel context.CancelFunc + t *testing.T + sentryClient *direct.MockSentryClient + service Service + headersRequestResponseMocks map[uint64]requestResponseMock + peerEvents chan *sentry.PeerEvent } // run is needed so that we can properly shut down tests involving the p2p service due to how the sentry multi @@ -112,8 +110,6 @@ func (st *serviceTest) mockSentryStreams(mocks ...requestResponseMock) { } func (st *serviceTest) mockSentryInboundMessagesStream(mocks ...requestResponseMock) { - st.headersRequestResponseMocksMu.Lock() - defer st.headersRequestResponseMocksMu.Unlock() for _, mock := range mocks { st.headersRequestResponseMocks[mock.requestId] = mock } @@ -142,13 +138,12 @@ func (st *serviceTest) mockSentryInboundMessagesStream(mocks ...requestResponseM return nil, err } - st.headersRequestResponseMocksMu.Lock() - defer st.headersRequestResponseMocksMu.Unlock() mock, ok := st.headersRequestResponseMocks[pkt.RequestId] if !ok { return nil, fmt.Errorf("unexpected request id: %d", pkt.RequestId) } + delete(st.headersRequestResponseMocks, pkt.RequestId) reqPeerId := PeerIdFromH512(req.PeerId) if mock.wantRequestPeerId != reqPeerId { return nil, fmt.Errorf("wantRequestPeerId != reqPeerId - %v vs %v", mock.wantRequestPeerId, reqPeerId) From 973659ebb28b7de041529c915c1ce7e05b07b9af Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:16:58 +0000 Subject: [PATCH 10/10] tidy log msg --- polygon/p2p/fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polygon/p2p/fetcher.go 
b/polygon/p2p/fetcher.go index bda98a97457..c7dc0add742 100644 --- a/polygon/p2p/fetcher.go +++ b/polygon/p2p/fetcher.go @@ -131,7 +131,7 @@ func (f *fetcher) awaitHeadersResponse( for { select { case <-ctx.Done(): - return nil, fmt.Errorf("headers response await interrupted: %w", ctx.Err()) + return nil, fmt.Errorf("await headers response interrupted: %w", ctx.Err()) case msg := <-observer: msgPeerId := PeerIdFromH512(msg.PeerId) if msgPeerId != peerId {
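Taken together, patches 05 and 08 land on a simple shape for FetchHeaders: split the requested [start, end) range into chunks of at most eth.MaxHeadersServe (1024 headers, the soft response limit noted in the comment), send and await one GetBlockHeaders request per chunk sequentially, then validate the concatenated result (amount, origin block number, parent-hash connectivity) and penalize the peer on a protocol violation. The standalone Go sketch below mirrors that arithmetic; maxHeadersServe, the header struct, chunkRanges and validateHeaders are illustrative stand-ins, not the erigon p2p package types.

package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-in; the real code uses eth.MaxHeadersServe and *types.Header.
const maxHeadersServe = 1024

type header struct {
	Number     uint64
	Hash       string
	ParentHash string
}

// chunkRanges splits [start, end) into consecutive sub-ranges of at most
// maxHeadersServe blocks, one GetBlockHeaders request per sub-range.
func chunkRanges(start, end uint64) [][2]uint64 {
	var ranges [][2]uint64
	for chunkStart := start; chunkStart < end; chunkStart += maxHeadersServe {
		chunkEnd := chunkStart + maxHeadersServe
		if chunkEnd > end {
			chunkEnd = end
		}
		ranges = append(ranges, [2]uint64{chunkStart, chunkEnd})
	}
	return ranges
}

// validateHeaders applies the same three checks as validateHeadersResponse:
// header count, origin block number, and parent-hash/number connectivity.
func validateHeaders(headers []*header, start, end uint64) error {
	if start >= end {
		return errors.New("invalid range")
	}
	amount := end - start
	if uint64(len(headers)) < amount {
		return errors.New("incomplete headers")
	}
	if uint64(len(headers)) > amount {
		return errors.New("too many headers")
	}
	if headers[0].Number != start {
		return errors.New("incorrect origin header")
	}
	for i := 1; i < len(headers); i++ {
		prev, cur := headers[i-1], headers[i]
		if cur.ParentHash != prev.Hash || cur.Number != prev.Number+1 {
			return fmt.Errorf("disconnected headers at block %d", cur.Number)
		}
	}
	return nil
}

func main() {
	// Matches TestServiceFetchHeadersWithChunking: [1, 2000) -> requests of 1024 and 975 headers.
	fmt.Println(chunkRanges(1, 2000)) // [[1 1025] [1025 2000]]
}

Requesting the chunks sequentially (patch 08) trades a little latency for a much simpler response-matching loop than patch 05's map of in-flight request ids, and keeps the per-request timeout semantics unchanged.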