From 96366e02334cd26bd134f2d170e123c4072fbe2b Mon Sep 17 00:00:00 2001 From: Andrew Ashikhmin <34320705+yperbasis@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:46:32 +0100 Subject: [PATCH] Fix eth/67 vs eth/68 peers for NewPooledTransactionHashes (#12997) [eth/68](https://eips.ethereum.org/EIPS/eip-5793) and eth/67 are only different w.r.t. to `NewPooledTransactionHashes`. Previously the determination of `msgcode` was sometimes failing because it relied only on one protocol version, namely `ss.Protocols[0].Version`. --- p2p/sentry/sentry_grpc_server.go | 68 ++++++++++++++++++-------------- txnprovider/txpool/send.go | 2 +- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/p2p/sentry/sentry_grpc_server.go b/p2p/sentry/sentry_grpc_server.go index 21e47443680..3719e9c7934 100644 --- a/p2p/sentry/sentry_grpc_server.go +++ b/p2p/sentry/sentry_grpc_server.go @@ -34,15 +34,15 @@ import ( "syscall" "time" + mapset "github.com/deckarep/golang-set/v2" "google.golang.org/grpc" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/protobuf/types/known/emptypb" - "github.com/erigontech/erigon-lib/log/v3" - libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/common/debug" "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/diagnostics" "github.com/erigontech/erigon-lib/direct" @@ -50,8 +50,7 @@ import ( "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" proto_sentry "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" proto_types "github.com/erigontech/erigon-lib/gointerfaces/typesproto" - - "github.com/erigontech/erigon-lib/common/debug" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/rlp" "github.com/erigontech/erigon/cmd/utils" "github.com/erigontech/erigon/core/forkid" @@ -903,20 +902,6 @@ func (ss *GrpcServer) SendMessageByMinBlock(_ context.Context, inreq *proto_sent func (ss *GrpcServer) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} - msgcode := eth.FromProto[ss.Protocols[0].Version][inreq.Data.Id] - if msgcode != eth.GetBlockHeadersMsg && - msgcode != eth.BlockHeadersMsg && - msgcode != eth.GetBlockBodiesMsg && - msgcode != eth.BlockBodiesMsg && - msgcode != eth.NewBlockMsg && - msgcode != eth.NewBlockHashesMsg && - msgcode != eth.GetReceiptsMsg && - msgcode != eth.ReceiptsMsg && - msgcode != eth.NewPooledTransactionHashesMsg && - msgcode != eth.PooledTransactionsMsg && - msgcode != eth.GetPooledTransactionsMsg { - return reply, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id) - } peerID := ConvertH512ToPeerID(inreq.PeerId) peerInfo := ss.getPeer(peerID) @@ -926,25 +911,45 @@ func (ss *GrpcServer) SendMessageById(_ context.Context, inreq *proto_sentry.Sen return reply, nil } + msgcode, ok := eth.FromProto[peerInfo.protocol][inreq.Data.Id] + if !ok { + return reply, fmt.Errorf("msgcode not found for message Id: %s (peer protocol %d)", inreq.Data.Id, peerInfo.protocol) + } + ss.writePeer("[sentry] sendMessageById", peerInfo, msgcode, inreq.Data.Data, 0) reply.Peers = []*proto_types.H512{inreq.PeerId} return reply, nil } +func (ss *GrpcServer) messageCode(id proto_sentry.MessageId) (code uint64, protocolVersions mapset.Set[uint]) { + protocolVersions = mapset.NewSet[uint]() + for i := 0; i < len(ss.Protocols); i++ { + version := ss.Protocols[i].Version + if val, ok := eth.FromProto[version][id]; ok { + code = val // assuming that the code doesn't change between protocol versions + protocolVersions.Add(version) + } + } + return +} + func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} - msgcode := eth.FromProto[ss.Protocols[0].Version][req.Data.Id] - if msgcode != eth.NewBlockMsg && - msgcode != eth.NewBlockHashesMsg && - msgcode != eth.NewPooledTransactionHashesMsg && - msgcode != eth.TransactionsMsg { + msgcode, protocolVersions := ss.messageCode(req.Data.Id) + if protocolVersions.Cardinality() == 0 || + (msgcode != eth.NewBlockMsg && + msgcode != eth.NewBlockHashesMsg && + msgcode != eth.NewPooledTransactionHashesMsg && + msgcode != eth.TransactionsMsg) { return reply, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Data.Id) } peerInfos := make([]*PeerInfo, 0, 100) ss.rangePeers(func(peerInfo *PeerInfo) bool { - peerInfos = append(peerInfos, peerInfo) + if protocolVersions.Contains(peerInfo.protocol) { + peerInfos = append(peerInfos, peerInfo) + } return true }) rand.Shuffle(len(peerInfos), func(i int, j int) { @@ -970,17 +975,20 @@ func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_s func (ss *GrpcServer) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} - msgcode := eth.FromProto[ss.Protocols[0].Version][req.Id] - if msgcode != eth.NewBlockMsg && - msgcode != eth.NewPooledTransactionHashesMsg && // to broadcast new local transactions - msgcode != eth.NewBlockHashesMsg { + msgcode, protocolVersions := ss.messageCode(req.Id) + if protocolVersions.Cardinality() == 0 || + (msgcode != eth.NewBlockMsg && + msgcode != eth.NewPooledTransactionHashesMsg && // to broadcast new local transactions + msgcode != eth.NewBlockHashesMsg) { return reply, fmt.Errorf("sendMessageToAll not implemented for message Id: %s", req.Id) } var lastErr error ss.rangePeers(func(peerInfo *PeerInfo) bool { - ss.writePeer("[sentry] SendMessageToAll", peerInfo, msgcode, req.Data, 0) - reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH512(peerInfo.ID())) + if protocolVersions.Contains(peerInfo.protocol) { + ss.writePeer("[sentry] SendMessageToAll", peerInfo, msgcode, req.Data, 0) + reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH512(peerInfo.ID())) + } return true }) return reply, lastErr diff --git a/txnprovider/txpool/send.go b/txnprovider/txpool/send.go index 52b16c0e542..985b84c3ba4 100644 --- a/txnprovider/txpool/send.go +++ b/txnprovider/txpool/send.go @@ -157,7 +157,7 @@ func (f *Send) AnnouncePooledTxns(types []byte, sizes []uint32, hashes Hashes, m } switch protocols[protocolIndex] { - case 66, 67: + case 67: if i > prevI { req := &sentryproto.SendMessageToRandomPeersRequest{ Data: &sentryproto.OutboundMessageData{