Skip to content

Commit

Permalink
refactor: pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Mar 7, 2024
1 parent 89486d5 commit 21784b6
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 65 deletions.
1 change: 1 addition & 0 deletions portalnet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct PortalnetConfig {
pub node_addr_cache_capacity: usize,
pub disable_poke: bool,
pub trusted_block_root: Option<String>,
// the max number of concurrent utp transfers
pub utp_transfer_limit: usize,
}

Expand Down
7 changes: 6 additions & 1 deletion portalnet/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,13 @@ where
conn_id: u16,
) -> Result<Vec<u8>, OverlayRequestError> {
self.utp_controller
.connect_inbound_stream(conn_id, enr, /* query_trace */ None)
.connect_inbound_stream(conn_id, enr)
.await
.map_err(|err| OverlayRequestError::ContentNotFound {
message: format!("Unable to locate content on the network: {err:?}"),
utp: true,
trace: None,
})
}

/// Offer is sent in order to store content to k nodes with radii that contain content-id
Expand Down
22 changes: 13 additions & 9 deletions portalnet/src/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,13 +872,21 @@ where
tokio::spawn(async move {
let trace = query_info.trace;
let data = match utp_controller
.connect_inbound_stream(connection_id, source, trace.clone())
.connect_inbound_stream(connection_id, source)
.await
{
Ok(data) => data,
Err(e) => {
if let Some(responder) = callback {
let _ = responder.send(Err(e));
let _ = responder.send(Err(
OverlayRequestError::ContentNotFound {
message: format!(
"Unable to locate content on the network: {e}"
),
utp: true,
trace,
},
));
}
return;
}
Expand Down Expand Up @@ -1148,8 +1156,7 @@ where
// over the uTP stream.
let utp = Arc::clone(&self.utp_controller);
tokio::spawn(async move {
utp.accept_outbound_stream(cid, content_key.content_id(), content)
.await;
utp.accept_outbound_stream(cid, content).await;
drop(permit);
});

Expand Down Expand Up @@ -1298,13 +1305,10 @@ where

let utp_controller = Arc::clone(&self.utp_controller);
tokio::spawn(async move {
let data = match utp_controller
.accept_inbound_stream(cid.clone(), content_keys_string.clone())
.await
{
let data = match utp_controller.accept_inbound_stream(cid.clone()).await {
Ok(data) => data,
Err(err) => {
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), content_keys = ?content_keys_string, "unable to accept uTP stream");
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), content_keys = ?content_keys_string, "unable to complete uTP transfer");
drop(permit);
return;
}
Expand Down
89 changes: 34 additions & 55 deletions portalnet/src/utp_controller.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::{discovery::UtpEnr, overlay_service::OverlayRequestError};
use crate::discovery::UtpEnr;
use anyhow::anyhow;
use ethportal_api::{
types::{enr::Enr, query_trace::QueryTrace},
utils::bytes::hex_encode,
};
use ethportal_api::types::enr::Enr;
use lazy_static::lazy_static;
use std::{io, sync::Arc};
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -53,16 +50,15 @@ impl UtpController {
&self,
connection_id: u16,
peer: Enr,
trace: Option<QueryTrace>,
) -> Result<Vec<u8>, OverlayRequestError> {
) -> anyhow::Result<Vec<u8>> {
self.metrics
.report_utp_active_inc(UtpDirectionLabel::Inbound);
let cid = utp_rs::cid::ConnectionId {
recv: connection_id,
send: connection_id.wrapping_add(1),
peer: UtpEnr(peer),
};
let mut stream = match self.connect_with_cid(cid.clone(), *UTP_CONN_CFG).await {
let stream = match self.connect_with_cid(cid.clone(), *UTP_CONN_CFG).await {
Ok(stream) => stream,
Err(err) => {
self.metrics.report_utp_outcome(
Expand All @@ -76,35 +72,18 @@ impl UtpController {
peer = ?cid.peer.client(),
"Unable to establish uTP conn based on Content response",
);
return Err(OverlayRequestError::ContentNotFound {
message:
"Unable to locate content on the network: unable to establish utp conn"
.to_string(),
utp: true,
trace,
});
return Err(anyhow!("unable to establish utp conn"));
}
};

let mut data = vec![];
if let Err(err) = stream.read_to_eof(&mut data).await {
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from uTP stream, while handling a FindContent request.");
return Err(OverlayRequestError::ContentNotFound {
message:
"Unable to locate content on the network: error reading data from utp stream"
.to_string(),
utp: true,
trace,
});
// receive_utp_content handles metrics reporting of successful & failed rx
match Self::receive_utp_content(stream, self.metrics.clone()).await {
Ok(data) => Ok(data),
Err(err) => {
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from uTP stream, while handling a FindContent request.");
Err(anyhow!("error reading data from uTP stream"))
}
}

// report utp tx as successful, even if we go on to fail to process the
// payload
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success);
Ok(data)
}

/// Connect with a peer given the connection id, and transfer the data to the peer.
Expand Down Expand Up @@ -146,12 +125,7 @@ impl UtpController {
}

/// Accept an outbound stream given the connection id, and transfer the data to the peer.
pub async fn accept_outbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
content_id: [u8; 32],
data: Vec<u8>,
) {
pub async fn accept_outbound_stream(&self, cid: ConnectionId<UtpEnr>, data: Vec<u8>) {
self.metrics
.report_utp_active_inc(UtpDirectionLabel::Outbound);
let stream = match self.accept_with_cid(cid.clone(), *UTP_CONN_CFG).await {
Expand All @@ -178,7 +152,6 @@ impl UtpController {
%cid.send,
%cid.recv,
peer = ?cid.peer.client(),
content_id = %hex_encode(content_id),
"Error sending content over uTP, in response to FindContent"
);
};
Expand All @@ -189,36 +162,27 @@ impl UtpController {
pub async fn accept_inbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
content_keys_string: Vec<String>,
) -> anyhow::Result<Vec<u8>> {
// Wait for an incoming connection with the given CID. Then, read the data from the uTP
// stream.
self.metrics
.report_utp_active_inc(UtpDirectionLabel::Inbound);
let mut stream = match self.accept_with_cid(cid.clone(), *UTP_CONN_CFG).await {
let stream = match self.accept_with_cid(cid.clone(), *UTP_CONN_CFG).await {
Ok(stream) => stream,
Err(err) => {
Err(_) => {
self.metrics.report_utp_outcome(
UtpDirectionLabel::Inbound,
UtpOutcomeLabel::FailedConnection,
);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), content_keys = ?content_keys_string, "unable to accept uTP stream");
return Err(anyhow!("unable to accept uTP stream"));
}
};

let mut data = vec![];
if let Err(err) = stream.read_to_eof(&mut data).await {
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), content_keys = ?content_keys_string, "error reading data from uTP stream, while handling an Offer request.");
return Err(anyhow!("error reading data from uTP stream"));
// receive_utp_content handles metrics reporting of successful & failed rx
match Self::receive_utp_content(stream, self.metrics.clone()).await {
Ok(data) => Ok(data),
Err(_) => Err(anyhow!("error reading data from uTP stream")),
}

// report utp tx as successful, even if we go on to fail to process the payload
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success);
Ok(data)
}

async fn send_utp_content(
Expand Down Expand Up @@ -256,6 +220,21 @@ impl UtpController {
Ok(())
}

async fn receive_utp_content(
mut stream: UtpStream<UtpEnr>,
metrics: OverlayMetricsReporter,
) -> anyhow::Result<Vec<u8>> {
let mut data = vec![];
if let Err(err) = stream.read_to_eof(&mut data).await {
metrics.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx);
return Err(anyhow!("Error reading data from uTP stream: {err}"));
}

// report utp tx as successful, even if we go on to fail to process the payload
metrics.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success);
Ok(data)
}

async fn connect_with_cid(
&self,
cid: ConnectionId<UtpEnr>,
Expand Down

0 comments on commit 21784b6

Please sign in to comment.