Skip to content

Commit

Permalink
refactor: deduplicate stream code in UtpController (#10)
Browse files Browse the repository at this point in the history
* refactor: deduplicate stream code in UtpController
  • Loading branch information
KolbyML authored and njgheorghita committed Mar 7, 2024
1 parent 21784b6 commit 68f5ad5
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 154 deletions.
7 changes: 6 additions & 1 deletion portalnet/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,13 @@ where
enr: Enr,
conn_id: u16,
) -> Result<Vec<u8>, OverlayRequestError> {
let cid = utp_rs::cid::ConnectionId {
recv: conn_id,
send: conn_id.wrapping_add(1),
peer: UtpEnr(enr),
};
self.utp_controller
.connect_inbound_stream(conn_id, enr)
.connect_inbound_stream(cid)
.await
.map_err(|err| OverlayRequestError::ContentNotFound {
message: format!("Unable to locate content on the network: {err:?}"),
Expand Down
10 changes: 6 additions & 4 deletions portalnet/src/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,10 +871,12 @@ where
let utp_controller = self.utp_controller.clone();
tokio::spawn(async move {
let trace = query_info.trace;
let data = match utp_controller
.connect_inbound_stream(connection_id, source)
.await
{
let cid = utp_rs::cid::ConnectionId {
recv: connection_id,
send: connection_id.wrapping_add(1),
peer: UtpEnr(source),
};
let data = match utp_controller.connect_inbound_stream(cid).await {
Ok(data) => data,
Err(e) => {
if let Some(responder) = callback {
Expand Down
272 changes: 123 additions & 149 deletions portalnet/src/utp_controller.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use crate::discovery::UtpEnr;
use anyhow::anyhow;
use ethportal_api::types::enr::Enr;
use crate::{discovery::UtpEnr, overlay_service::OverlayRequestError};
use lazy_static::lazy_static;
use std::{io, sync::Arc};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::debug;
use trin_metrics::{
labels::{UtpDirectionLabel, UtpOutcomeLabel},
overlay::OverlayMetricsReporter,
};
use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket, stream::UtpStream};
use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket};

/// UtpController is meant to be a container which contains all code related to/for managing uTP
/// streams We are implementing this because we want the utils of controlling uTP connection to be
Expand All @@ -31,6 +29,14 @@ lazy_static! {
pub static ref UTP_CONN_CFG: ConnectionConfig = ConnectionConfig { max_packet_size: 1024, ..Default::default()};
}

/// An enum for deciding to initiate the uTP connection as connecting or accepting.
/// The selection is specified in the Portal Wire spec, depending upon whether the
/// data is being transferred inbound or outbound.
enum UtpConnectionSide {
Connect,
Accept,
}

impl UtpController {
pub fn new(
utp_transfer_limit: usize,
Expand All @@ -45,52 +51,90 @@ impl UtpController {
}
}

/// Connect with a peer given the connection id, and return the data received from the peer.
pub async fn connect_inbound_stream(
async fn inbound_stream(
&self,
connection_id: u16,
peer: Enr,
) -> anyhow::Result<Vec<u8>> {
cid: ConnectionId<UtpEnr>,
side: UtpConnectionSide,
) -> anyhow::Result<Vec<u8>, OverlayRequestError> {
// Wait for an incoming connection with the given CID. Then, read the data from the uTP
// stream.
self.metrics
.report_utp_active_inc(UtpDirectionLabel::Inbound);
let cid = utp_rs::cid::ConnectionId {
recv: connection_id,
send: connection_id.wrapping_add(1),
peer: UtpEnr(peer),
let (stream, message) = match side {
UtpConnectionSide::Connect => (
self.utp_socket
.connect_with_cid(cid.clone(), *UTP_CONN_CFG)
.await,
"connect inbound uTP stream",
),
UtpConnectionSide::Accept => (
self.utp_socket
.accept_with_cid(cid.clone(), *UTP_CONN_CFG)
.await,
"accept inbound uTP stream",
),
};
let stream = match self.connect_with_cid(cid.clone(), *UTP_CONN_CFG).await {
let mut stream = match stream {
Ok(stream) => stream,
Err(err) => {
self.metrics.report_utp_outcome(
UtpDirectionLabel::Inbound,
UtpOutcomeLabel::FailedConnection,
);
debug!(
%err,
cid.send,
cid.recv,
peer = ?cid.peer.client(),
"Unable to establish uTP conn based on Content response",
);
return Err(anyhow!("unable to establish utp conn"));
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "unable to {message}");
return Err(OverlayRequestError::ContentNotFound {
message: format!(
"Unable to locate content on the network: unable to {message}"
),
utp: true,
trace: None,
});
}
};

// receive_utp_content handles metrics reporting of successful & failed rx
match Self::receive_utp_content(stream, self.metrics.clone()).await {
Ok(data) => Ok(data),
Err(err) => {
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from uTP stream, while handling a FindContent request.");
Err(anyhow!("error reading data from uTP stream"))
}
let mut data = vec![];
if let Err(err) = stream.read_to_eof(&mut data).await {
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from {message}");
return Err(OverlayRequestError::ContentNotFound {
message: format!(
"Unable to locate content on the network: error reading data from {message}"
),
utp: true,
trace: None,
});
}

// report utp tx as successful, even if we go on to fail to process the payload
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success);
Ok(data)
}

/// Connect with a peer given the connection id, and transfer the data to the peer.
pub async fn connect_outbound_stream(&self, cid: ConnectionId<UtpEnr>, data: Vec<u8>) -> bool {
async fn outbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
data: Vec<u8>,
side: UtpConnectionSide,
) -> bool {
self.metrics
.report_utp_active_inc(UtpDirectionLabel::Outbound);
let stream = match self.connect_with_cid(cid.clone(), *UTP_CONN_CFG).await {
let (stream, message) = match side {
UtpConnectionSide::Connect => (
self.utp_socket
.connect_with_cid(cid.clone(), *UTP_CONN_CFG)
.await,
"outbound connect with cid",
),
UtpConnectionSide::Accept => (
self.utp_socket
.accept_with_cid(cid.clone(), *UTP_CONN_CFG)
.await,
"outbound accept with cid",
),
};
let mut stream = match stream {
Ok(stream) => stream,
Err(err) => {
self.metrics.report_utp_outcome(
Expand All @@ -102,156 +146,86 @@ impl UtpController {
cid.send,
cid.recv,
peer = ?cid.peer.client(),
"Unable to establish uTP conn based on Accept",
"Unable to establish uTP conn based on {message}",
);
return false;
}
};

// send_utp_content handles metrics reporting of successful & failed txs
match Self::send_utp_content(stream, &data, self.metrics.clone()).await {
Ok(_) => true,
match stream.write(&data).await {
Ok(write_size) => {
if write_size != data.len() {
self.metrics.report_utp_outcome(
UtpDirectionLabel::Outbound,
UtpOutcomeLabel::FailedDataTx,
);
debug!(
%cid.send,
%cid.recv,
peer = ?cid.peer.client(),
"Error sending content over uTP, in response to uTP write exited before sending all content: {write_size} bytes written, {} bytes expected",
data.len()
);
return false;
}
}
Err(err) => {
self.metrics
.report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedDataTx);
debug!(
%err,
%cid.send,
%cid.recv,
peer = ?cid.peer.client(),
"Error sending content over uTP, in response to ACCEPT"
"Error sending content over uTP, in response to Error writing content to uTP stream: {err}"
);
false
return false;
}
}
}

/// Accept an outbound stream given the connection id, and transfer the data to the peer.
pub async fn accept_outbound_stream(&self, cid: ConnectionId<UtpEnr>, data: Vec<u8>) {
self.metrics
.report_utp_active_inc(UtpDirectionLabel::Outbound);
let stream = match self.accept_with_cid(cid.clone(), *UTP_CONN_CFG).await {
Ok(stream) => stream,
Err(err) => {
self.metrics.report_utp_outcome(
UtpDirectionLabel::Outbound,
UtpOutcomeLabel::FailedConnection,
);
debug!(
%err,
%cid.send,
%cid.recv,
peer = ?cid.peer.client(),
"unable to accept uTP stream for CID"
);
return;
}
};
// send_utp_content handles metrics reporting of successful & failed txs
if let Err(err) = Self::send_utp_content(stream, &data, self.metrics.clone()).await {
// close uTP connection
if let Err(err) = stream.close().await {
self.metrics
.report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedShutdown);
debug!(
%err,
%cid.send,
%cid.recv,
peer = ?cid.peer.client(),
"Error sending content over uTP, in response to FindContent"
"Error sending content over uTP, in response to Error closing uTP connection: {err}"
);
return false;
};
}

/// Accept an inbound stream given the connection id, and return the data received from the
/// peer.
pub async fn accept_inbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
) -> anyhow::Result<Vec<u8>> {
// Wait for an incoming connection with the given CID. Then, read the data from the uTP
// stream.
self.metrics
.report_utp_active_inc(UtpDirectionLabel::Inbound);
let stream = match self.accept_with_cid(cid.clone(), *UTP_CONN_CFG).await {
Ok(stream) => stream,
Err(_) => {
self.metrics.report_utp_outcome(
UtpDirectionLabel::Inbound,
UtpOutcomeLabel::FailedConnection,
);
return Err(anyhow!("unable to accept uTP stream"));
}
};

// receive_utp_content handles metrics reporting of successful & failed rx
match Self::receive_utp_content(stream, self.metrics.clone()).await {
Ok(data) => Ok(data),
Err(_) => Err(anyhow!("error reading data from uTP stream")),
}
}

async fn send_utp_content(
mut stream: UtpStream<UtpEnr>,
content: &[u8],
metrics: OverlayMetricsReporter,
) -> anyhow::Result<()> {
match stream.write(content).await {
Ok(write_size) => {
if write_size != content.len() {
metrics.report_utp_outcome(
UtpDirectionLabel::Outbound,
UtpOutcomeLabel::FailedDataTx,
);
return Err(anyhow!(
"uTP write exited before sending all content: {write_size} bytes written, {} bytes expected",
content.len()
));
}
}
Err(err) => {
metrics
.report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedDataTx);
return Err(anyhow!("Error writing content to uTP stream: {err}"));
}
}

// close uTP connection
if let Err(err) = stream.close().await {
metrics
.report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedShutdown);
return Err(anyhow!("Error closing uTP connection: {err}"));
};
metrics.report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::Success);
Ok(())
.report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::Success);
true
}

async fn receive_utp_content(
mut stream: UtpStream<UtpEnr>,
metrics: OverlayMetricsReporter,
) -> anyhow::Result<Vec<u8>> {
let mut data = vec![];
if let Err(err) = stream.read_to_eof(&mut data).await {
metrics.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx);
return Err(anyhow!("Error reading data from uTP stream: {err}"));
}

// report utp tx as successful, even if we go on to fail to process the payload
metrics.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success);
Ok(data)
pub fn cid(&self, peer: UtpEnr, is_initiator: bool) -> ConnectionId<UtpEnr> {
self.utp_socket.cid(peer, is_initiator)
}

async fn connect_with_cid(
pub async fn connect_inbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
config: ConnectionConfig,
) -> io::Result<UtpStream<UtpEnr>> {
self.utp_socket.connect_with_cid(cid, config).await
) -> anyhow::Result<Vec<u8>, OverlayRequestError> {
self.inbound_stream(cid, UtpConnectionSide::Connect).await
}

async fn accept_with_cid(
pub async fn accept_inbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
config: ConnectionConfig,
) -> io::Result<UtpStream<UtpEnr>> {
self.utp_socket.accept_with_cid(cid, config).await
) -> anyhow::Result<Vec<u8>, OverlayRequestError> {
self.inbound_stream(cid, UtpConnectionSide::Accept).await
}

pub fn cid(&self, peer: UtpEnr, is_initiator: bool) -> ConnectionId<UtpEnr> {
self.utp_socket.cid(peer, is_initiator)
pub async fn connect_outbound_stream(&self, cid: ConnectionId<UtpEnr>, data: Vec<u8>) -> bool {
self.outbound_stream(cid, data, UtpConnectionSide::Connect)
.await
}

pub async fn accept_outbound_stream(&self, cid: ConnectionId<UtpEnr>, data: Vec<u8>) -> bool {
self.outbound_stream(cid, data, UtpConnectionSide::Accept)
.await
}
}

0 comments on commit 68f5ad5

Please sign in to comment.