Skip to content

Commit

Permalink
Fix eth/67 vs eth/68 peers for NewPooledTransactionHashes (#12997)
Browse files Browse the repository at this point in the history
[eth/68](https://eips.ethereum.org/EIPS/eip-5793) and eth/67 are only
different w.r.t. to `NewPooledTransactionHashes`. Previously the
determination of `msgcode` was sometimes failing because it relied only
on one protocol version, namely `ss.Protocols[0].Version`.
  • Loading branch information
yperbasis authored Dec 4, 2024
1 parent e56550f commit 96366e0
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 31 deletions.
68 changes: 38 additions & 30 deletions p2p/sentry/sentry_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,23 @@ import (
"syscall"
"time"

mapset "github.com/deckarep/golang-set/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/erigontech/erigon-lib/log/v3"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/common/debug"
"github.com/erigontech/erigon-lib/common/dir"
"github.com/erigontech/erigon-lib/diagnostics"
"github.com/erigontech/erigon-lib/direct"
"github.com/erigontech/erigon-lib/gointerfaces"
"github.com/erigontech/erigon-lib/gointerfaces/grpcutil"
proto_sentry "github.com/erigontech/erigon-lib/gointerfaces/sentryproto"
proto_types "github.com/erigontech/erigon-lib/gointerfaces/typesproto"

"github.com/erigontech/erigon-lib/common/debug"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/rlp"
"github.com/erigontech/erigon/cmd/utils"
"github.com/erigontech/erigon/core/forkid"
Expand Down Expand Up @@ -903,20 +902,6 @@ func (ss *GrpcServer) SendMessageByMinBlock(_ context.Context, inreq *proto_sent

func (ss *GrpcServer) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}
msgcode := eth.FromProto[ss.Protocols[0].Version][inreq.Data.Id]
if msgcode != eth.GetBlockHeadersMsg &&
msgcode != eth.BlockHeadersMsg &&
msgcode != eth.GetBlockBodiesMsg &&
msgcode != eth.BlockBodiesMsg &&
msgcode != eth.NewBlockMsg &&
msgcode != eth.NewBlockHashesMsg &&
msgcode != eth.GetReceiptsMsg &&
msgcode != eth.ReceiptsMsg &&
msgcode != eth.NewPooledTransactionHashesMsg &&
msgcode != eth.PooledTransactionsMsg &&
msgcode != eth.GetPooledTransactionsMsg {
return reply, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id)
}

peerID := ConvertH512ToPeerID(inreq.PeerId)
peerInfo := ss.getPeer(peerID)
Expand All @@ -926,25 +911,45 @@ func (ss *GrpcServer) SendMessageById(_ context.Context, inreq *proto_sentry.Sen
return reply, nil
}

msgcode, ok := eth.FromProto[peerInfo.protocol][inreq.Data.Id]
if !ok {
return reply, fmt.Errorf("msgcode not found for message Id: %s (peer protocol %d)", inreq.Data.Id, peerInfo.protocol)
}

ss.writePeer("[sentry] sendMessageById", peerInfo, msgcode, inreq.Data.Data, 0)
reply.Peers = []*proto_types.H512{inreq.PeerId}
return reply, nil
}

func (ss *GrpcServer) messageCode(id proto_sentry.MessageId) (code uint64, protocolVersions mapset.Set[uint]) {
protocolVersions = mapset.NewSet[uint]()
for i := 0; i < len(ss.Protocols); i++ {
version := ss.Protocols[i].Version
if val, ok := eth.FromProto[version][id]; ok {
code = val // assuming that the code doesn't change between protocol versions
protocolVersions.Add(version)
}
}
return
}

func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}

msgcode := eth.FromProto[ss.Protocols[0].Version][req.Data.Id]
if msgcode != eth.NewBlockMsg &&
msgcode != eth.NewBlockHashesMsg &&
msgcode != eth.NewPooledTransactionHashesMsg &&
msgcode != eth.TransactionsMsg {
msgcode, protocolVersions := ss.messageCode(req.Data.Id)
if protocolVersions.Cardinality() == 0 ||
(msgcode != eth.NewBlockMsg &&
msgcode != eth.NewBlockHashesMsg &&
msgcode != eth.NewPooledTransactionHashesMsg &&
msgcode != eth.TransactionsMsg) {
return reply, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Data.Id)
}

peerInfos := make([]*PeerInfo, 0, 100)
ss.rangePeers(func(peerInfo *PeerInfo) bool {
peerInfos = append(peerInfos, peerInfo)
if protocolVersions.Contains(peerInfo.protocol) {
peerInfos = append(peerInfos, peerInfo)
}
return true
})
rand.Shuffle(len(peerInfos), func(i int, j int) {
Expand All @@ -970,17 +975,20 @@ func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_s
func (ss *GrpcServer) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}

msgcode := eth.FromProto[ss.Protocols[0].Version][req.Id]
if msgcode != eth.NewBlockMsg &&
msgcode != eth.NewPooledTransactionHashesMsg && // to broadcast new local transactions
msgcode != eth.NewBlockHashesMsg {
msgcode, protocolVersions := ss.messageCode(req.Id)
if protocolVersions.Cardinality() == 0 ||
(msgcode != eth.NewBlockMsg &&
msgcode != eth.NewPooledTransactionHashesMsg && // to broadcast new local transactions
msgcode != eth.NewBlockHashesMsg) {
return reply, fmt.Errorf("sendMessageToAll not implemented for message Id: %s", req.Id)
}

var lastErr error
ss.rangePeers(func(peerInfo *PeerInfo) bool {
ss.writePeer("[sentry] SendMessageToAll", peerInfo, msgcode, req.Data, 0)
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH512(peerInfo.ID()))
if protocolVersions.Contains(peerInfo.protocol) {
ss.writePeer("[sentry] SendMessageToAll", peerInfo, msgcode, req.Data, 0)
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH512(peerInfo.ID()))
}
return true
})
return reply, lastErr
Expand Down
2 changes: 1 addition & 1 deletion txnprovider/txpool/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (f *Send) AnnouncePooledTxns(types []byte, sizes []uint32, hashes Hashes, m
}

switch protocols[protocolIndex] {
case 66, 67:
case 67:
if i > prevI {
req := &sentryproto.SendMessageToRandomPeersRequest{
Data: &sentryproto.OutboundMessageData{
Expand Down

0 comments on commit 96366e0

Please sign in to comment.