diff --git a/pkg/crawler/metrics.go b/pkg/crawler/metrics.go index 3d944dc..edf446d 100644 --- a/pkg/crawler/metrics.go +++ b/pkg/crawler/metrics.go @@ -62,10 +62,24 @@ var ( HostedPeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: modName, Name: "hosted_peers_distribution", - Help: "Distribution of IPs hosting the nodes in the network", + Help: "Distribution of nodes that are hosted on non-residential networks", }, []string{"ip_host"}, ) + RttDist = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: modName, + Name: "observed_rtt_distribution", + Help: "Distribution of RTT between our tool and nodes in the network", + }, + []string{"secs"}, + ) + IPDist = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: modName, + Name: "observed_ip_distribution", + Help: "Distribution of IPs hosting nodes in the network", + }, + []string{"numbernodes"}, + ) ) func (c *EthereumCrawler) GetMetrics() *metrics.MetricsModule { @@ -82,6 +96,8 @@ func (c *EthereumCrawler) GetMetrics() *metrics.MetricsModule { metricsMod.AddIndvMetric(c.getPeersOs()) metricsMod.AddIndvMetric(c.getPeersArch()) metricsMod.AddIndvMetric(c.getHostedPeers()) + metricsMod.AddIndvMetric(c.getRTTDist()) + metricsMod.AddIndvMetric(c.getIPDist()) return metricsMod } @@ -292,3 +308,56 @@ func (c *EthereumCrawler) getHostedPeers() *metrics.IndvMetrics { } return ipHosting } + + +func (c *EthereumCrawler) getRTTDist() *metrics.IndvMetrics { + initFn := func() error { + prometheus.MustRegister(RttDist) + return nil + } + updateFn := func() (interface{}, error) { + summary, err := c.DB.GetRTTDistribution() + if err != nil { + return nil, err + } + for key, val := range summary { + RttDist.WithLabelValues(key).Set(float64(val.(int))) + } + return summary, nil + } + indvMetric, err := metrics.NewIndvMetrics( + "rtt_distribution", + initFn, + updateFn, + ) + if err != nil { + return nil + } + return indvMetric +} + +func (c *EthereumCrawler) getIPDist() *metrics.IndvMetrics { + initFn := func() error { + prometheus.MustRegister(IPDist) + return nil + } + updateFn := func() (interface{}, error) { + summary, err := c.DB.GetIPDistribution() + if err != nil { + return nil, err + } + for key, val := range summary { + IPDist.WithLabelValues(key).Set(float64(val.(int))) + } + return summary, nil + } + indvMetric, err := metrics.NewIndvMetrics( + "ip_distribution", + initFn, + updateFn, + ) + if err != nil { + return nil + } + return indvMetric +} diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 1c52a07..435d848 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -1,6 +1,7 @@ package postgresql import ( + "fmt" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -251,3 +252,95 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { summary["hosted_ips"] = hosted return summary, nil } + +func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { + summary := make(map[string]interface{}, 0) + + rows, err := db.psqlPool.Query( + db.ctx, + ` + SELECT + t.latency as latency_range, + count(*) as nodes + FROM ( + SELECT + CASE + WHEN latency between 0 and 100 THEN ' 0-100ms' + WHEN latency between 101 and 200 THEN '101-200ms' + WHEN latency between 201 and 300 THEN '201-300ms' + WHEN latency between 301 and 400 THEN '301-400ms' + WHEN latency between 401 and 500 THEN '401-500ms' + WHEN latency between 501 and 600 THEN '501-600ms' + WHEN latency between 601 and 700 THEN '601-700ms' + WHEN latency between 701 and 800 THEN '701-800ms' + WHEN latency between 801 and 900 THEN '801-900ms' + WHEN latency between 901 and 1000 THEN '901-1000ms' + ELSE '+1s' + END as latency + FROM peer_info + WHERE deprecated=false and client_name IS NOT NULL + ) as t + GROUP BY t.latency + ORDER BY nodes DESC; + + `, + ) + if err != nil { + return summary, err + } + + for rows.Next() { + var rttRange string + var rttValue int + err = rows.Scan( + &rttRange, + &rttValue, + ) + if err != nil { + return summary, err + } + summary[rttRange] = rttValue + } + return summary, nil +} + +func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { + summary := make(map[string]interface{}, 0) + + rows, err := db.psqlPool.Query( + db.ctx, + ` + SELECT + nodes as nodes_per_ip, + count(t.nodes) as number_of_ips + FROM ( + SELECT + ip, + count(ip) as nodes + FROM peer_info + WHERE deprecated = false and client_name IS NOT NULL + GROUP BY ip + ORDER BY nodes DESC + ) as t + GROUP BY nodes + ORDER BY number_of_ips DESC; + `, + ) + if err != nil { + return summary, err + } + + for rows.Next() { + var nodesPerIP int + var ips int + err = rows.Scan( + &nodesPerIP, + &ips, + ) + if err != nil { + return summary, err + } + summary[fmt.Sprintf("%d", nodesPerIP)] = ips + } + return summary, nil +} diff --git a/pkg/discovery/dv5/dv5_service.go b/pkg/discovery/dv5/dv5_service.go index 43a6be2..299e4ec 100644 --- a/pkg/discovery/dv5/dv5_service.go +++ b/pkg/discovery/dv5/dv5_service.go @@ -134,8 +134,8 @@ func (d *Discovery5) nodeIterator() { if d.Iterator.Next() { // fill the given DiscoveredPeer interface with the next found peer node := d.Iterator.Node() - log.WithFields(log.Fields{ + "enr": node.String(), "node_id": node.ID().String(), "module": "Discv5", }).Debug("new ENR discovered") diff --git a/pkg/gossipsub/topic_subscription.go b/pkg/gossipsub/topic_subscription.go index 02b8940..c746cc4 100644 --- a/pkg/gossipsub/topic_subscription.go +++ b/pkg/gossipsub/topic_subscription.go @@ -64,7 +64,7 @@ func (c *TopicSubscription) MessageReadingLoop(selfId peer.ID, dbClient database // use the msg handler for that specific topic that we have content, err := c.handlerFn(msg) if err != nil { - log.Error(errors.Wrap(err, "unable to unwrap message")) + log.Error(errors.Wrap(err, "unable to unwrap message on topic " + c.sub.Topic())) continue } if !content.IsZero() && c.persistMsgs {