From 6b39197f0b74bc6b4bc5cdf0207e0e1d1fb36744 Mon Sep 17 00:00:00 2001 From: milen <94537774+taratorio@users.noreply.github.com> Date: Mon, 19 Feb 2024 16:59:17 +0530 Subject: [PATCH] polygon/p2p: implement download headers (#9399) --- erigon-lib/direct/sentry_client.go | 1 + erigon-lib/direct/sentry_client_mock.go | 379 ++++++++++++++++++ .../sentry_multi_client.go | 43 +- polygon/p2p/downloader.go | 129 ++++++ polygon/p2p/message_broadcaster.go | 41 ++ polygon/p2p/message_listener.go | 141 +++++++ polygon/p2p/message_observer.go | 11 + polygon/p2p/peer_id.go | 30 ++ polygon/p2p/peer_penalizer.go | 31 ++ polygon/p2p/peer_sync_progress.go | 20 + polygon/p2p/service.go | 80 ++++ polygon/p2p/service_mock.go | 117 ++++++ polygon/p2p/service_test.go | 302 ++++++++++++++ polygon/sync/header_downloader.go | 58 ++- polygon/sync/header_downloader_test.go | 80 ++-- polygon/sync/peer_with_block_num_info.go | 22 - polygon/sync/sentry.go | 16 - polygon/sync/sentry_mock.go | 92 ----- polygon/sync/storage.go | 2 +- 19 files changed, 1387 insertions(+), 208 deletions(-) create mode 100644 erigon-lib/direct/sentry_client_mock.go create mode 100644 polygon/p2p/downloader.go create mode 100644 polygon/p2p/message_broadcaster.go create mode 100644 polygon/p2p/message_listener.go create mode 100644 polygon/p2p/message_observer.go create mode 100644 polygon/p2p/peer_id.go create mode 100644 polygon/p2p/peer_penalizer.go create mode 100644 polygon/p2p/peer_sync_progress.go create mode 100644 polygon/p2p/service.go create mode 100644 polygon/p2p/service_mock.go create mode 100644 polygon/p2p/service_test.go delete mode 100644 polygon/sync/peer_with_block_num_info.go delete mode 100644 polygon/sync/sentry.go delete mode 100644 polygon/sync/sentry_mock.go diff --git a/erigon-lib/direct/sentry_client.go b/erigon-lib/direct/sentry_client.go index cf01003e71d..baab93d87a8 100644 --- a/erigon-lib/direct/sentry_client.go +++ b/erigon-lib/direct/sentry_client.go @@ -100,6 +100,7 @@ var ProtoIds = map[uint]map[sentry.MessageId]struct{}{ }, } +//go:generate mockgen -destination=./sentry_client_mock.go -package=direct . SentryClient type SentryClient interface { sentry.SentryClient Protocol() uint diff --git a/erigon-lib/direct/sentry_client_mock.go b/erigon-lib/direct/sentry_client_mock.go new file mode 100644 index 00000000000..6f10b71d0fa --- /dev/null +++ b/erigon-lib/direct/sentry_client_mock.go @@ -0,0 +1,379 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ledgerwatch/erigon-lib/direct (interfaces: SentryClient) + +// Package direct is a generated GoMock package. +package direct + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + types "github.com/ledgerwatch/erigon-lib/gointerfaces/types" + grpc "google.golang.org/grpc" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// MockSentryClient is a mock of SentryClient interface. +type MockSentryClient struct { + ctrl *gomock.Controller + recorder *MockSentryClientMockRecorder +} + +// MockSentryClientMockRecorder is the mock recorder for MockSentryClient. +type MockSentryClientMockRecorder struct { + mock *MockSentryClient +} + +// NewMockSentryClient creates a new mock instance. +func NewMockSentryClient(ctrl *gomock.Controller) *MockSentryClient { + mock := &MockSentryClient{ctrl: ctrl} + mock.recorder = &MockSentryClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSentryClient) EXPECT() *MockSentryClientMockRecorder { + return m.recorder +} + +// AddPeer mocks base method. +func (m *MockSentryClient) AddPeer(arg0 context.Context, arg1 *sentry.AddPeerRequest, arg2 ...grpc.CallOption) (*sentry.AddPeerReply, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "AddPeer", varargs...) + ret0, _ := ret[0].(*sentry.AddPeerReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AddPeer indicates an expected call of AddPeer. +func (mr *MockSentryClientMockRecorder) AddPeer(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPeer", reflect.TypeOf((*MockSentryClient)(nil).AddPeer), varargs...) +} + +// HandShake mocks base method. +func (m *MockSentryClient) HandShake(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*sentry.HandShakeReply, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "HandShake", varargs...) + ret0, _ := ret[0].(*sentry.HandShakeReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HandShake indicates an expected call of HandShake. +func (mr *MockSentryClientMockRecorder) HandShake(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandShake", reflect.TypeOf((*MockSentryClient)(nil).HandShake), varargs...) +} + +// MarkDisconnected mocks base method. +func (m *MockSentryClient) MarkDisconnected() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "MarkDisconnected") +} + +// MarkDisconnected indicates an expected call of MarkDisconnected. +func (mr *MockSentryClientMockRecorder) MarkDisconnected() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkDisconnected", reflect.TypeOf((*MockSentryClient)(nil).MarkDisconnected)) +} + +// Messages mocks base method. +func (m *MockSentryClient) Messages(arg0 context.Context, arg1 *sentry.MessagesRequest, arg2 ...grpc.CallOption) (sentry.Sentry_MessagesClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Messages", varargs...) + ret0, _ := ret[0].(sentry.Sentry_MessagesClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Messages indicates an expected call of Messages. +func (mr *MockSentryClientMockRecorder) Messages(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Messages", reflect.TypeOf((*MockSentryClient)(nil).Messages), varargs...) +} + +// NodeInfo mocks base method. +func (m *MockSentryClient) NodeInfo(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*types.NodeInfoReply, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "NodeInfo", varargs...) + ret0, _ := ret[0].(*types.NodeInfoReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NodeInfo indicates an expected call of NodeInfo. +func (mr *MockSentryClientMockRecorder) NodeInfo(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NodeInfo", reflect.TypeOf((*MockSentryClient)(nil).NodeInfo), varargs...) +} + +// PeerById mocks base method. +func (m *MockSentryClient) PeerById(arg0 context.Context, arg1 *sentry.PeerByIdRequest, arg2 ...grpc.CallOption) (*sentry.PeerByIdReply, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PeerById", varargs...) + ret0, _ := ret[0].(*sentry.PeerByIdReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PeerById indicates an expected call of PeerById. +func (mr *MockSentryClientMockRecorder) PeerById(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerById", reflect.TypeOf((*MockSentryClient)(nil).PeerById), varargs...) +} + +// PeerCount mocks base method. +func (m *MockSentryClient) PeerCount(arg0 context.Context, arg1 *sentry.PeerCountRequest, arg2 ...grpc.CallOption) (*sentry.PeerCountReply, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PeerCount", varargs...) + ret0, _ := ret[0].(*sentry.PeerCountReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PeerCount indicates an expected call of PeerCount. +func (mr *MockSentryClientMockRecorder) PeerCount(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerCount", reflect.TypeOf((*MockSentryClient)(nil).PeerCount), varargs...) +} + +// PeerEvents mocks base method. +func (m *MockSentryClient) PeerEvents(arg0 context.Context, arg1 *sentry.PeerEventsRequest, arg2 ...grpc.CallOption) (sentry.Sentry_PeerEventsClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PeerEvents", varargs...) + ret0, _ := ret[0].(sentry.Sentry_PeerEventsClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PeerEvents indicates an expected call of PeerEvents. +func (mr *MockSentryClientMockRecorder) PeerEvents(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerEvents", reflect.TypeOf((*MockSentryClient)(nil).PeerEvents), varargs...) +} + +// PeerMinBlock mocks base method. +func (m *MockSentryClient) PeerMinBlock(arg0 context.Context, arg1 *sentry.PeerMinBlockRequest, arg2 ...grpc.CallOption) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PeerMinBlock", varargs...) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PeerMinBlock indicates an expected call of PeerMinBlock. +func (mr *MockSentryClientMockRecorder) PeerMinBlock(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerMinBlock", reflect.TypeOf((*MockSentryClient)(nil).PeerMinBlock), varargs...) +} + +// Peers mocks base method. +func (m *MockSentryClient) Peers(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*sentry.PeersReply, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Peers", varargs...) + ret0, _ := ret[0].(*sentry.PeersReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Peers indicates an expected call of Peers. +func (mr *MockSentryClientMockRecorder) Peers(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockSentryClient)(nil).Peers), varargs...) +} + +// PenalizePeer mocks base method. +func (m *MockSentryClient) PenalizePeer(arg0 context.Context, arg1 *sentry.PenalizePeerRequest, arg2 ...grpc.CallOption) (*emptypb.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PenalizePeer", varargs...) + ret0, _ := ret[0].(*emptypb.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PenalizePeer indicates an expected call of PenalizePeer. +func (mr *MockSentryClientMockRecorder) PenalizePeer(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PenalizePeer", reflect.TypeOf((*MockSentryClient)(nil).PenalizePeer), varargs...) +} + +// Protocol mocks base method. +func (m *MockSentryClient) Protocol() uint { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Protocol") + ret0, _ := ret[0].(uint) + return ret0 +} + +// Protocol indicates an expected call of Protocol. +func (mr *MockSentryClientMockRecorder) Protocol() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Protocol", reflect.TypeOf((*MockSentryClient)(nil).Protocol)) +} + +// Ready mocks base method. +func (m *MockSentryClient) Ready() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ready") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Ready indicates an expected call of Ready. +func (mr *MockSentryClientMockRecorder) Ready() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ready", reflect.TypeOf((*MockSentryClient)(nil).Ready)) +} + +// SendMessageById mocks base method. +func (m *MockSentryClient) SendMessageById(arg0 context.Context, arg1 *sentry.SendMessageByIdRequest, arg2 ...grpc.CallOption) (*sentry.SentPeers, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendMessageById", varargs...) + ret0, _ := ret[0].(*sentry.SentPeers) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMessageById indicates an expected call of SendMessageById. +func (mr *MockSentryClientMockRecorder) SendMessageById(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageById", reflect.TypeOf((*MockSentryClient)(nil).SendMessageById), varargs...) +} + +// SendMessageByMinBlock mocks base method. +func (m *MockSentryClient) SendMessageByMinBlock(arg0 context.Context, arg1 *sentry.SendMessageByMinBlockRequest, arg2 ...grpc.CallOption) (*sentry.SentPeers, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendMessageByMinBlock", varargs...) + ret0, _ := ret[0].(*sentry.SentPeers) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMessageByMinBlock indicates an expected call of SendMessageByMinBlock. +func (mr *MockSentryClientMockRecorder) SendMessageByMinBlock(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageByMinBlock", reflect.TypeOf((*MockSentryClient)(nil).SendMessageByMinBlock), varargs...) +} + +// SendMessageToAll mocks base method. +func (m *MockSentryClient) SendMessageToAll(arg0 context.Context, arg1 *sentry.OutboundMessageData, arg2 ...grpc.CallOption) (*sentry.SentPeers, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendMessageToAll", varargs...) + ret0, _ := ret[0].(*sentry.SentPeers) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMessageToAll indicates an expected call of SendMessageToAll. +func (mr *MockSentryClientMockRecorder) SendMessageToAll(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageToAll", reflect.TypeOf((*MockSentryClient)(nil).SendMessageToAll), varargs...) +} + +// SendMessageToRandomPeers mocks base method. +func (m *MockSentryClient) SendMessageToRandomPeers(arg0 context.Context, arg1 *sentry.SendMessageToRandomPeersRequest, arg2 ...grpc.CallOption) (*sentry.SentPeers, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendMessageToRandomPeers", varargs...) + ret0, _ := ret[0].(*sentry.SentPeers) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMessageToRandomPeers indicates an expected call of SendMessageToRandomPeers. +func (mr *MockSentryClientMockRecorder) SendMessageToRandomPeers(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageToRandomPeers", reflect.TypeOf((*MockSentryClient)(nil).SendMessageToRandomPeers), varargs...) +} + +// SetStatus mocks base method. +func (m *MockSentryClient) SetStatus(arg0 context.Context, arg1 *sentry.StatusData, arg2 ...grpc.CallOption) (*sentry.SetStatusReply, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SetStatus", varargs...) + ret0, _ := ret[0].(*sentry.SetStatusReply) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SetStatus indicates an expected call of SetStatus. +func (mr *MockSentryClientMockRecorder) SetStatus(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetStatus", reflect.TypeOf((*MockSentryClient)(nil).SetStatus), varargs...) +} diff --git a/p2p/sentry/sentry_multi_client/sentry_multi_client.go b/p2p/sentry/sentry_multi_client/sentry_multi_client.go index a95d68af794..8d2501e59b9 100644 --- a/p2p/sentry/sentry_multi_client/sentry_multi_client.go +++ b/p2p/sentry/sentry_multi_client/sentry_multi_client.go @@ -44,8 +44,13 @@ import ( "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" ) -type sentryMessageStream grpc.ClientStream -type sentryMessageStreamFactory func(context.Context, direct.SentryClient) (sentryMessageStream, error) +type ( + SentryMessageStream grpc.ClientStream + SentryMessageStreamFactory func(context.Context, direct.SentryClient) (SentryMessageStream, error) + StatusDataFactory func() *proto_sentry.StatusData + MessageFactory[T any] func() T + MessageHandler[T any] func(context.Context, T, direct.SentryClient) error +) // StartStreamLoops starts message processing loops for all sentries. // The processing happens in several streams: @@ -73,11 +78,11 @@ func (cs *MultiClient) RecvUploadMessageLoop( eth.ToProto[direct.ETH66][eth.GetBlockBodiesMsg], eth.ToProto[direct.ETH66][eth.GetReceiptsMsg], } - streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (sentryMessageStream, error) { + streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (SentryMessageStream, error) { return sentry.Messages(streamCtx, &proto_sentry.MessagesRequest{Ids: ids}, grpc.WaitForReady(true)) } - sentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "RecvUploadMessage", streamFactory, makeInboundMessage, cs.HandleInboundMessage, wg, cs.logger) + SentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "RecvUploadMessage", streamFactory, MakeInboundMessage, cs.HandleInboundMessage, wg, cs.logger) } func (cs *MultiClient) RecvUploadHeadersMessageLoop( @@ -88,11 +93,11 @@ func (cs *MultiClient) RecvUploadHeadersMessageLoop( ids := []proto_sentry.MessageId{ eth.ToProto[direct.ETH66][eth.GetBlockHeadersMsg], } - streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (sentryMessageStream, error) { + streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (SentryMessageStream, error) { return sentry.Messages(streamCtx, &proto_sentry.MessagesRequest{Ids: ids}, grpc.WaitForReady(true)) } - sentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "RecvUploadHeadersMessage", streamFactory, makeInboundMessage, cs.HandleInboundMessage, wg, cs.logger) + SentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "RecvUploadHeadersMessage", streamFactory, MakeInboundMessage, cs.HandleInboundMessage, wg, cs.logger) } func (cs *MultiClient) RecvMessageLoop( @@ -106,11 +111,11 @@ func (cs *MultiClient) RecvMessageLoop( eth.ToProto[direct.ETH66][eth.NewBlockHashesMsg], eth.ToProto[direct.ETH66][eth.NewBlockMsg], } - streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (sentryMessageStream, error) { + streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (SentryMessageStream, error) { return sentry.Messages(streamCtx, &proto_sentry.MessagesRequest{Ids: ids}, grpc.WaitForReady(true)) } - sentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "RecvMessage", streamFactory, makeInboundMessage, cs.HandleInboundMessage, wg, cs.logger) + SentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "RecvMessage", streamFactory, MakeInboundMessage, cs.HandleInboundMessage, wg, cs.logger) } func (cs *MultiClient) PeerEventsLoop( @@ -118,24 +123,24 @@ func (cs *MultiClient) PeerEventsLoop( sentry direct.SentryClient, wg *sync.WaitGroup, ) { - streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (sentryMessageStream, error) { + streamFactory := func(streamCtx context.Context, sentry direct.SentryClient) (SentryMessageStream, error) { return sentry.PeerEvents(streamCtx, &proto_sentry.PeerEventsRequest{}, grpc.WaitForReady(true)) } messageFactory := func() *proto_sentry.PeerEvent { return new(proto_sentry.PeerEvent) } - sentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "PeerEvents", streamFactory, messageFactory, cs.HandlePeerEvent, wg, cs.logger) + SentryReconnectAndPumpStreamLoop(ctx, sentry, cs.makeStatusData, "PeerEvents", streamFactory, messageFactory, cs.HandlePeerEvent, wg, cs.logger) } -func sentryReconnectAndPumpStreamLoop[TMessage interface{}]( +func SentryReconnectAndPumpStreamLoop[TMessage interface{}]( ctx context.Context, sentry direct.SentryClient, - statusDataFactory func() *proto_sentry.StatusData, + statusDataFactory StatusDataFactory, streamName string, - streamFactory sentryMessageStreamFactory, - messageFactory func() TMessage, - handleInboundMessage func(context.Context, TMessage, direct.SentryClient) error, + streamFactory SentryMessageStreamFactory, + messageFactory MessageFactory[TMessage], + handleInboundMessage MessageHandler[TMessage], wg *sync.WaitGroup, logger log.Logger, ) { @@ -191,9 +196,9 @@ func pumpStreamLoop[TMessage interface{}]( ctx context.Context, sentry direct.SentryClient, streamName string, - streamFactory sentryMessageStreamFactory, - messageFactory func() TMessage, - handleInboundMessage func(context.Context, TMessage, direct.SentryClient) error, + streamFactory SentryMessageStreamFactory, + messageFactory MessageFactory[TMessage], + handleInboundMessage MessageHandler[TMessage], wg *sync.WaitGroup, logger log.Logger, ) (err error) { @@ -698,7 +703,7 @@ func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.In return nil } -func makeInboundMessage() *proto_sentry.InboundMessage { +func MakeInboundMessage() *proto_sentry.InboundMessage { return new(proto_sentry.InboundMessage) } diff --git a/polygon/p2p/downloader.go b/polygon/p2p/downloader.go new file mode 100644 index 00000000000..b0dd2274f8a --- /dev/null +++ b/polygon/p2p/downloader.go @@ -0,0 +1,129 @@ +package p2p + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ledgerwatch/log/v3" + "golang.org/x/sync/errgroup" + + "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/rlp" +) + +const reqRespTimeout = 5 * time.Second + +var invalidDownloadHeadersRangeErr = errors.New("invalid download headers range") + +type RequestIdGenerator func() uint64 + +type Downloader interface { + DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) +} + +func NewDownloader( + logger log.Logger, + messageListener MessageListener, + messageBroadcaster MessageBroadcaster, + peerPenalizer PeerPenalizer, + requestIdGenerator RequestIdGenerator, +) Downloader { + return &downloader{ + logger: logger, + messageListener: messageListener, + messageBroadcaster: messageBroadcaster, + peerPenalizer: peerPenalizer, + requestIdGenerator: requestIdGenerator, + } +} + +type downloader struct { + logger log.Logger + messageListener MessageListener + messageBroadcaster MessageBroadcaster + peerPenalizer PeerPenalizer + requestIdGenerator RequestIdGenerator +} + +func (d *downloader) DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) { + if start > end { + return nil, fmt.Errorf("%w: start=%d, end=%d", invalidDownloadHeadersRangeErr, start, end) + } + + var headers []*types.Header + requestId := d.requestIdGenerator() + + observer := make(ChanMessageObserver[*sentry.InboundMessage]) + d.messageListener.RegisterBlockHeaders66Observer(observer) + defer d.messageListener.UnregisterBlockHeaders66Observer(observer) + + ctx, cancel := context.WithTimeout(ctx, reqRespTimeout) + defer cancel() + g, ctx := errgroup.WithContext(ctx) + + // + // TODO chunk request into smaller ranges if needed to fit in the 10 MiB response size + // TODO peer should return <= amount, check for > amount and penalize peer + // + + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("interrupted while waiting for msg from peer: %w", ctx.Err()) + case msg := <-observer: + msgPeerId := PeerIdFromH512(msg.PeerId) + if msgPeerId != peerId { + continue + } + + var pkt eth.BlockHeadersPacket66 + if err := rlp.DecodeBytes(msg.Data, &pkt); err != nil { + if rlp.IsInvalidRLPError(err) { + d.logger.Debug("penalizing peer for invalid rlp response", "peerId", peerId) + penalizeErr := d.peerPenalizer.Penalize(ctx, peerId) + if penalizeErr != nil { + err = fmt.Errorf("%w: %w", penalizeErr, err) + } + } + + return fmt.Errorf("failed to decode BlockHeadersPacket66: %w", err) + } + + if pkt.RequestId != requestId { + continue + } + + headers = pkt.BlockHeadersPacket + return nil + } + } + }) + + g.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return d.messageBroadcaster.GetBlockHeaders66(ctx, peerId, eth.GetBlockHeadersPacket66{ + RequestId: requestId, + GetBlockHeadersPacket: ð.GetBlockHeadersPacket{ + Origin: eth.HashOrNumber{ + Number: start, + }, + Amount: end - start + 1, + }, + }) + } + }) + + if err := g.Wait(); err != nil { + return nil, err + } + + return headers, nil +} diff --git a/polygon/p2p/message_broadcaster.go b/polygon/p2p/message_broadcaster.go new file mode 100644 index 00000000000..a1e5022e2f6 --- /dev/null +++ b/polygon/p2p/message_broadcaster.go @@ -0,0 +1,41 @@ +package p2p + +import ( + "context" + + "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/rlp" +) + +type MessageBroadcaster interface { + GetBlockHeaders66(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error +} + +func NewMessageBroadcaster(sentryClient direct.SentryClient) MessageBroadcaster { + return &messageBroadcaster{ + sentryClient: sentryClient, + } +} + +type messageBroadcaster struct { + sentryClient direct.SentryClient +} + +func (mb *messageBroadcaster) GetBlockHeaders66(ctx context.Context, peerId PeerId, req eth.GetBlockHeadersPacket66) error { + data, err := rlp.EncodeToBytes(req) + if err != nil { + return err + } + + _, err = mb.sentryClient.SendMessageById(ctx, &sentry.SendMessageByIdRequest{ + PeerId: peerId.H512(), + Data: &sentry.OutboundMessageData{ + Id: sentry.MessageId_GET_BLOCK_HEADERS_66, + Data: data, + }, + }) + + return err +} diff --git a/polygon/p2p/message_listener.go b/polygon/p2p/message_listener.go new file mode 100644 index 00000000000..362e1981f39 --- /dev/null +++ b/polygon/p2p/message_listener.go @@ -0,0 +1,141 @@ +package p2p + +import ( + "context" + "sync" + + "github.com/ledgerwatch/log/v3" + "google.golang.org/grpc" + + "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + sentrymulticlient "github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client" +) + +type MessageListener interface { + Start(ctx context.Context) + Stop() + RegisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) + UnregisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) +} + +func NewMessageListener(logger log.Logger, sentryClient direct.SentryClient) MessageListener { + return &messageListener{ + logger: logger, + sentryClient: sentryClient, + inboundMessageObservers: map[sentry.MessageId]map[MessageObserver[*sentry.InboundMessage]]struct{}{}, + } +} + +type messageListener struct { + once sync.Once + streamCtx context.Context + streamCtxCancel context.CancelFunc + logger log.Logger + sentryClient direct.SentryClient + observersMu sync.Mutex + inboundMessageObservers map[sentry.MessageId]map[MessageObserver[*sentry.InboundMessage]]struct{} + stopWg sync.WaitGroup +} + +func (ml *messageListener) Start(ctx context.Context) { + ml.once.Do(func() { + ml.streamCtx, ml.streamCtxCancel = context.WithCancel(ctx) + go ml.listenBlockHeaders66() + }) +} + +func (ml *messageListener) Stop() { + ml.streamCtxCancel() + ml.stopWg.Wait() +} + +func (ml *messageListener) RegisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) { + ml.registerInboundMessageObserver(observer, sentry.MessageId_BLOCK_HEADERS_66) +} + +func (ml *messageListener) UnregisterBlockHeaders66Observer(observer MessageObserver[*sentry.InboundMessage]) { + ml.unregisterInboundMessageObserver(observer, sentry.MessageId_BLOCK_HEADERS_66) +} + +func (ml *messageListener) registerInboundMessageObserver(observer MessageObserver[*sentry.InboundMessage], messageId sentry.MessageId) { + ml.observersMu.Lock() + defer ml.observersMu.Unlock() + + if observers, ok := ml.inboundMessageObservers[messageId]; ok { + observers[observer] = struct{}{} + } else { + ml.inboundMessageObservers[messageId] = map[MessageObserver[*sentry.InboundMessage]]struct{}{ + observer: {}, + } + } +} + +func (ml *messageListener) unregisterInboundMessageObserver(observer MessageObserver[*sentry.InboundMessage], messageId sentry.MessageId) { + ml.observersMu.Lock() + defer ml.observersMu.Unlock() + + if observers, ok := ml.inboundMessageObservers[messageId]; ok { + delete(observers, observer) + } +} + +func (ml *messageListener) listenBlockHeaders66() { + ml.listenInboundMessage("BlockHeaders66", sentry.MessageId_BLOCK_HEADERS_66) +} + +func (ml *messageListener) listenInboundMessage(name string, msgId sentry.MessageId) { + ml.stopWg.Add(1) + defer ml.stopWg.Done() + + sentrymulticlient.SentryReconnectAndPumpStreamLoop( + ml.streamCtx, + ml.sentryClient, + ml.statusDataFactory(), + name, + ml.messageStreamFactory([]sentry.MessageId{msgId}), + ml.inboundMessageFactory(), + ml.inboundMessageHandler(), + nil, + ml.logger, + ) +} + +func (ml *messageListener) statusDataFactory() sentrymulticlient.StatusDataFactory { + return func() *sentry.StatusData { + return &sentry.StatusData{} + } +} + +func (ml *messageListener) messageStreamFactory(ids []sentry.MessageId) sentrymulticlient.SentryMessageStreamFactory { + return func(streamCtx context.Context, sentryClient direct.SentryClient) (sentrymulticlient.SentryMessageStream, error) { + return sentryClient.Messages(streamCtx, &sentry.MessagesRequest{Ids: ids}, grpc.WaitForReady(true)) + } +} + +func (ml *messageListener) inboundMessageFactory() sentrymulticlient.MessageFactory[*sentry.InboundMessage] { + return func() *sentry.InboundMessage { + return new(sentry.InboundMessage) + } +} + +func (ml *messageListener) inboundMessageHandler() sentrymulticlient.MessageHandler[*sentry.InboundMessage] { + return func(_ context.Context, msg *sentry.InboundMessage, _ direct.SentryClient) error { + ml.notifyInboundMessageObservers(msg) + return nil + } +} + +func (ml *messageListener) notifyInboundMessageObservers(msg *sentry.InboundMessage) { + ml.observersMu.Lock() + defer ml.observersMu.Unlock() + + observers, ok := ml.inboundMessageObservers[msg.Id] + if !ok { + return + } + + for observer := range observers { + go observer.Notify(msg) + } +} diff --git a/polygon/p2p/message_observer.go b/polygon/p2p/message_observer.go new file mode 100644 index 00000000000..77d082d8376 --- /dev/null +++ b/polygon/p2p/message_observer.go @@ -0,0 +1,11 @@ +package p2p + +type MessageObserver[M any] interface { + Notify(M) +} + +type ChanMessageObserver[M any] chan M + +func (cmo ChanMessageObserver[M]) Notify(msg M) { + cmo <- msg +} diff --git a/polygon/p2p/peer_id.go b/polygon/p2p/peer_id.go new file mode 100644 index 00000000000..94a3f72d4f0 --- /dev/null +++ b/polygon/p2p/peer_id.go @@ -0,0 +1,30 @@ +package p2p + +import ( + "encoding/binary" + "encoding/hex" + + "github.com/ledgerwatch/erigon-lib/gointerfaces" + "github.com/ledgerwatch/erigon-lib/gointerfaces/types" +) + +func PeerIdFromH512(peerId *types.H512) PeerId { + return gointerfaces.ConvertH512ToHash(peerId) +} + +// PeerIdFromUint64 is useful for testing and that is its main intended purpose +func PeerIdFromUint64(peerId uint64) PeerId { + var peerIdBytes [64]byte + binary.BigEndian.PutUint64(peerIdBytes[:8], peerId) + return peerIdBytes +} + +type PeerId [64]byte + +func (pid PeerId) H512() *types.H512 { + return gointerfaces.ConvertHashToH512(pid) +} + +func (pid PeerId) String() string { + return hex.EncodeToString(pid[:]) +} diff --git a/polygon/p2p/peer_penalizer.go b/polygon/p2p/peer_penalizer.go new file mode 100644 index 00000000000..3763b8ebb4b --- /dev/null +++ b/polygon/p2p/peer_penalizer.go @@ -0,0 +1,31 @@ +package p2p + +import ( + "context" + + "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" +) + +func NewPeerPenalizer(sentryClient direct.SentryClient) PeerPenalizer { + return &peerPenalizer{ + sentryClient: sentryClient, + } +} + +type PeerPenalizer interface { + Penalize(ctx context.Context, peerId PeerId) error +} + +type peerPenalizer struct { + sentryClient direct.SentryClient +} + +func (pp *peerPenalizer) Penalize(ctx context.Context, peerId PeerId) error { + _, err := pp.sentryClient.PenalizePeer(ctx, &sentry.PenalizePeerRequest{ + PeerId: peerId.H512(), + Penalty: sentry.PenaltyKind_Kick, + }) + + return err +} diff --git a/polygon/p2p/peer_sync_progress.go b/polygon/p2p/peer_sync_progress.go new file mode 100644 index 00000000000..38b73edbb22 --- /dev/null +++ b/polygon/p2p/peer_sync_progress.go @@ -0,0 +1,20 @@ +package p2p + +type PeerSyncProgress struct { + Id PeerId + MaxSeenBlockNum uint64 +} + +type PeersSyncProgress []*PeerSyncProgress + +func (psp PeersSyncProgress) Len() int { + return len(psp) +} + +func (psp PeersSyncProgress) Less(i int, j int) bool { + return psp[i].MaxSeenBlockNum < psp[j].MaxSeenBlockNum +} + +func (psp PeersSyncProgress) Swap(i int, j int) { + psp[i], psp[j] = psp[j], psp[i] +} diff --git a/polygon/p2p/service.go b/polygon/p2p/service.go new file mode 100644 index 00000000000..27190b0062c --- /dev/null +++ b/polygon/p2p/service.go @@ -0,0 +1,80 @@ +package p2p + +import ( + "context" + "math/rand" + "sync" + + "github.com/ledgerwatch/log/v3" + + "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/p2p" +) + +//go:generate mockgen -destination=./service_mock.go -package=p2p . Service +type Service interface { + Start(ctx context.Context) + Stop() + MaxPeers() int + PeersSyncProgress() PeersSyncProgress + DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) + Penalize(ctx context.Context, peerId PeerId) error +} + +func NewService(config p2p.Config, logger log.Logger, sentryClient direct.SentryClient) Service { + return newService(config, logger, sentryClient, rand.Uint64) +} + +func newService( + config p2p.Config, + logger log.Logger, + sentryClient direct.SentryClient, + requestIdGenerator RequestIdGenerator, +) Service { + messageListener := NewMessageListener(logger, sentryClient) + messageBroadcaster := NewMessageBroadcaster(sentryClient) + peerPenalizer := NewPeerPenalizer(sentryClient) + downloader := NewDownloader(logger, messageListener, messageBroadcaster, peerPenalizer, requestIdGenerator) + return &service{ + config: config, + downloader: downloader, + messageListener: messageListener, + peerPenalizer: peerPenalizer, + } +} + +type service struct { + once sync.Once + config p2p.Config + downloader Downloader + messageListener MessageListener + peerPenalizer PeerPenalizer +} + +func (s *service) Start(ctx context.Context) { + s.once.Do(func() { + s.messageListener.Start(ctx) + }) +} + +func (s *service) Stop() { + s.messageListener.Stop() +} + +func (s *service) MaxPeers() int { + return s.config.MaxPeers +} + +func (s *service) DownloadHeaders(ctx context.Context, start uint64, end uint64, peerId PeerId) ([]*types.Header, error) { + return s.downloader.DownloadHeaders(ctx, start, end, peerId) +} + +func (s *service) Penalize(ctx context.Context, peerId PeerId) error { + return s.peerPenalizer.Penalize(ctx, peerId) +} + +func (s *service) PeersSyncProgress() PeersSyncProgress { + // TODO implement peer tracker + return nil +} diff --git a/polygon/p2p/service_mock.go b/polygon/p2p/service_mock.go new file mode 100644 index 00000000000..5bc5899ce24 --- /dev/null +++ b/polygon/p2p/service_mock.go @@ -0,0 +1,117 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ledgerwatch/erigon/polygon/p2p (interfaces: Service) + +// Package p2p is a generated GoMock package. +package p2p + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + types "github.com/ledgerwatch/erigon/core/types" +) + +// MockService is a mock of Service interface. +type MockService struct { + ctrl *gomock.Controller + recorder *MockServiceMockRecorder +} + +// MockServiceMockRecorder is the mock recorder for MockService. +type MockServiceMockRecorder struct { + mock *MockService +} + +// NewMockService creates a new mock instance. +func NewMockService(ctrl *gomock.Controller) *MockService { + mock := &MockService{ctrl: ctrl} + mock.recorder = &MockServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockService) EXPECT() *MockServiceMockRecorder { + return m.recorder +} + +// DownloadHeaders mocks base method. +func (m *MockService) DownloadHeaders(arg0 context.Context, arg1, arg2 uint64, arg3 PeerId) ([]*types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DownloadHeaders", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].([]*types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DownloadHeaders indicates an expected call of DownloadHeaders. +func (mr *MockServiceMockRecorder) DownloadHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadHeaders", reflect.TypeOf((*MockService)(nil).DownloadHeaders), arg0, arg1, arg2, arg3) +} + +// MaxPeers mocks base method. +func (m *MockService) MaxPeers() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxPeers") + ret0, _ := ret[0].(int) + return ret0 +} + +// MaxPeers indicates an expected call of MaxPeers. +func (mr *MockServiceMockRecorder) MaxPeers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxPeers", reflect.TypeOf((*MockService)(nil).MaxPeers)) +} + +// PeersSyncProgress mocks base method. +func (m *MockService) PeersSyncProgress() PeersSyncProgress { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PeersSyncProgress") + ret0, _ := ret[0].(PeersSyncProgress) + return ret0 +} + +// PeersSyncProgress indicates an expected call of PeersSyncProgress. +func (mr *MockServiceMockRecorder) PeersSyncProgress() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeersSyncProgress", reflect.TypeOf((*MockService)(nil).PeersSyncProgress)) +} + +// Penalize mocks base method. +func (m *MockService) Penalize(arg0 context.Context, arg1 PeerId) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Penalize", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Penalize indicates an expected call of Penalize. +func (mr *MockServiceMockRecorder) Penalize(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Penalize", reflect.TypeOf((*MockService)(nil).Penalize), arg0, arg1) +} + +// Start mocks base method. +func (m *MockService) Start(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start", arg0) +} + +// Start indicates an expected call of Start. +func (mr *MockServiceMockRecorder) Start(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockService)(nil).Start), arg0) +} + +// Stop mocks base method. +func (m *MockService) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockServiceMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockService)(nil).Stop)) +} diff --git a/polygon/p2p/service_test.go b/polygon/p2p/service_test.go new file mode 100644 index 00000000000..8d3820dc4a0 --- /dev/null +++ b/polygon/p2p/service_test.go @@ -0,0 +1,302 @@ +package p2p + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/ledgerwatch/log/v3" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/protocols/eth" + "github.com/ledgerwatch/erigon/p2p" + "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/testlog" +) + +func newServiceTest(t *testing.T, requestIdGenerator RequestIdGenerator) *serviceTest { + ctrl := gomock.NewController(t) + logger := testlog.Logger(t, log.LvlTrace) + sentryClient := direct.NewMockSentryClient(ctrl) + return &serviceTest{ + t: t, + sentryClient: sentryClient, + service: newService(p2p.Config{}, logger, sentryClient, requestIdGenerator), + } +} + +type serviceTest struct { + t *testing.T + sentryClient *direct.MockSentryClient + service Service +} + +// run is needed so that we can properly shut down tests involving the p2p service due to how the sentry multi +// client SentryReconnectAndPumpStreamLoop works. +// +// Using t.Cleanup to call service.Stop instead does not work since the mocks generated by gomock cause +// an error when their methods are called after a test has finished - t.Cleanup is run after a +// test has finished, and so we need to make sure that the SentryReconnectAndPumpStreamLoop loop has been stopped +// before the test finishes otherwise we will have flaky tests. +// +// If changing the behaviour here please run "go test -v -count=1000 ./polygon/p2p" and +// "go test -v -count=1 -race ./polygon/p2p" to confirm there are no regressions. +func (st *serviceTest) run(ctx context.Context, f func(t *testing.T)) { + st.t.Run("start", func(_ *testing.T) { + st.service.Start(ctx) + }) + + st.t.Run("test", f) + + st.t.Run("stop", func(_ *testing.T) { + st.service.Stop() + }) +} + +func (st *serviceTest) mockExpectPenalizePeer(peerId PeerId) { + st.sentryClient. + EXPECT(). + PenalizePeer(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, req *sentry.PenalizePeerRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + if peerId.H512() != req.PeerId { + return nil, fmt.Errorf("peerId != reqPeerId - %v vs %v", peerId.H512(), req.PeerId) + } + + return &emptypb.Empty{}, nil + }). + Times(1) +} + +func (st *serviceTest) mockSentryBlockHeaders66InboundMessageStream(msgs []*sentry.InboundMessage, peerId PeerId) { + var wg sync.WaitGroup + if len(msgs) > 0 { + wg.Add(1) + } + + st.sentryClient. + EXPECT(). + Messages(gomock.Any(), gomock.Any(), gomock.Any()). + Return(st.mockSentryStream(&wg, msgs), nil). + AnyTimes() + st.sentryClient. + EXPECT(). + SendMessageById(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(newSendGetBlockHeaders66MessageMock(&wg, peerId, sentry.MessageId_GET_BLOCK_HEADERS_66, 1, 3)). + AnyTimes() + st.sentryClient. + EXPECT(). + HandShake(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, nil). + AnyTimes() + st.sentryClient. + EXPECT(). + SetStatus(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, nil). + AnyTimes() + st.sentryClient. + EXPECT(). + MarkDisconnected(). + AnyTimes() +} + +func (st *serviceTest) mockSentryStream(wg *sync.WaitGroup, msgs []*sentry.InboundMessage) sentry.Sentry_MessagesClient { + return &mockSentryMessagesStream{ + wg: wg, + msgs: msgs, + } +} + +type mockSentryMessagesStream struct { + wg *sync.WaitGroup + msgs []*sentry.InboundMessage +} + +func (s *mockSentryMessagesStream) Recv() (*sentry.InboundMessage, error) { + return nil, nil +} + +func (s *mockSentryMessagesStream) Header() (metadata.MD, error) { + return nil, nil +} + +func (s *mockSentryMessagesStream) Trailer() metadata.MD { + return nil +} + +func (s *mockSentryMessagesStream) CloseSend() error { + return nil +} + +func (s *mockSentryMessagesStream) Context() context.Context { + return context.Background() +} + +func (s *mockSentryMessagesStream) SendMsg(_ any) error { + return nil +} + +func (s *mockSentryMessagesStream) RecvMsg(msg any) error { + // wait for something external to happen before stream is allowed to produce values + s.wg.Wait() + + if len(s.msgs) == 0 { + return nil + } + + inboundMsg, ok := msg.(*sentry.InboundMessage) + if !ok { + return errors.New("unexpected msg type") + } + + mockMsg := s.msgs[0] + s.msgs = s.msgs[1:] + inboundMsg.Id = mockMsg.Id + inboundMsg.Data = mockMsg.Data + inboundMsg.PeerId = mockMsg.PeerId + return nil +} + +func newMockRequestGenerator(reqId uint64) RequestIdGenerator { + return func() uint64 { + return reqId + } +} + +func newMockBlockHeadersPacket66Bytes(t *testing.T, requestId uint64) []byte { + blockHeadersPacket66 := eth.BlockHeadersPacket66{ + RequestId: requestId, + BlockHeadersPacket: []*types.Header{ + { + Number: big.NewInt(1), + }, + { + Number: big.NewInt(2), + }, + { + Number: big.NewInt(3), + }, + }, + } + blockHeadersPacket66Bytes, err := rlp.EncodeToBytes(&blockHeadersPacket66) + require.NoError(t, err) + return blockHeadersPacket66Bytes +} + +func newSendGetBlockHeaders66MessageMock( + wg *sync.WaitGroup, + wantPeerId PeerId, + wantMessageId sentry.MessageId, + wantOriginNumber uint64, + wantAmount uint64, +) sendMessageByIdMock { + return func(_ context.Context, req *sentry.SendMessageByIdRequest, _ ...grpc.CallOption) (*sentry.SentPeers, error) { + defer wg.Done() + + reqPeerId := PeerIdFromH512(req.PeerId) + if wantPeerId != reqPeerId { + return nil, fmt.Errorf("wantPeerId != reqPeerId - %v vs %v", wantPeerId, reqPeerId) + } + + if wantMessageId != req.Data.Id { + return nil, fmt.Errorf("wantMessageId != req.Data.Id - %v vs %v", wantMessageId, req.Data.Id) + } + + var pkt eth.GetBlockHeadersPacket66 + if err := rlp.DecodeBytes(req.Data.Data, &pkt); err != nil { + return nil, err + } + + if wantOriginNumber != pkt.Origin.Number { + return nil, fmt.Errorf("wantOriginNumber != pkt.Origin.Number - %v vs %v", wantOriginNumber, pkt.Origin.Number) + } + + if wantAmount != pkt.Amount { + return nil, fmt.Errorf("wantAmount != pkt.Amount - %v vs %v", wantAmount, pkt.Amount) + } + + return nil, nil + } +} + +type sendMessageByIdMock func(context.Context, *sentry.SendMessageByIdRequest, ...grpc.CallOption) (*sentry.SentPeers, error) + +func TestServiceDownloadHeaders(t *testing.T) { + ctx := context.Background() + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockInboundMessages := []*sentry.InboundMessage{ + { + // should get filtered because it is from a different peer id + PeerId: PeerIdFromUint64(2).H512(), + }, + { + // should get filtered because it is for a different msg id + Id: sentry.MessageId_BLOCK_BODIES_66, + }, + { + // should get filtered because it is from a different request id + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: newMockBlockHeadersPacket66Bytes(t, requestId*2), + }, + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: newMockBlockHeadersPacket66Bytes(t, requestId), + }, + } + + test := newServiceTest(t, newMockRequestGenerator(requestId)) + test.mockSentryBlockHeaders66InboundMessageStream(mockInboundMessages, peerId) + test.run(ctx, func(t *testing.T) { + headers, err := test.service.DownloadHeaders(ctx, 1, 3, peerId) + require.NoError(t, err) + require.Len(t, headers, 3) + require.Equal(t, uint64(1), headers[0].Number.Uint64()) + require.Equal(t, uint64(2), headers[1].Number.Uint64()) + require.Equal(t, uint64(3), headers[2].Number.Uint64()) + }) +} + +func TestServiceInvalidDownloadHeadersRangeErr(t *testing.T) { + ctx := context.Background() + test := newServiceTest(t, newMockRequestGenerator(1)) + test.mockSentryBlockHeaders66InboundMessageStream(nil, PeerId{}) + test.run(ctx, func(t *testing.T) { + headers, err := test.service.DownloadHeaders(ctx, 3, 1, PeerIdFromUint64(1)) + require.ErrorIs(t, err, invalidDownloadHeadersRangeErr) + require.Nil(t, headers) + }) +} + +func TestServiceDownloadHeadersShouldPenalizePeerWhenInvalidRlpErr(t *testing.T) { + ctx := context.Background() + peerId := PeerIdFromUint64(1) + requestId := uint64(1234) + mockInboundMessages := []*sentry.InboundMessage{ + { + Id: sentry.MessageId_BLOCK_HEADERS_66, + PeerId: peerId.H512(), + Data: []byte{'i', 'n', 'v', 'a', 'l', 'i', 'd', '.', 'r', 'l', 'p'}, + }, + } + + test := newServiceTest(t, newMockRequestGenerator(requestId)) + test.mockSentryBlockHeaders66InboundMessageStream(mockInboundMessages, peerId) + test.mockExpectPenalizePeer(peerId) + test.run(ctx, func(t *testing.T) { + headers, err := test.service.DownloadHeaders(ctx, 1, 3, peerId) + require.Error(t, err) + require.Nil(t, headers) + }) +} diff --git a/polygon/sync/header_downloader.go b/polygon/sync/header_downloader.go index 8f3cfe3c2f2..3aefa88df84 100644 --- a/polygon/sync/header_downloader.go +++ b/polygon/sync/header_downloader.go @@ -15,24 +15,30 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/polygon/heimdall" + "github.com/ledgerwatch/erigon/polygon/p2p" ) const headerDownloaderLogPrefix = "HeaderDownloader" -func NewHeaderDownloader(logger log.Logger, sentry Sentry, heimdall heimdall.Heimdall, verify AccumulatedHeadersVerifier) *HeaderDownloader { +func NewHeaderDownloader( + logger log.Logger, + p2pService p2p.Service, + heimdall heimdall.Heimdall, + verify AccumulatedHeadersVerifier, +) *HeaderDownloader { return &HeaderDownloader{ - logger: logger, - sentry: sentry, - heimdall: heimdall, - verify: verify, + logger: logger, + p2pService: p2pService, + heimdall: heimdall, + verify: verify, } } type HeaderDownloader struct { - logger log.Logger - sentry Sentry - heimdall heimdall.Heimdall - verify AccumulatedHeadersVerifier + logger log.Logger + p2pService p2p.Service + heimdall heimdall.Heimdall + verify AccumulatedHeadersVerifier } func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, store CheckpointStore, start uint64) error { @@ -65,13 +71,13 @@ func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, store M func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store HeaderStore, waypoints heimdall.Waypoints) error { // waypoint rootHash->[headers part of waypoint] - waypointHeadersMemo, err := lru.New[common.Hash, []*types.Header](hd.sentry.MaxPeers()) + waypointHeadersMemo, err := lru.New[common.Hash, []*types.Header](hd.p2pService.MaxPeers()) if err != nil { return err } for len(waypoints) > 0 { - allPeers := hd.sentry.PeersWithBlockNumInfo() + allPeers := hd.p2pService.PeersSyncProgress() if len(allPeers) == 0 { hd.logger.Warn(fmt.Sprintf("[%s] zero peers, will try again", headerDownloaderLogPrefix)) continue @@ -84,8 +90,8 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He fmt.Sprintf("[%s] can't use any peers to sync, will try again", headerDownloaderLogPrefix), "start", waypoints[0].StartBlock(), "end", waypoints[len(waypoints)-1].EndBlock(), - "minPeerBlockNum", allPeers[0].BlockNum, - "minPeerID", allPeers[0].ID, + "lowestMaxSeenBlockNum", allPeers[0].MaxSeenBlockNum, + "lowestPeerId", allPeers[0].Id, ) continue } @@ -106,7 +112,7 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He for i, waypoint := range waypointsBatch { maxWaypointLength = math.Max(float64(waypoint.Length()), maxWaypointLength) wg.Add(1) - go func(i int, waypoint heimdall.Waypoint, peerID string) { + go func(i int, waypoint heimdall.Waypoint, peerId p2p.PeerId) { defer wg.Done() if headers, ok := waypointHeadersMemo.Get(waypoint.RootHash()); ok { @@ -114,7 +120,8 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He return } - headers, err := hd.sentry.DownloadHeaders(ctx, waypoint.StartBlock(), waypoint.EndBlock(), peerID) + start, end := waypoint.StartBlock().Uint64(), waypoint.EndBlock().Uint64() + headers, err := hd.p2pService.DownloadHeaders(ctx, start, end, peerId) if err != nil { hd.logger.Debug( fmt.Sprintf("[%s] issue downloading headers, will try again", headerDownloaderLogPrefix), @@ -123,7 +130,7 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He "end", waypoint.EndBlock(), "rootHash", waypoint.RootHash(), "kind", reflect.TypeOf(waypoint), - "peerID", peerID, + "peerId", peerId, ) return } @@ -138,16 +145,23 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He "end", waypoint.EndBlock(), "rootHash", waypoint.RootHash(), "kind", reflect.TypeOf(waypoint), - "peerID", peerID, + "peerId", peerId, ) - hd.sentry.Penalize(peerID) + if err := hd.p2pService.Penalize(ctx, peerId); err != nil { + hd.logger.Error( + fmt.Sprintf("[%s] failed to penalize peer", headerDownloaderLogPrefix), + "peerId", peerId, + "err", err, + ) + } + return } waypointHeadersMemo.Add(waypoint.RootHash(), headers) headerBatches[i] = headers - }(i, waypoint, peers[i].ID) + }(i, waypoint, peers[i].Id) } wg.Wait() @@ -192,16 +206,16 @@ func (hd *HeaderDownloader) downloadUsingWaypoints(ctx context.Context, store He } // choosePeers assumes peers are sorted in ascending order based on block num -func (hd *HeaderDownloader) choosePeers(peers PeersWithBlockNumInfo, waypoints heimdall.Waypoints) PeersWithBlockNumInfo { +func (hd *HeaderDownloader) choosePeers(peers p2p.PeersSyncProgress, waypoints heimdall.Waypoints) p2p.PeersSyncProgress { var peersIdx int - chosenPeers := make(PeersWithBlockNumInfo, 0, len(peers)) + chosenPeers := make(p2p.PeersSyncProgress, 0, len(peers)) for _, waypoint := range waypoints { if peersIdx >= len(peers) { break } peer := peers[peersIdx] - if peer.BlockNum.Cmp(waypoint.EndBlock()) > -1 { + if peer.MaxSeenBlockNum >= waypoint.EndBlock().Uint64() { chosenPeers = append(chosenPeers, peer) } diff --git a/polygon/sync/header_downloader_test.go b/polygon/sync/header_downloader_test.go index e424199a605..04996bc0315 100644 --- a/polygon/sync/header_downloader_test.go +++ b/polygon/sync/header_downloader_test.go @@ -15,6 +15,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/polygon/heimdall" + "github.com/ledgerwatch/erigon/polygon/p2p" "github.com/ledgerwatch/erigon/turbo/testlog" ) @@ -27,14 +28,14 @@ func newHeaderDownloaderTestWithOpts(t *testing.T, opts headerDownloaderTestOpts checkpointStore := NewMockCheckpointStore(ctrl) milestoneStore := NewMockMilestoneStore(ctrl) heimdall := heimdall.NewMockHeimdall(ctrl) - sentry := NewMockSentry(ctrl) - sentry.EXPECT().MaxPeers().Return(100).Times(1) + p2pService := p2p.NewMockService(ctrl) + p2pService.EXPECT().MaxPeers().Return(100).Times(1) logger := testlog.Logger(t, log.LvlDebug) headerVerifier := opts.getOrCreateDefaultHeaderVerifier() - headerDownloader := NewHeaderDownloader(logger, sentry, heimdall, headerVerifier) + headerDownloader := NewHeaderDownloader(logger, p2pService, heimdall, headerVerifier) return &headerDownloaderTest{ heimdall: heimdall, - sentry: sentry, + p2pService: p2pService, headerDownloader: headerDownloader, milestoneStore: milestoneStore, checkpointStore: checkpointStore, @@ -57,25 +58,25 @@ func (opts headerDownloaderTestOpts) getOrCreateDefaultHeaderVerifier() Accumula type headerDownloaderTest struct { heimdall *heimdall.MockHeimdall - sentry *MockSentry + p2pService *p2p.MockService milestoneStore *MockMilestoneStore checkpointStore *MockCheckpointStore headerDownloader *HeaderDownloader } -func (hdt headerDownloaderTest) fakePeers(count int, blockNums ...*big.Int) PeersWithBlockNumInfo { - peers := make(PeersWithBlockNumInfo, count) +func (hdt headerDownloaderTest) fakePeers(count int, blockNums ...uint64) p2p.PeersSyncProgress { + peers := make(p2p.PeersSyncProgress, count) for i := range peers { - var blockNum *big.Int + var blockNum uint64 if i < len(blockNums) { blockNum = blockNums[i] } else { - blockNum = new(big.Int).SetUint64(math.MaxUint64) + blockNum = math.MaxUint64 } - peers[i] = &PeerWithBlockNumInfo{ - ID: fmt.Sprintf("peer%d", i+1), - BlockNum: blockNum, + peers[i] = &p2p.PeerSyncProgress{ + Id: p2p.PeerIdFromUint64(uint64(i) + 1), + MaxSeenBlockNum: blockNum, } } @@ -114,14 +115,21 @@ func (hdt headerDownloaderTest) fakeMilestones(count int) heimdall.Waypoints { return milestones } -type downloadHeadersMock func(context.Context, *big.Int, *big.Int, string) ([]*types.Header, error) +type downloadHeadersMock func(ctx context.Context, start uint64, end uint64, peerId p2p.PeerId) ([]*types.Header, error) func (hdt headerDownloaderTest) defaultDownloadHeadersMock() downloadHeadersMock { - return func(ctx context.Context, start *big.Int, end *big.Int, peerID string) ([]*types.Header, error) { - res := make([]*types.Header, new(big.Int).Sub(end, start).Uint64()+1) - for i := new(big.Int).Set(start); i.Cmp(end) < 1; i.Add(i, new(big.Int).SetUint64(1)) { - res[new(big.Int).Sub(i, start).Uint64()] = &types.Header{Number: new(big.Int).Set(i)} + return func(ctx context.Context, start uint64, end uint64, peerId p2p.PeerId) ([]*types.Header, error) { + if start > end { + return nil, fmt.Errorf("unexpected start > end in test: start=%d, end=%d", start, end) } + + res := make([]*types.Header, end-start+1) + for num := start; num <= end; num++ { + res[num-start] = &types.Header{ + Number: new(big.Int).SetUint64(num), + } + } + return res, nil } } @@ -139,11 +147,11 @@ func TestHeaderDownloadUsingMilestones(t *testing.T) { FetchMilestonesFromBlock(gomock.Any(), gomock.Any(), gomock.Any()). Return(test.fakeMilestones(4), nil). Times(1) - test.sentry.EXPECT(). - PeersWithBlockNumInfo(). + test.p2pService.EXPECT(). + PeersSyncProgress(). Return(test.fakePeers(8)). Times(1) - test.sentry.EXPECT(). + test.p2pService.EXPECT(). DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). Times(4) @@ -169,11 +177,11 @@ func TestHeaderDownloadUsingCheckpoints(t *testing.T) { FetchCheckpointsFromBlock(gomock.Any(), gomock.Any(), gomock.Any()). Return(test.fakeCheckpoints(8), nil). Times(1) - test.sentry.EXPECT(). - PeersWithBlockNumInfo(). + test.p2pService.EXPECT(). + PeersSyncProgress(). Return(test.fakePeers(2)). Times(4) - test.sentry.EXPECT(). + test.p2pService.EXPECT(). DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). Times(8) @@ -213,11 +221,11 @@ func TestHeaderDownloadWhenInvalidStateThenPenalizePeerAndReDownload(t *testing. FetchCheckpointsFromBlock(gomock.Any(), gomock.Any(), gomock.Any()). Return(test.fakeCheckpoints(6), nil). Times(1) - test.sentry.EXPECT(). - PeersWithBlockNumInfo(). + test.p2pService.EXPECT(). + PeersSyncProgress(). Return(test.fakePeers(3)). Times(3) - test.sentry.EXPECT(). + test.p2pService.EXPECT(). DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). // request 1,2,3 in parallel @@ -228,8 +236,8 @@ func TestHeaderDownloadWhenInvalidStateThenPenalizePeerAndReDownload(t *testing. // in total 6 requests + 1 request for re-requesting checkpoint 2 // total = 7 (note this also tests caching works) Times(7) - test.sentry.EXPECT(). - Penalize(gomock.Eq("peer2")). + test.p2pService.EXPECT(). + Penalize(gomock.Any(), gomock.Eq(p2p.PeerIdFromUint64(2))). Times(1) var persistedHeadersFirstTime, persistedHeadersRemaining []*types.Header gomock.InOrder( @@ -255,7 +263,7 @@ func TestHeaderDownloadWhenZeroPeersTriesAgain(t *testing.T) { FetchCheckpointsFromBlock(gomock.Any(), gomock.Any(), gomock.Any()). Return(test.fakeCheckpoints(8), nil). Times(1) - test.sentry.EXPECT(). + test.p2pService.EXPECT(). DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(test.defaultDownloadHeadersMock()). Times(8) @@ -266,18 +274,18 @@ func TestHeaderDownloadWhenZeroPeersTriesAgain(t *testing.T) { Times(4) gomock.InOrder( // first, no peers at all - test.sentry.EXPECT(). - PeersWithBlockNumInfo(). + test.p2pService.EXPECT(). + PeersSyncProgress(). Return(nil). Times(1), // second, 2 peers but not synced enough for us to use - test.sentry.EXPECT(). - PeersWithBlockNumInfo(). - Return(test.fakePeers(2, new(big.Int).SetUint64(0), new(big.Int).SetUint64(0))). + test.p2pService.EXPECT(). + PeersSyncProgress(). + Return(test.fakePeers(2, 0, 0)). Times(1), // then, 2 fully synced peers that we can use - test.sentry.EXPECT(). - PeersWithBlockNumInfo(). + test.p2pService.EXPECT(). + PeersSyncProgress(). Return(test.fakePeers(2)). Times(4), ) diff --git a/polygon/sync/peer_with_block_num_info.go b/polygon/sync/peer_with_block_num_info.go deleted file mode 100644 index 1cde2bbf7e4..00000000000 --- a/polygon/sync/peer_with_block_num_info.go +++ /dev/null @@ -1,22 +0,0 @@ -package sync - -import "math/big" - -type PeerWithBlockNumInfo struct { - ID string - BlockNum *big.Int -} - -type PeersWithBlockNumInfo []*PeerWithBlockNumInfo - -func (peers PeersWithBlockNumInfo) Len() int { - return len(peers) -} - -func (peers PeersWithBlockNumInfo) Less(i int, j int) bool { - return peers[i].BlockNum.Cmp(peers[j].BlockNum) < 1 -} - -func (peers PeersWithBlockNumInfo) Swap(i int, j int) { - peers[i], peers[j] = peers[j], peers[i] -} diff --git a/polygon/sync/sentry.go b/polygon/sync/sentry.go deleted file mode 100644 index ffea66b08d7..00000000000 --- a/polygon/sync/sentry.go +++ /dev/null @@ -1,16 +0,0 @@ -package sync - -import ( - "context" - "math/big" - - "github.com/ledgerwatch/erigon/core/types" -) - -//go:generate mockgen -destination=./sentry_mock.go -package=sync . Sentry -type Sentry interface { - MaxPeers() int - PeersWithBlockNumInfo() PeersWithBlockNumInfo - DownloadHeaders(ctx context.Context, start *big.Int, end *big.Int, peerID string) ([]*types.Header, error) - Penalize(peerID string) -} diff --git a/polygon/sync/sentry_mock.go b/polygon/sync/sentry_mock.go deleted file mode 100644 index 7070b629766..00000000000 --- a/polygon/sync/sentry_mock.go +++ /dev/null @@ -1,92 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/ledgerwatch/erigon/polygon/sync (interfaces: Sentry) - -// Package sync is a generated GoMock package. -package sync - -import ( - context "context" - big "math/big" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - types "github.com/ledgerwatch/erigon/core/types" -) - -// MockSentry is a mock of Sentry interface. -type MockSentry struct { - ctrl *gomock.Controller - recorder *MockSentryMockRecorder -} - -// MockSentryMockRecorder is the mock recorder for MockSentry. -type MockSentryMockRecorder struct { - mock *MockSentry -} - -// NewMockSentry creates a new mock instance. -func NewMockSentry(ctrl *gomock.Controller) *MockSentry { - mock := &MockSentry{ctrl: ctrl} - mock.recorder = &MockSentryMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockSentry) EXPECT() *MockSentryMockRecorder { - return m.recorder -} - -// DownloadHeaders mocks base method. -func (m *MockSentry) DownloadHeaders(arg0 context.Context, arg1, arg2 *big.Int, arg3 string) ([]*types.Header, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DownloadHeaders", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].([]*types.Header) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DownloadHeaders indicates an expected call of DownloadHeaders. -func (mr *MockSentryMockRecorder) DownloadHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadHeaders", reflect.TypeOf((*MockSentry)(nil).DownloadHeaders), arg0, arg1, arg2, arg3) -} - -// MaxPeers mocks base method. -func (m *MockSentry) MaxPeers() int { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MaxPeers") - ret0, _ := ret[0].(int) - return ret0 -} - -// MaxPeers indicates an expected call of MaxPeers. -func (mr *MockSentryMockRecorder) MaxPeers() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxPeers", reflect.TypeOf((*MockSentry)(nil).MaxPeers)) -} - -// PeersWithBlockNumInfo mocks base method. -func (m *MockSentry) PeersWithBlockNumInfo() PeersWithBlockNumInfo { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PeersWithBlockNumInfo") - ret0, _ := ret[0].(PeersWithBlockNumInfo) - return ret0 -} - -// PeersWithBlockNumInfo indicates an expected call of PeersWithBlockNumInfo. -func (mr *MockSentryMockRecorder) PeersWithBlockNumInfo() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeersWithBlockNumInfo", reflect.TypeOf((*MockSentry)(nil).PeersWithBlockNumInfo)) -} - -// Penalize mocks base method. -func (m *MockSentry) Penalize(arg0 string) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Penalize", arg0) -} - -// Penalize indicates an expected call of Penalize. -func (mr *MockSentryMockRecorder) Penalize(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Penalize", reflect.TypeOf((*MockSentry)(nil).Penalize), arg0) -} diff --git a/polygon/sync/storage.go b/polygon/sync/storage.go index ec7a38197e5..242a5a52cdd 100644 --- a/polygon/sync/storage.go +++ b/polygon/sync/storage.go @@ -5,7 +5,7 @@ import ( "github.com/ledgerwatch/erigon/polygon/heimdall" ) -//go:generate mockgen -destination=./db_mock.go -package=sync . DB +//go:generate mockgen -destination=./storage_mock.go -package=sync -source=./storage.go type HeaderStore interface { PutHeaders(headers []*types.Header) error }