diff --git a/polygon/p2p/downloader.go b/polygon/p2p/downloader.go deleted file mode 100644 index b0dd2274f8a..00000000000 --- a/polygon/p2p/downloader.go +++ /dev/null @@ -1,129 +0,0 @@ -package p2p - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/ledgerwatch/log/v3" - "golang.org/x/sync/errgroup" - - "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/protocols/eth" - "github.com/ledgerwatch/erigon/rlp" -) - -const reqRespTimeout = 5 * time.Second - -var invalidDownloadHeadersRangeErr = errors.New("invalid download headers range") - -type RequestIdGenerator func() uint64 - -type Downloader interface { - DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) -} - -func NewDownloader( - logger log.Logger, - messageListener MessageListener, - messageBroadcaster MessageBroadcaster, - peerPenalizer PeerPenalizer, - requestIdGenerator RequestIdGenerator, -) Downloader { - return &downloader{ - logger: logger, - messageListener: messageListener, - messageBroadcaster: messageBroadcaster, - peerPenalizer: peerPenalizer, - requestIdGenerator: requestIdGenerator, - } -} - -type downloader struct { - logger log.Logger - messageListener MessageListener - messageBroadcaster MessageBroadcaster - peerPenalizer PeerPenalizer - requestIdGenerator RequestIdGenerator -} - -func (d *downloader) DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) { - if start > end { - return nil, fmt.Errorf("%w: start=%d, end=%d", invalidDownloadHeadersRangeErr, start, end) - } - - var headers []*types.Header - requestId := d.requestIdGenerator() - - observer := make(ChanMessageObserver[*sentry.InboundMessage]) - d.messageListener.RegisterBlockHeaders66Observer(observer) - defer d.messageListener.UnregisterBlockHeaders66Observer(observer) - - ctx, cancel := context.WithTimeout(ctx, reqRespTimeout) - defer cancel() - g, ctx := errgroup.WithContext(ctx) - - // - // TODO chunk request into smaller ranges if needed to fit in the 10 MiB response size - // TODO peer should return <= amount, check for > amount and penalize peer - // - - g.Go(func() error { - for { - select { - case <-ctx.Done(): - return fmt.Errorf("interrupted while waiting for msg from peer: %w", ctx.Err()) - case msg := <-observer: - msgPeerId := PeerIdFromH512(msg.PeerId) - if msgPeerId != peerId { - continue - } - - var pkt eth.BlockHeadersPacket66 - if err := rlp.DecodeBytes(msg.Data, &pkt); err != nil { - if rlp.IsInvalidRLPError(err) { - d.logger.Debug("penalizing peer for invalid rlp response", "peerId", peerId) - penalizeErr := d.peerPenalizer.Penalize(ctx, peerId) - if penalizeErr != nil { - err = fmt.Errorf("%w: %w", penalizeErr, err) - } - } - - return fmt.Errorf("failed to decode BlockHeadersPacket66: %w", err) - } - - if pkt.RequestId != requestId { - continue - } - - headers = pkt.BlockHeadersPacket - return nil - } - } - }) - - g.Go(func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - return d.messageBroadcaster.GetBlockHeaders66(ctx, peerId, eth.GetBlockHeadersPacket66{ - RequestId: requestId, - GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ - Origin: eth.HashOrNumber{ - Number: start, - }, - Amount: end - start + 1, - }, - }) - } - }) - - if err := g.Wait(); err != nil { - return nil, err - } - - return headers, nil -} diff --git a/polygon/p2p/fetcher.go b/polygon/p2p/fetcher.go new file mode 100644 index 00000000000..343de59316b --- /dev/null +++ b/polygon/p2p/fetcher.go @@ -0,0 +1,125 @@ +package p2p + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ledgerwatch/log/v3" + "golang.org/x/sync/errgroup" + + "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/rlp" +) + +const responseTimeout = 5 * time.Second + +var invalidFetchHeadersRangeErr = errors.New("invalid fetch headers range") + +type RequestIdGenerator func() uint64 + +type Fetcher interface { + FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) +} + +func NewFetcher( + logger log.Logger, + messageListener MessageListener, + messageSender MessageSender, + peerPenalizer PeerPenalizer, + requestIdGenerator RequestIdGenerator, +) Fetcher { + return &fetcher{ + logger: logger, + messageListener: messageListener, + messageSender: messageSender, + peerPenalizer: peerPenalizer, + requestIdGenerator: requestIdGenerator, + } +} + +type fetcher struct { + logger log.Logger + messageListener MessageListener + messageSender MessageSender + peerPenalizer PeerPenalizer + requestIdGenerator RequestIdGenerator +} + +func (f *fetcher) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) { + if start >= end { + return nil, fmt.Errorf("%w: start=%d, end=%d", invalidFetchHeadersRangeErr, start, end) + } + + var headers []*types.Header + requestId := f.requestIdGenerator() + + observer := make(ChanMessageObserver[*sentry.InboundMessage]) + f.messageListener.RegisterBlockHeadersObserver(observer) + defer f.messageListener.UnregisterBlockHeadersObserver(observer) + + // + // TODO 1) chunk request into smaller ranges if needed to fit in the 2 MiB response size soft limit + // and also 1024 max headers server (check AnswerGetBlockHeadersQuery) + // 2) peer should return <= amount, check for > amount and penalize peer + // + err := f.messageSender.SendGetBlockHeaders(ctx, peerId, eth.GetBlockHeadersPacket66{ + RequestId: requestId, + GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ + Origin: eth.HashOrNumber{ + Number: start, + }, + Amount: end - start, + }, + }) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(ctx, responseTimeout) + defer cancel() + wg, ctx := errgroup.WithContext(ctx) + + wg.Go(func() error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("interrupted while waiting for msg from peer: %w", ctx.Err()) + case msg := <-observer: + msgPeerId := PeerIdFromH512(msg.PeerId) + if msgPeerId != peerId { + continue + } + + var pkt eth.BlockHeadersPacket66 + if err := rlp.DecodeBytes(msg.Data, &pkt); err != nil { + if rlp.IsInvalidRLPError(err) { + f.logger.Debug("penalizing peer for invalid rlp response", "peerId", peerId) + penalizeErr := f.peerPenalizer.Penalize(ctx, peerId) + if penalizeErr != nil { + err = fmt.Errorf("%w: %w", penalizeErr, err) + } + } + + return fmt.Errorf("failed to decode BlockHeadersPacket66: %w", err) + } + + if pkt.RequestId != requestId { + continue + } + + headers = pkt.BlockHeadersPacket + return nil + } + } + }) + + if err := wg.Wait(); err != nil { + return nil, err + } + + return headers, nil +} diff --git a/polygon/p2p/message_listener.go b/polygon/p2p/message_listener.go index 362e1981f39..0034e3982a6 100644 --- a/polygon/p2p/message_listener.go +++ b/polygon/p2p/message_listener.go @@ -15,8 +15,8 @@ import ( type MessageListener interface { Start(ctx context.Context) Stop() - RegisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) - UnregisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) + RegisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage]) + UnregisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage]) } func NewMessageListener(logger log.Logger, sentryClient direct.SentryClient) MessageListener { @@ -50,11 +50,11 @@ func (ml *messageListener) Stop() { ml.stopWg.Wait() } -func (ml *messageListener) RegisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) { +func (ml *messageListener) RegisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage]) { ml.registerInboundMessageObserver(observer, sentry.MessageId_BLOCK_HEADERS_66) } -func (ml *messageListener) UnregisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) { +func (ml *messageListener) UnregisterBlockHeadersObserver(observer MessageObserver[*sentry.InboundMessage]) { ml.unregisterInboundMessageObserver(observer, sentry.MessageId_BLOCK_HEADERS_66) } diff --git a/polygon/p2p/message_broadcaster.go b/polygon/p2p/message_sender.go similarity index 53% rename from polygon/p2p/message_broadcaster.go rename to polygon/p2p/message_sender.go index a1e5022e2f6..1ec1a121916 100644 --- a/polygon/p2p/message_broadcaster.go +++ b/polygon/p2p/message_sender.go @@ -9,27 +9,27 @@ import ( "github.com/ledgerwatch/erigon/rlp" ) -type MessageBroadcaster interface { - GetBlockHeaders66(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error +type MessageSender interface { + SendGetBlockHeaders(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error } -func NewMessageBroadcaster(sentryClient direct.SentryClient) MessageBroadcaster { - return &messageBroadcaster{ +func NewMessageSender(sentryClient direct.SentryClient) MessageSender { + return &messageSender{ sentryClient: sentryClient, } } -type messageBroadcaster struct { +type messageSender struct { sentryClient direct.SentryClient } -func (mb *messageBroadcaster) GetBlockHeaders66(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error { +func (ms *messageSender) SendGetBlockHeaders(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error { data, err := rlp.EncodeToBytes(req) if err != nil { return err } - _, err = mb.sentryClient.SendMessageById(ctx, &sentry.SendMessageByIdRequest{ + _, err = ms.sentryClient.SendMessageById(ctx, &sentry.SendMessageByIdRequest{ PeerId: peerId.H512(), Data: &sentry.OutboundMessageData{ Id: sentry.MessageId_GET_BLOCK_HEADERS_66, diff --git a/polygon/p2p/service.go b/polygon/p2p/service.go index 27190b0062c..af9697092cf 100644 --- a/polygon/p2p/service.go +++ b/polygon/p2p/service.go @@ -18,7 +18,8 @@ type Service interface { Stop() MaxPeers() int PeersSyncProgress() PeersSyncProgress - DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) + // FetchHeaders fetches [start,end) headers from a peer. Blocks until data is received. + FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) Penalize(ctx context.Context, peerId PeerId) error } @@ -33,12 +34,12 @@ func newService( requestIdGenerator RequestIdGenerator, ) Service { messageListener := NewMessageListener(logger, sentryClient) - messageBroadcaster := NewMessageBroadcaster(sentryClient) + messageSender := NewMessageSender(sentryClient) peerPenalizer := NewPeerPenalizer(sentryClient) - downloader := NewDownloader(logger, messageListener, messageBroadcaster, peerPenalizer, requestIdGenerator) + fetcher := NewFetcher(logger, messageListener, messageSender, peerPenalizer, requestIdGenerator) return &service{ config: config, - downloader: downloader, + fetcher: fetcher, messageListener: messageListener, peerPenalizer: peerPenalizer, } @@ -47,7 +48,7 @@ func newService( type service struct { once sync.Once config p2p.Config - downloader Downloader + fetcher Fetcher messageListener MessageListener peerPenalizer PeerPenalizer } @@ -66,8 +67,8 @@ func (s *service) MaxPeers() int { return s.config.MaxPeers } -func (s *service) DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) { - return s.downloader.DownloadHeaders(ctx, start, end, peerId) +func (s *service) FetchHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) { + return s.fetcher.FetchHeaders(ctx, start, end, peerId) } func (s *service) Penalize(ctx context.Context, peerId PeerId) error { diff --git a/polygon/p2p/service_mock.go b/polygon/p2p/service_mock.go index 5bc5899ce24..87962e2de48 100644 --- a/polygon/p2p/service_mock.go +++ b/polygon/p2p/service_mock.go @@ -35,19 +35,19 @@ func (m *MockService) EXPECT() *MockServiceMockRecorder { return m.recorder } -// DownloadHeaders mocks base method. -func (m *MockService) DownloadHeaders(arg0 context.Context, arg1, arg2 uint64, arg3 PeerId) ([]*types.Header, error) { +// FetchHeaders mocks base method. +func (m *MockService) FetchHeaders(arg0 context.Context, arg1, arg2 uint64, arg3 PeerId) ([]*types.Header, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DownloadHeaders", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "FetchHeaders", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]*types.Header) ret1, _ := ret[1].(error) return ret0, ret1 } -// DownloadHeaders indicates an expected call of DownloadHeaders. -func (mr *MockServiceMockRecorder) DownloadHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +// FetchHeaders indicates an expected call of FetchHeaders. +func (mr *MockServiceMockRecorder) FetchHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadHeaders", reflect.TypeOf((*MockService)(nil).DownloadHeaders), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchHeaders", reflect.TypeOf((*MockService)(nil).FetchHeaders), arg0, arg1, arg2, arg3) } // MaxPeers mocks base method. diff --git a/polygon/p2p/service_test.go b/polygon/p2p/service_test.go index 8d3820dc4a0..c93ac757792 100644 --- a/polygon/p2p/service_test.go +++ b/polygon/p2p/service_test.go @@ -77,7 +77,12 @@ func (st *serviceTest) mockExpectPenalizePeer(peerId PeerId) { Times(1) } -func (st *serviceTest) mockSentryBlockHeaders66InboundMessageStream(msgs []*sentry.InboundMessage, peerId PeerId) { +func (st *serviceTest) mockSentryBlockHeaders66InboundMessageStream( + msgs []*sentry.InboundMessage, + peerId PeerId, + wantOriginNumber uint64, + wantAmount uint64, +) { var wg sync.WaitGroup if len(msgs) > 0 { wg.Add(1) @@ -91,7 +96,7 @@ func (st *serviceTest) mockSentryBlockHeaders66InboundMessageStream(msgs []*sent st.sentryClient. EXPECT(). SendMessageById(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(newSendGetBlockHeaders66MessageMock(&wg, peerId, sentry.MessageId_GET_BLOCK_HEADERS_66, 1, 3)). + DoAndReturn(newSendGetBlockHeaders66MessageMock(&wg, peerId, wantOriginNumber, wantAmount)). AnyTimes() st.sentryClient. EXPECT(). @@ -195,7 +200,6 @@ func newMockBlockHeadersPacket66Bytes(t *testing.T, requestId uint64) []byte { func newSendGetBlockHeaders66MessageMock( wg *sync.WaitGroup, wantPeerId PeerId, - wantMessageId sentry.MessageId, wantOriginNumber uint64, wantAmount uint64, ) sendMessageByIdMock { @@ -207,8 +211,8 @@ func newSendGetBlockHeaders66MessageMock( return nil, fmt.Errorf("wantPeerId != reqPeerId - %v vs %v", wantPeerId, reqPeerId) } - if wantMessageId != req.Data.Id { - return nil, fmt.Errorf("wantMessageId != req.Data.Id - %v vs %v", wantMessageId, req.Data.Id) + if sentry.MessageId_GET_BLOCK_HEADERS_66 != req.Data.Id { + return nil, fmt.Errorf("MessageId_GET_BLOCK_HEADERS_66 != req.Data.Id - %v", req.Data.Id) } var pkt eth.GetBlockHeadersPacket66 @@ -257,9 +261,9 @@ func TestServiceDownloadHeaders(t *testing.T) { } test := newServiceTest(t, newMockRequestGenerator(requestId)) - test.mockSentryBlockHeaders66InboundMessageStream(mockInboundMessages, peerId) + test.mockSentryBlockHeaders66InboundMessageStream(mockInboundMessages, peerId, 1, 2) test.run(ctx, func(t *testing.T) { - headers, err := test.service.DownloadHeaders(ctx, 1, 3, peerId) + headers, err := test.service.FetchHeaders(ctx, 1, 3, peerId) require.NoError(t, err) require.Len(t, headers, 3) require.Equal(t, uint64(1), headers[0].Number.Uint64()) @@ -271,10 +275,10 @@ func TestServiceDownloadHeaders(t *testing.T) { func TestServiceInvalidDownloadHeadersRangeErr(t *testing.T) { ctx := context.Background() test := newServiceTest(t, newMockRequestGenerator(1)) - test.mockSentryBlockHeaders66InboundMessageStream(nil, PeerId{}) + test.mockSentryBlockHeaders66InboundMessageStream(nil, PeerId{}, 1, 2) test.run(ctx, func(t *testing.T) { - headers, err := test.service.DownloadHeaders(ctx, 3, 1, PeerIdFromUint64(1)) - require.ErrorIs(t, err, invalidDownloadHeadersRangeErr) + headers, err := test.service.FetchHeaders(ctx, 3, 1, PeerIdFromUint64(1)) + require.ErrorIs(t, err, invalidFetchHeadersRangeErr) require.Nil(t, headers) }) } @@ -292,10 +296,10 @@ func TestServiceDownloadHeadersShouldPenalizePeerWhenInvalidRlpErr(t *testing.T) } test := newServiceTest(t, newMockRequestGenerator(requestId)) - test.mockSentryBlockHeaders66InboundMessageStream(mockInboundMessages, peerId) + test.mockSentryBlockHeaders66InboundMessageStream(mockInboundMessages, peerId, 1, 2) test.mockExpectPenalizePeer(peerId) test.run(ctx, func(t *testing.T) { - headers, err := test.service.DownloadHeaders(ctx, 1, 3, peerId) + headers, err := test.service.FetchHeaders(ctx, 1, 3, peerId) require.Error(t, err) require.Nil(t, headers) }) diff --git a/polygon/sync/header_downloader.go b/polygon/sync/header_downloader.go index c2a3a1fc66c..a71d9b4a731 100644 --- a/polygon/sync/header_downloader.go +++ b/polygon/sync/header_downloader.go @@ -141,7 +141,7 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, waypoint } start, end := waypoint.StartBlock().Uint64(), waypoint.EndBlock().Uint64() - headers, err := hd.p2pService.DownloadHeaders(ctx, start, end, peerId) + headers, err := hd.p2pService.FetchHeaders(ctx, start, end, peerId) if err != nil { hd.logger.Debug( fmt.Sprintf("[%s] issue downloading headers, will try again", headerDownloaderLogPrefix), diff --git a/polygon/sync/header_downloader_test.go b/polygon/sync/header_downloader_test.go index 0f30937def5..2243aafb5f5 100644 --- a/polygon/sync/header_downloader_test.go +++ b/polygon/sync/header_downloader_test.go @@ -159,7 +159,7 @@ func TestHeaderDownloadUsingMilestones(t *testing.T) { Return(test.fakePeers(8)). Times(1) test.p2pService.EXPECT(). - DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + FetchHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). Times(4) var persistedHeaders []*types.Header @@ -189,7 +189,7 @@ func TestHeaderDownloadUsingCheckpoints(t *testing.T) { Return(test.fakePeers(2)). Times(4) test.p2pService.EXPECT(). - DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + FetchHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). Times(8) var persistedHeaders []*types.Header @@ -233,7 +233,7 @@ func TestHeaderDownloadWhenInvalidStateThenPenalizePeerAndReDownload(t *testing. Return(test.fakePeers(3)). Times(3) test.p2pService.EXPECT(). - DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + FetchHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). // request 1,2,3 in parallel // -> 2 fails @@ -271,7 +271,7 @@ func TestHeaderDownloadWhenZeroPeersTriesAgain(t *testing.T) { Return(test.fakeCheckpoints(8), nil). Times(1) test.p2pService.EXPECT(). - DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + FetchHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). Times(8) var persistedHeaders []*types.Header