From 99e2b9b38f5f6382a8ba09f9a87e87fb68e1a916 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Fri, 29 Sep 2023 11:16:20 -0400 Subject: [PATCH] Remove unnecessary arc from OverlayMetrics --- portalnet/src/config.rs | 10 +++++++++- portalnet/src/discovery.rs | 23 +++++++++++++---------- portalnet/src/metrics/portalnet.rs | 6 +++--- portalnet/src/overlay.rs | 6 ++---- portalnet/src/overlay_service.rs | 19 +++++++++---------- portalnet/src/storage.rs | 1 - portalnet/src/types/messages.rs | 1 - src/lib.rs | 9 +++++++-- 8 files changed, 43 insertions(+), 32 deletions(-) diff --git a/portalnet/src/config.rs b/portalnet/src/config.rs index e779d7e21..8b9c093cd 100644 --- a/portalnet/src/config.rs +++ b/portalnet/src/config.rs @@ -1,4 +1,5 @@ use std::net::SocketAddr; +use std::path::PathBuf; use ethereum_types::H256; @@ -24,16 +25,22 @@ pub struct PortalnetConfig { pub no_stun: bool, pub node_addr_cache_capacity: usize, pub metrics: OverlayMetrics, + pub enr_file_location: Option, } impl PortalnetConfig { - pub fn new(trin_config: &TrinConfig, private_key: H256) -> Self { + pub fn new( + trin_config: &TrinConfig, + private_key: H256, + enr_file_location: Option, + ) -> Self { Self { external_addr: trin_config.external_addr, private_key, listen_port: trin_config.discovery_port, no_stun: trin_config.no_stun, bootnodes: trin_config.bootnodes.clone(), + enr_file_location, ..Default::default() } } @@ -51,6 +58,7 @@ impl Default for PortalnetConfig { no_stun: false, node_addr_cache_capacity: NODE_ADDR_CACHE_CAPACITY, metrics: PORTALNET_METRICS.overlay.clone(), + enr_file_location: None, } } } diff --git a/portalnet/src/discovery.rs b/portalnet/src/discovery.rs index d11acf72a..92c665aa5 100644 --- a/portalnet/src/discovery.rs +++ b/portalnet/src/discovery.rs @@ -1,5 +1,8 @@ -use super::types::messages::{PortalnetConfig, ProtocolId}; -use crate::socket; +use std::hash::{Hash, Hasher}; +use std::net::Ipv4Addr; +use std::str::FromStr; +use std::{convert::TryFrom, fmt, fs, io, net::SocketAddr, sync::Arc}; + use anyhow::anyhow; use async_trait::async_trait; use bytes::BytesMut; @@ -8,22 +11,22 @@ use discv5::{ enr::{CombinedKey, EnrBuilder, NodeId}, ConfigBuilder, Discv5, Event, ListenConfig, RequestError, TalkRequest, }; -use ethportal_api::types::enr::Enr; -use ethportal_api::utils::bytes::hex_encode; -use ethportal_api::NodeInfo; use lru::LruCache; use parking_lot::RwLock; use rlp::RlpStream; use serde_json::{json, Value}; -use std::hash::{Hash, Hasher}; -use std::net::Ipv4Addr; -use std::str::FromStr; -use std::{convert::TryFrom, fmt, fs, io, net::SocketAddr, sync::Arc}; use tokio::sync::mpsc; use tracing::{debug, info, warn}; -use trin_utils::version::get_trin_version; use utp_rs::{cid::ConnectionPeer, udp::AsyncUdpSocket}; +use super::config::PortalnetConfig; +use super::types::messages::ProtocolId; +use crate::socket; +use ethportal_api::types::enr::Enr; +use ethportal_api::utils::bytes::hex_encode; +use ethportal_api::NodeInfo; +use trin_utils::version::get_trin_version; + /// Size of the buffer of the Discv5 TALKREQ channel. const TALKREQ_CHANNEL_BUFFER: usize = 100; diff --git a/portalnet/src/metrics/portalnet.rs b/portalnet/src/metrics/portalnet.rs index f46182b55..66cef3bf2 100644 --- a/portalnet/src/metrics/portalnet.rs +++ b/portalnet/src/metrics/portalnet.rs @@ -1,7 +1,7 @@ use crate::metrics::overlay::OverlayMetrics; use crate::metrics::storage::StorageMetrics; use lazy_static::lazy_static; -use prometheus_exporter::prometheus::{default_registry, Registry}; +use prometheus_exporter::prometheus::default_registry; // We use lazy_static to ensure that the metrics registry is initialized only once, for each // runtime. This is important because the registry is a global singleton, and if it is @@ -23,8 +23,8 @@ pub struct PortalnetMetrics { impl PortalnetMetrics { pub fn new() -> anyhow::Result { let registry = default_registry(); - let overlay = OverlayMetrics::new(®istry)?; - let storage = StorageMetrics::new(®istry)?; + let overlay = OverlayMetrics::new(registry)?; + let storage = StorageMetrics::new(registry)?; Ok(Self { overlay, storage }) } } diff --git a/portalnet/src/overlay.rs b/portalnet/src/overlay.rs index 650f84bb7..bbf1a986c 100644 --- a/portalnet/src/overlay.rs +++ b/portalnet/src/overlay.rs @@ -113,7 +113,7 @@ pub struct OverlayProtocol { /// Accepted content validator that makes requests to this/other overlay networks validator: Arc, /// Runtime telemetry metrics for the overlay network. - metrics: Arc, + metrics: OverlayMetrics, } impl< @@ -141,8 +141,6 @@ where config.table_filter, config.bucket_filter, ))); - let metrics = Arc::new(metrics); - let command_tx = OverlayService::::spawn( Arc::clone(&discovery), Arc::clone(&store), @@ -151,7 +149,7 @@ where config.ping_queue_interval, protocol.clone(), Arc::clone(&utp_socket), - Arc::clone(&metrics), + metrics.clone(), Arc::clone(&validator), config.query_timeout, config.query_peer_timeout, diff --git a/portalnet/src/overlay_service.rs b/portalnet/src/overlay_service.rs index 4448a39f3..151a1fddf 100644 --- a/portalnet/src/overlay_service.rs +++ b/portalnet/src/overlay_service.rs @@ -302,7 +302,7 @@ pub struct OverlayService { /// Phantom metric (distance function). phantom_metric: PhantomData, /// Metrics reporting component - metrics: Arc, + metrics: OverlayMetrics, /// Validator for overlay network content. validator: Arc, /// A channel that the overlay service emits events on. @@ -332,7 +332,7 @@ where ping_queue_interval: Option, protocol: ProtocolId, utp_socket: Arc>, - metrics: Arc, + metrics: OverlayMetrics, validator: Arc, query_timeout: Duration, query_peer_timeout: Duration, @@ -1124,7 +1124,7 @@ where // Wait for an incoming connection with the given CID. Then, write the data // over the uTP stream. let utp = Arc::clone(&self.utp_socket); - let metrics = Arc::clone(&self.metrics); + let metrics = self.metrics.clone(); let protocol = self.protocol.clone(); tokio::spawn(async move { metrics.report_utp_active_inc( @@ -1256,7 +1256,7 @@ where let kbuckets = Arc::clone(&self.kbuckets); let command_tx = self.command_tx.clone(); let utp = Arc::clone(&self.utp_socket); - let metrics = Arc::clone(&self.metrics); + let metrics = self.metrics.clone(); let protocol = self.protocol.clone(); tokio::spawn(async move { @@ -1533,7 +1533,7 @@ where let response_clone = response.clone(); let utp = Arc::clone(&self.utp_socket); - let metrics = Arc::clone(&self.metrics); + let metrics = self.metrics.clone(); let protocol = self.protocol.clone(); tokio::spawn(async move { @@ -1619,7 +1619,7 @@ where async fn process_accept_utp_payload( validator: Arc, store: Arc>, - metrics: Arc, + metrics: OverlayMetrics, kbuckets: Arc>>, command_tx: UnboundedSender>, content_keys: Vec, @@ -1648,7 +1648,7 @@ where // - Propagate all validated content let validator = Arc::clone(&validator); let store = Arc::clone(&store); - let metrics = Arc::clone(&metrics); + let metrics = metrics.clone(); let protocol = protocol.clone(); tokio::spawn(async move { // Validated received content @@ -1717,7 +1717,7 @@ where async fn send_utp_content( mut stream: UtpStream, content: &[u8], - metrics: Arc, + metrics: OverlayMetrics, protocol: ProtocolId, ) -> anyhow::Result<()> { match stream.write(content).await { @@ -1873,7 +1873,7 @@ where responder: Option>, trace: Option, nodes_to_poke: Vec, - metrics: Arc, + metrics: OverlayMetrics, protocol: ProtocolId, ) { let mut content = content; @@ -2795,7 +2795,6 @@ mod tests { let (command_tx, command_rx) = mpsc::unbounded_channel(); let (response_tx, response_rx) = mpsc::unbounded_channel(); let metrics = PORTALNET_METRICS.overlay.clone(); - let metrics = Arc::new(metrics); let validator = Arc::new(MockValidator {}); OverlayService { diff --git a/portalnet/src/storage.rs b/portalnet/src/storage.rs index 4643777ff..ce4e29d04 100644 --- a/portalnet/src/storage.rs +++ b/portalnet/src/storage.rs @@ -176,7 +176,6 @@ pub struct PortalStorageConfig { pub distance_fn: DistanceFunction, pub db: Arc, pub sql_connection_pool: Pool, - // do we need arc here? pub metrics: StorageMetrics, } diff --git a/portalnet/src/types/messages.rs b/portalnet/src/types/messages.rs index ae9d999e1..4e71ecf1a 100644 --- a/portalnet/src/types/messages.rs +++ b/portalnet/src/types/messages.rs @@ -1,4 +1,3 @@ -use std::path::PathBuf; use std::{ convert::{TryFrom, TryInto}, fmt, diff --git a/src/lib.rs b/src/lib.rs index df9df9c66..2f08e7d44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,14 +53,19 @@ pub async fn run_trin( prometheus_exporter::start(addr)?; } - //enr_file_location: Some(node_data_dir.clone()); - let portalnet_config = PortalnetConfig::new(&trin_config, private_key, &metrics_registry); + let enr_file_location = Some(node_data_dir.clone()); + let portalnet_config = PortalnetConfig::new(&trin_config, private_key, enr_file_location); // Initialize base discovery protocol let mut discovery = Discovery::new(portalnet_config.clone())?; let talk_req_rx = discovery.start().await?; let discovery = Arc::new(discovery); + // Initialize prometheus metrics + if let Some(addr) = trin_config.enable_metrics_with_url { + prometheus_exporter::start(addr)?; + } + // Initialize and spawn uTP socket let (utp_talk_reqs_tx, utp_talk_reqs_rx) = mpsc::unbounded_channel(); let discv5_utp_socket = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_reqs_rx);