From 5045cea0479385327761f890d9bb6fe0d747c2f0 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Fri, 22 Mar 2024 15:36:07 +0100 Subject: [PATCH] swap db call for local peerstore calls --- pkg/crawler/ethereum.go | 2 +- pkg/events/events.go | 30 ++++++++++++++---------------- pkg/hosts/host.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index 5e2391c..1c9d790 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -194,7 +194,7 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig) } // Build the event forwarder - eventHandler := events.NewForwarder(conf.SSEIP, conf.SSEPort, dbClient, ethMsgHandler) + eventHandler := events.NewForwarder(conf.SSEIP, conf.SSEPort, host, ethMsgHandler) // generate the CrawlerBase crawler := &EthereumCrawler{ diff --git a/pkg/events/events.go b/pkg/events/events.go index f1cd2be..20b48b6 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -6,7 +6,7 @@ import ( "net/http" "sync" - "github.com/migalabs/armiarma/pkg/db/postgresql" + "github.com/migalabs/armiarma/pkg/hosts" "github.com/migalabs/armiarma/pkg/networks/ethereum" "github.com/r3labs/sse/v2" log "github.com/sirupsen/logrus" @@ -20,7 +20,7 @@ type Forwarder struct { port int server *sse.Server - db *postgresql.DBClient + h *hosts.BasicLibp2pHost ethMsgHandler *ethereum.EthMessageHandler // Store downstream attestation events in a channel so @@ -31,7 +31,7 @@ type Forwarder struct { } // NewForwarder creates a new Forwarder -func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *ethereum.EthMessageHandler) *Forwarder { +func NewForwarder(ip string, port int, h *hosts.BasicLibp2pHost, ethMsgHandler *ethereum.EthMessageHandler) *Forwarder { server := sse.New() // Disable auto replay. If a consumer is not connected, it will never receive the event. @@ -41,7 +41,7 @@ func NewForwarder(ip string, port int, db *postgresql.DBClient, ethMsgHandler *e ip: ip, port: port, server: server, - db: db, + h: h, ethMsgHandler: ethMsgHandler, attestationCh: make(chan *ethereum.AttestationReceievedEvent, 10000), } @@ -137,12 +137,10 @@ func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEven log.WithError(err).Error("error publishing raw attestation to SSE server") } - // Build the timed attestation event - info, err := f.db.GetFullHostInfo(e.PeerID) + // compose the info of the remote peer sending the attestation + hostInfo, err := f.h.GetHostInfo(e.PeerID) if err != nil { - log.WithError(err).Error("error getting host info from DB when handling a new attestation") - - return + log.WithError(err).Error("unable to extract info from peer that shared attestation") } // Publish the timed event @@ -155,13 +153,13 @@ func (f *Forwarder) processAttestationEvent(e *ethereum.AttestationReceievedEven TimeInSlot: e.TrackedAttestation.TimeInSlot, }, PeerInfo: &PeerInfo{ - ID: fmt.Sprintf("%s", info.ID), - IP: info.IP, - Port: info.Port, - UserAgent: info.PeerInfo.UserAgent, - Latency: info.PeerInfo.Latency, - Protocols: info.PeerInfo.Protocols, - ProtocolVersion: info.PeerInfo.ProtocolVersion, + ID: e.PeerID.String(), + IP: hostInfo.IP, + Port: hostInfo.Port, + UserAgent: hostInfo.PeerInfo.UserAgent, + Latency: hostInfo.PeerInfo.Latency, + Protocols: hostInfo.PeerInfo.Protocols, + ProtocolVersion: hostInfo.PeerInfo.ProtocolVersion, }, }); err != nil { log.WithError(err).Error("error publishing timed attestation to SSE server") diff --git a/pkg/hosts/host.go b/pkg/hosts/host.go index 7061ba1..8138a48 100644 --- a/pkg/hosts/host.go +++ b/pkg/hosts/host.go @@ -220,3 +220,33 @@ func (b *BasicLibp2pHost) RecIdentEvent(identEvent IdentificationEvent) { func (b *BasicLibp2pHost) IdentEventNotChannel() chan IdentificationEvent { return b.identNotChannel } + +func (b *BasicLibp2pHost) GetHostInfo(peerID peer.ID) (models.HostInfo, error) { + var hostInfo models.HostInfo + + // Connectivity of the node + hostInfo.MAddrs = b.host.Peerstore().Addrs(peerID) + addr := utils.GetPublicAddrsFromAddrArray(hostInfo.MAddrs) + hostInfo.IP = utils.ExtractIPFromMAddr(addr).String() + hostInfo.Port = utils.GetPortFromMaddrs(addr) + + // Protocols + pv, err := b.host.Peerstore().Get(peerID, "ProtocolVersion") + if err == nil { + hostInfo.PeerInfo.ProtocolVersion = pv.(string) + } + prots, err := b.host.Peerstore().GetProtocols(peerID) + if err == nil { + supportedProtocols := make([]string, len(prots)) + for i, prot := range prots { + supportedProtocols[i] = string(prot) + } + hostInfo.PeerInfo.Protocols = supportedProtocols + } + // Get user agent + ua, err := b.host.Peerstore().Get(peerID, "AgentVersion") + if err == nil { + hostInfo.PeerInfo.UserAgent = ua.(string) + } + return hostInfo, nil +}