Skip to content

Commit

Permalink
Remove unnecessary arc from OverlayMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Oct 3, 2023
1 parent 8d5f69d commit 99e2b9b
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 32 deletions.
10 changes: 9 additions & 1 deletion portalnet/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::net::SocketAddr;
use std::path::PathBuf;

use ethereum_types::H256;

Expand All @@ -24,16 +25,22 @@ pub struct PortalnetConfig {
pub no_stun: bool,
pub node_addr_cache_capacity: usize,
pub metrics: OverlayMetrics,
pub enr_file_location: Option<PathBuf>,
}

impl PortalnetConfig {
pub fn new(trin_config: &TrinConfig, private_key: H256) -> Self {
pub fn new(
trin_config: &TrinConfig,
private_key: H256,
enr_file_location: Option<PathBuf>,
) -> Self {
Self {
external_addr: trin_config.external_addr,
private_key,
listen_port: trin_config.discovery_port,
no_stun: trin_config.no_stun,
bootnodes: trin_config.bootnodes.clone(),
enr_file_location,
..Default::default()
}
}
Expand All @@ -51,6 +58,7 @@ impl Default for PortalnetConfig {
no_stun: false,
node_addr_cache_capacity: NODE_ADDR_CACHE_CAPACITY,
metrics: PORTALNET_METRICS.overlay.clone(),
enr_file_location: None,
}
}
}
23 changes: 13 additions & 10 deletions portalnet/src/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::types::messages::{PortalnetConfig, ProtocolId};
use crate::socket;
use std::hash::{Hash, Hasher};
use std::net::Ipv4Addr;
use std::str::FromStr;
use std::{convert::TryFrom, fmt, fs, io, net::SocketAddr, sync::Arc};

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::BytesMut;
Expand All @@ -8,22 +11,22 @@ use discv5::{
enr::{CombinedKey, EnrBuilder, NodeId},
ConfigBuilder, Discv5, Event, ListenConfig, RequestError, TalkRequest,
};
use ethportal_api::types::enr::Enr;
use ethportal_api::utils::bytes::hex_encode;
use ethportal_api::NodeInfo;
use lru::LruCache;
use parking_lot::RwLock;
use rlp::RlpStream;
use serde_json::{json, Value};
use std::hash::{Hash, Hasher};
use std::net::Ipv4Addr;
use std::str::FromStr;
use std::{convert::TryFrom, fmt, fs, io, net::SocketAddr, sync::Arc};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use trin_utils::version::get_trin_version;
use utp_rs::{cid::ConnectionPeer, udp::AsyncUdpSocket};

use super::config::PortalnetConfig;
use super::types::messages::ProtocolId;
use crate::socket;
use ethportal_api::types::enr::Enr;
use ethportal_api::utils::bytes::hex_encode;
use ethportal_api::NodeInfo;
use trin_utils::version::get_trin_version;

/// Size of the buffer of the Discv5 TALKREQ channel.
const TALKREQ_CHANNEL_BUFFER: usize = 100;

Expand Down
6 changes: 3 additions & 3 deletions portalnet/src/metrics/portalnet.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::metrics::overlay::OverlayMetrics;
use crate::metrics::storage::StorageMetrics;
use lazy_static::lazy_static;
use prometheus_exporter::prometheus::{default_registry, Registry};
use prometheus_exporter::prometheus::default_registry;

// We use lazy_static to ensure that the metrics registry is initialized only once, for each
// runtime. This is important because the registry is a global singleton, and if it is
Expand All @@ -23,8 +23,8 @@ pub struct PortalnetMetrics {
impl PortalnetMetrics {
pub fn new() -> anyhow::Result<Self> {
let registry = default_registry();
let overlay = OverlayMetrics::new(&registry)?;
let storage = StorageMetrics::new(&registry)?;
let overlay = OverlayMetrics::new(registry)?;
let storage = StorageMetrics::new(registry)?;
Ok(Self { overlay, storage })
}
}
6 changes: 2 additions & 4 deletions portalnet/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator, TStore> {
/// Accepted content validator that makes requests to this/other overlay networks
validator: Arc<TValidator>,
/// Runtime telemetry metrics for the overlay network.
metrics: Arc<OverlayMetrics>,
metrics: OverlayMetrics,
}

impl<
Expand Down Expand Up @@ -141,8 +141,6 @@ where
config.table_filter,
config.bucket_filter,
)));
let metrics = Arc::new(metrics);

let command_tx = OverlayService::<TContentKey, TMetric, TValidator, TStore>::spawn(
Arc::clone(&discovery),
Arc::clone(&store),
Expand All @@ -151,7 +149,7 @@ where
config.ping_queue_interval,
protocol.clone(),
Arc::clone(&utp_socket),
Arc::clone(&metrics),
metrics.clone(),
Arc::clone(&validator),
config.query_timeout,
config.query_peer_timeout,
Expand Down
19 changes: 9 additions & 10 deletions portalnet/src/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ pub struct OverlayService<TContentKey, TMetric, TValidator, TStore> {
/// Phantom metric (distance function).
phantom_metric: PhantomData<TMetric>,
/// Metrics reporting component
metrics: Arc<OverlayMetrics>,
metrics: OverlayMetrics,
/// Validator for overlay network content.
validator: Arc<TValidator>,
/// A channel that the overlay service emits events on.
Expand Down Expand Up @@ -332,7 +332,7 @@ where
ping_queue_interval: Option<Duration>,
protocol: ProtocolId,
utp_socket: Arc<UtpSocket<crate::discovery::UtpEnr>>,
metrics: Arc<OverlayMetrics>,
metrics: OverlayMetrics,
validator: Arc<TValidator>,
query_timeout: Duration,
query_peer_timeout: Duration,
Expand Down Expand Up @@ -1124,7 +1124,7 @@ where
// Wait for an incoming connection with the given CID. Then, write the data
// over the uTP stream.
let utp = Arc::clone(&self.utp_socket);
let metrics = Arc::clone(&self.metrics);
let metrics = self.metrics.clone();
let protocol = self.protocol.clone();
tokio::spawn(async move {
metrics.report_utp_active_inc(
Expand Down Expand Up @@ -1256,7 +1256,7 @@ where
let kbuckets = Arc::clone(&self.kbuckets);
let command_tx = self.command_tx.clone();
let utp = Arc::clone(&self.utp_socket);
let metrics = Arc::clone(&self.metrics);
let metrics = self.metrics.clone();
let protocol = self.protocol.clone();

tokio::spawn(async move {
Expand Down Expand Up @@ -1533,7 +1533,7 @@ where
let response_clone = response.clone();

let utp = Arc::clone(&self.utp_socket);
let metrics = Arc::clone(&self.metrics);
let metrics = self.metrics.clone();
let protocol = self.protocol.clone();

tokio::spawn(async move {
Expand Down Expand Up @@ -1619,7 +1619,7 @@ where
async fn process_accept_utp_payload(
validator: Arc<TValidator>,
store: Arc<RwLock<TStore>>,
metrics: Arc<OverlayMetrics>,
metrics: OverlayMetrics,
kbuckets: Arc<RwLock<KBucketsTable<NodeId, Node>>>,
command_tx: UnboundedSender<OverlayCommand<TContentKey>>,
content_keys: Vec<TContentKey>,
Expand Down Expand Up @@ -1648,7 +1648,7 @@ where
// - Propagate all validated content
let validator = Arc::clone(&validator);
let store = Arc::clone(&store);
let metrics = Arc::clone(&metrics);
let metrics = metrics.clone();
let protocol = protocol.clone();
tokio::spawn(async move {
// Validated received content
Expand Down Expand Up @@ -1717,7 +1717,7 @@ where
async fn send_utp_content(
mut stream: UtpStream<crate::discovery::UtpEnr>,
content: &[u8],
metrics: Arc<OverlayMetrics>,
metrics: OverlayMetrics,
protocol: ProtocolId,
) -> anyhow::Result<()> {
match stream.write(content).await {
Expand Down Expand Up @@ -1873,7 +1873,7 @@ where
responder: Option<oneshot::Sender<RecursiveFindContentResult>>,
trace: Option<QueryTrace>,
nodes_to_poke: Vec<NodeId>,
metrics: Arc<OverlayMetrics>,
metrics: OverlayMetrics,
protocol: ProtocolId,
) {
let mut content = content;
Expand Down Expand Up @@ -2795,7 +2795,6 @@ mod tests {
let (command_tx, command_rx) = mpsc::unbounded_channel();
let (response_tx, response_rx) = mpsc::unbounded_channel();
let metrics = PORTALNET_METRICS.overlay.clone();
let metrics = Arc::new(metrics);
let validator = Arc::new(MockValidator {});

OverlayService {
Expand Down
1 change: 0 additions & 1 deletion portalnet/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ pub struct PortalStorageConfig {
pub distance_fn: DistanceFunction,
pub db: Arc<rocksdb::DB>,
pub sql_connection_pool: Pool<SqliteConnectionManager>,
// do we need arc here?
pub metrics: StorageMetrics,
}

Expand Down
1 change: 0 additions & 1 deletion portalnet/src/types/messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::path::PathBuf;
use std::{
convert::{TryFrom, TryInto},
fmt,
Expand Down
9 changes: 7 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ pub async fn run_trin(
prometheus_exporter::start(addr)?;
}

//enr_file_location: Some(node_data_dir.clone());
let portalnet_config = PortalnetConfig::new(&trin_config, private_key, &metrics_registry);
let enr_file_location = Some(node_data_dir.clone());
let portalnet_config = PortalnetConfig::new(&trin_config, private_key, enr_file_location);

// Initialize base discovery protocol
let mut discovery = Discovery::new(portalnet_config.clone())?;
let talk_req_rx = discovery.start().await?;
let discovery = Arc::new(discovery);

// Initialize prometheus metrics
if let Some(addr) = trin_config.enable_metrics_with_url {
prometheus_exporter::start(addr)?;
}

// Initialize and spawn uTP socket
let (utp_talk_reqs_tx, utp_talk_reqs_rx) = mpsc::unbounded_channel();
let discv5_utp_socket = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_reqs_rx);
Expand Down

0 comments on commit 99e2b9b

Please sign in to comment.