From 68f5ad546403e5dae4ae3b0f722674a03d9a17ea Mon Sep 17 00:00:00 2001 From: Kolby Moroz Liebl <31669092+KolbyML@users.noreply.github.com> Date: Thu, 7 Mar 2024 12:22:47 -0700 Subject: [PATCH] refactor: deduplicate stream code in UtpController (#10) * refactor: deduplicate stream code in UtpController --- portalnet/src/overlay.rs | 7 +- portalnet/src/overlay_service.rs | 10 +- portalnet/src/utp_controller.rs | 272 ++++++++++++++----------------- 3 files changed, 135 insertions(+), 154 deletions(-) diff --git a/portalnet/src/overlay.rs b/portalnet/src/overlay.rs index 46db924b6..24f216ef6 100644 --- a/portalnet/src/overlay.rs +++ b/portalnet/src/overlay.rs @@ -540,8 +540,13 @@ where enr: Enr, conn_id: u16, ) -> Result, OverlayRequestError> { + let cid = utp_rs::cid::ConnectionId { + recv: conn_id, + send: conn_id.wrapping_add(1), + peer: UtpEnr(enr), + }; self.utp_controller - .connect_inbound_stream(conn_id, enr) + .connect_inbound_stream(cid) .await .map_err(|err| OverlayRequestError::ContentNotFound { message: format!("Unable to locate content on the network: {err:?}"), diff --git a/portalnet/src/overlay_service.rs b/portalnet/src/overlay_service.rs index 6f3d6eca4..e9ac3691c 100644 --- a/portalnet/src/overlay_service.rs +++ b/portalnet/src/overlay_service.rs @@ -871,10 +871,12 @@ where let utp_controller = self.utp_controller.clone(); tokio::spawn(async move { let trace = query_info.trace; - let data = match utp_controller - .connect_inbound_stream(connection_id, source) - .await - { + let cid = utp_rs::cid::ConnectionId { + recv: connection_id, + send: connection_id.wrapping_add(1), + peer: UtpEnr(source), + }; + let data = match utp_controller.connect_inbound_stream(cid).await { Ok(data) => data, Err(e) => { if let Some(responder) = callback { diff --git a/portalnet/src/utp_controller.rs b/portalnet/src/utp_controller.rs index f097534cc..3d65ed5e9 100644 --- a/portalnet/src/utp_controller.rs +++ b/portalnet/src/utp_controller.rs @@ -1,15 +1,13 @@ -use crate::discovery::UtpEnr; -use anyhow::anyhow; -use ethportal_api::types::enr::Enr; +use crate::{discovery::UtpEnr, overlay_service::OverlayRequestError}; use lazy_static::lazy_static; -use std::{io, sync::Arc}; +use std::sync::Arc; use tokio::sync::Semaphore; use tracing::debug; use trin_metrics::{ labels::{UtpDirectionLabel, UtpOutcomeLabel}, overlay::OverlayMetricsReporter, }; -use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket, stream::UtpStream}; +use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket}; /// UtpController is meant to be a container which contains all code related to/for managing uTP /// streams We are implementing this because we want the utils of controlling uTP connection to be @@ -31,6 +29,14 @@ lazy_static! { pub static ref UTP_CONN_CFG: ConnectionConfig = ConnectionConfig { max_packet_size: 1024, ..Default::default()}; } +/// An enum for deciding to initiate the uTP connection as connecting or accepting. +/// The selection is specified in the Portal Wire spec, depending upon whether the +/// data is being transferred inbound or outbound. +enum UtpConnectionSide { + Connect, + Accept, +} + impl UtpController { pub fn new( utp_transfer_limit: usize, @@ -45,52 +51,90 @@ impl UtpController { } } - /// Connect with a peer given the connection id, and return the data received from the peer. - pub async fn connect_inbound_stream( + async fn inbound_stream( &self, - connection_id: u16, - peer: Enr, - ) -> anyhow::Result> { + cid: ConnectionId, + side: UtpConnectionSide, + ) -> anyhow::Result, OverlayRequestError> { + // Wait for an incoming connection with the given CID. Then, read the data from the uTP + // stream. self.metrics .report_utp_active_inc(UtpDirectionLabel::Inbound); - let cid = utp_rs::cid::ConnectionId { - recv: connection_id, - send: connection_id.wrapping_add(1), - peer: UtpEnr(peer), + let (stream, message) = match side { + UtpConnectionSide::Connect => ( + self.utp_socket + .connect_with_cid(cid.clone(), *UTP_CONN_CFG) + .await, + "connect inbound uTP stream", + ), + UtpConnectionSide::Accept => ( + self.utp_socket + .accept_with_cid(cid.clone(), *UTP_CONN_CFG) + .await, + "accept inbound uTP stream", + ), }; - let stream = match self.connect_with_cid(cid.clone(), *UTP_CONN_CFG).await { + let mut stream = match stream { Ok(stream) => stream, Err(err) => { self.metrics.report_utp_outcome( UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedConnection, ); - debug!( - %err, - cid.send, - cid.recv, - peer = ?cid.peer.client(), - "Unable to establish uTP conn based on Content response", - ); - return Err(anyhow!("unable to establish utp conn")); + debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "unable to {message}"); + return Err(OverlayRequestError::ContentNotFound { + message: format!( + "Unable to locate content on the network: unable to {message}" + ), + utp: true, + trace: None, + }); } }; - // receive_utp_content handles metrics reporting of successful & failed rx - match Self::receive_utp_content(stream, self.metrics.clone()).await { - Ok(data) => Ok(data), - Err(err) => { - debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from uTP stream, while handling a FindContent request."); - Err(anyhow!("error reading data from uTP stream")) - } + let mut data = vec![]; + if let Err(err) = stream.read_to_eof(&mut data).await { + self.metrics + .report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx); + debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from {message}"); + return Err(OverlayRequestError::ContentNotFound { + message: format!( + "Unable to locate content on the network: error reading data from {message}" + ), + utp: true, + trace: None, + }); } + + // report utp tx as successful, even if we go on to fail to process the payload + self.metrics + .report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success); + Ok(data) } - /// Connect with a peer given the connection id, and transfer the data to the peer. - pub async fn connect_outbound_stream(&self, cid: ConnectionId, data: Vec) -> bool { + async fn outbound_stream( + &self, + cid: ConnectionId, + data: Vec, + side: UtpConnectionSide, + ) -> bool { self.metrics .report_utp_active_inc(UtpDirectionLabel::Outbound); - let stream = match self.connect_with_cid(cid.clone(), *UTP_CONN_CFG).await { + let (stream, message) = match side { + UtpConnectionSide::Connect => ( + self.utp_socket + .connect_with_cid(cid.clone(), *UTP_CONN_CFG) + .await, + "outbound connect with cid", + ), + UtpConnectionSide::Accept => ( + self.utp_socket + .accept_with_cid(cid.clone(), *UTP_CONN_CFG) + .await, + "outbound accept with cid", + ), + }; + let mut stream = match stream { Ok(stream) => stream, Err(err) => { self.metrics.report_utp_outcome( @@ -102,156 +146,86 @@ impl UtpController { cid.send, cid.recv, peer = ?cid.peer.client(), - "Unable to establish uTP conn based on Accept", + "Unable to establish uTP conn based on {message}", ); return false; } }; - // send_utp_content handles metrics reporting of successful & failed txs - match Self::send_utp_content(stream, &data, self.metrics.clone()).await { - Ok(_) => true, + match stream.write(&data).await { + Ok(write_size) => { + if write_size != data.len() { + self.metrics.report_utp_outcome( + UtpDirectionLabel::Outbound, + UtpOutcomeLabel::FailedDataTx, + ); + debug!( + %cid.send, + %cid.recv, + peer = ?cid.peer.client(), + "Error sending content over uTP, in response to uTP write exited before sending all content: {write_size} bytes written, {} bytes expected", + data.len() + ); + return false; + } + } Err(err) => { + self.metrics + .report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedDataTx); debug!( %err, %cid.send, %cid.recv, peer = ?cid.peer.client(), - "Error sending content over uTP, in response to ACCEPT" + "Error sending content over uTP, in response to Error writing content to uTP stream: {err}" ); - false + return false; } } - } - /// Accept an outbound stream given the connection id, and transfer the data to the peer. - pub async fn accept_outbound_stream(&self, cid: ConnectionId, data: Vec) { - self.metrics - .report_utp_active_inc(UtpDirectionLabel::Outbound); - let stream = match self.accept_with_cid(cid.clone(), *UTP_CONN_CFG).await { - Ok(stream) => stream, - Err(err) => { - self.metrics.report_utp_outcome( - UtpDirectionLabel::Outbound, - UtpOutcomeLabel::FailedConnection, - ); - debug!( - %err, - %cid.send, - %cid.recv, - peer = ?cid.peer.client(), - "unable to accept uTP stream for CID" - ); - return; - } - }; - // send_utp_content handles metrics reporting of successful & failed txs - if let Err(err) = Self::send_utp_content(stream, &data, self.metrics.clone()).await { + // close uTP connection + if let Err(err) = stream.close().await { + self.metrics + .report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedShutdown); debug!( %err, %cid.send, %cid.recv, peer = ?cid.peer.client(), - "Error sending content over uTP, in response to FindContent" + "Error sending content over uTP, in response to Error closing uTP connection: {err}" ); + return false; }; - } - - /// Accept an inbound stream given the connection id, and return the data received from the - /// peer. - pub async fn accept_inbound_stream( - &self, - cid: ConnectionId, - ) -> anyhow::Result> { - // Wait for an incoming connection with the given CID. Then, read the data from the uTP - // stream. self.metrics - .report_utp_active_inc(UtpDirectionLabel::Inbound); - let stream = match self.accept_with_cid(cid.clone(), *UTP_CONN_CFG).await { - Ok(stream) => stream, - Err(_) => { - self.metrics.report_utp_outcome( - UtpDirectionLabel::Inbound, - UtpOutcomeLabel::FailedConnection, - ); - return Err(anyhow!("unable to accept uTP stream")); - } - }; - - // receive_utp_content handles metrics reporting of successful & failed rx - match Self::receive_utp_content(stream, self.metrics.clone()).await { - Ok(data) => Ok(data), - Err(_) => Err(anyhow!("error reading data from uTP stream")), - } - } - - async fn send_utp_content( - mut stream: UtpStream, - content: &[u8], - metrics: OverlayMetricsReporter, - ) -> anyhow::Result<()> { - match stream.write(content).await { - Ok(write_size) => { - if write_size != content.len() { - metrics.report_utp_outcome( - UtpDirectionLabel::Outbound, - UtpOutcomeLabel::FailedDataTx, - ); - return Err(anyhow!( - "uTP write exited before sending all content: {write_size} bytes written, {} bytes expected", - content.len() - )); - } - } - Err(err) => { - metrics - .report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedDataTx); - return Err(anyhow!("Error writing content to uTP stream: {err}")); - } - } - - // close uTP connection - if let Err(err) = stream.close().await { - metrics - .report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::FailedShutdown); - return Err(anyhow!("Error closing uTP connection: {err}")); - }; - metrics.report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::Success); - Ok(()) + .report_utp_outcome(UtpDirectionLabel::Outbound, UtpOutcomeLabel::Success); + true } - async fn receive_utp_content( - mut stream: UtpStream, - metrics: OverlayMetricsReporter, - ) -> anyhow::Result> { - let mut data = vec![]; - if let Err(err) = stream.read_to_eof(&mut data).await { - metrics.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx); - return Err(anyhow!("Error reading data from uTP stream: {err}")); - } - - // report utp tx as successful, even if we go on to fail to process the payload - metrics.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::Success); - Ok(data) + pub fn cid(&self, peer: UtpEnr, is_initiator: bool) -> ConnectionId { + self.utp_socket.cid(peer, is_initiator) } - async fn connect_with_cid( + pub async fn connect_inbound_stream( &self, cid: ConnectionId, - config: ConnectionConfig, - ) -> io::Result> { - self.utp_socket.connect_with_cid(cid, config).await + ) -> anyhow::Result, OverlayRequestError> { + self.inbound_stream(cid, UtpConnectionSide::Connect).await } - async fn accept_with_cid( + pub async fn accept_inbound_stream( &self, cid: ConnectionId, - config: ConnectionConfig, - ) -> io::Result> { - self.utp_socket.accept_with_cid(cid, config).await + ) -> anyhow::Result, OverlayRequestError> { + self.inbound_stream(cid, UtpConnectionSide::Accept).await } - pub fn cid(&self, peer: UtpEnr, is_initiator: bool) -> ConnectionId { - self.utp_socket.cid(peer, is_initiator) + pub async fn connect_outbound_stream(&self, cid: ConnectionId, data: Vec) -> bool { + self.outbound_stream(cid, data, UtpConnectionSide::Connect) + .await + } + + pub async fn accept_outbound_stream(&self, cid: ConnectionId, data: Vec) -> bool { + self.outbound_stream(cid, data, UtpConnectionSide::Accept) + .await } }