Skip to content

Commit

Permalink
refactor: remove OverlayServiceError from UtpController
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Mar 7, 2024
1 parent 68f5ad5 commit 502e156
Showing 1 changed file with 21 additions and 33 deletions.
54 changes: 21 additions & 33 deletions portalnet/src/utp_controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{discovery::UtpEnr, overlay_service::OverlayRequestError};
use crate::discovery::UtpEnr;
use anyhow::anyhow;
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::sync::Semaphore;
Expand All @@ -8,7 +9,6 @@ use trin_metrics::{
overlay::OverlayMetricsReporter,
};
use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket};

/// UtpController is meant to be a container which contains all code related to/for managing uTP
/// streams We are implementing this because we want the utils of controlling uTP connection to be
/// as contained as it can, instead of extending overlay_service even more.
Expand Down Expand Up @@ -55,7 +55,7 @@ impl UtpController {
&self,
cid: ConnectionId<UtpEnr>,
side: UtpConnectionSide,
) -> anyhow::Result<Vec<u8>, OverlayRequestError> {
) -> anyhow::Result<Vec<u8>> {
// Wait for an incoming connection with the given CID. Then, read the data from the uTP
// stream.
self.metrics
Expand All @@ -74,37 +74,25 @@ impl UtpController {
"accept inbound uTP stream",
),
};
let mut stream = match stream {
Ok(stream) => stream,
Err(err) => {
self.metrics.report_utp_outcome(
UtpDirectionLabel::Inbound,
UtpOutcomeLabel::FailedConnection,
);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "unable to {message}");
return Err(OverlayRequestError::ContentNotFound {
message: format!(
"Unable to locate content on the network: unable to {message}"
),
utp: true,
trace: None,
});
}
};
let mut stream = stream.map_err(|err| {
self.metrics.report_utp_outcome(
UtpDirectionLabel::Inbound,
UtpOutcomeLabel::FailedConnection,
);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "unable to {message}");
anyhow!("Unable to locate content on the network: unable to {message}")
})?;

let mut data = vec![];
if let Err(err) = stream.read_to_eof(&mut data).await {
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from {message}");
return Err(OverlayRequestError::ContentNotFound {
message: format!(
stream.read_to_eof(&mut data).await
.map_err(|err| {
self.metrics
.report_utp_outcome(UtpDirectionLabel::Inbound, UtpOutcomeLabel::FailedDataTx);
debug!(%err, cid.send, cid.recv, peer = ?cid.peer.client(), "error reading data from {message}");
anyhow!(
"Unable to locate content on the network: error reading data from {message}"
),
utp: true,
trace: None,
});
}
)
})?;

// report utp tx as successful, even if we go on to fail to process the payload
self.metrics
Expand Down Expand Up @@ -208,14 +196,14 @@ impl UtpController {
pub async fn connect_inbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
) -> anyhow::Result<Vec<u8>, OverlayRequestError> {
) -> anyhow::Result<Vec<u8>> {
self.inbound_stream(cid, UtpConnectionSide::Connect).await
}

pub async fn accept_inbound_stream(
&self,
cid: ConnectionId<UtpEnr>,
) -> anyhow::Result<Vec<u8>, OverlayRequestError> {
) -> anyhow::Result<Vec<u8>> {
self.inbound_stream(cid, UtpConnectionSide::Accept).await
}

Expand Down

0 comments on commit 502e156

Please sign in to comment.