diff --git a/iroh-relay/src/client.rs b/iroh-relay/src/client.rs index f6b1ed4352..29654e910f 100644 --- a/iroh-relay/src/client.rs +++ b/iroh-relay/src/client.rs @@ -44,7 +44,7 @@ use url::Url; use crate::{ defaults::timeouts::*, http::{Protocol, RELAY_PATH}, - protos::relay::DerpCodec, + protos::relay::RelayCodec, KeyCache, }; @@ -699,8 +699,8 @@ impl Actor { let cache = self.key_cache.clone(); - let reader = ConnReader::Derp(FramedRead::new(reader, DerpCodec::new(cache.clone()))); - let writer = ConnWriter::Derp(FramedWrite::new(writer, DerpCodec::new(cache))); + let reader = ConnReader::Derp(FramedRead::new(reader, RelayCodec::new(cache.clone()))); + let writer = ConnWriter::Derp(FramedWrite::new(writer, RelayCodec::new(cache))); Ok((reader, writer, local_addr)) } diff --git a/iroh-relay/src/client/conn.rs b/iroh-relay/src/client/conn.rs index b37f77f6d9..5fb669fa4e 100644 --- a/iroh-relay/src/client/conn.rs +++ b/iroh-relay/src/client/conn.rs @@ -32,7 +32,7 @@ use crate::{ client::streams::{MaybeTlsStreamReader, MaybeTlsStreamWriter}, defaults::timeouts::CLIENT_RECV_TIMEOUT, protos::relay::{ - write_frame, ClientInfo, DerpCodec, Frame, MAX_PACKET_SIZE, PER_CLIENT_READ_QUEUE_DEPTH, + write_frame, ClientInfo, Frame, RelayCodec, MAX_PACKET_SIZE, PER_CLIENT_READ_QUEUE_DEPTH, PER_CLIENT_SEND_QUEUE_DEPTH, PROTOCOL_VERSION, }, }; @@ -270,12 +270,12 @@ pub struct ConnBuilder { } pub(crate) enum ConnReader { - Derp(FramedRead), + Derp(FramedRead), Ws(SplitStream, KeyCache), } pub(crate) enum ConnWriter { - Derp(FramedWrite), + Derp(FramedWrite), Ws(SplitSink), } diff --git a/iroh-relay/src/protos/relay.rs b/iroh-relay/src/protos/relay.rs index 90d419e103..eaa5004f53 100644 --- a/iroh-relay/src/protos/relay.rs +++ b/iroh-relay/src/protos/relay.rs @@ -207,11 +207,11 @@ pub(crate) async fn recv_client_key> + Un /// This is a framed protocol, using [`tokio_util::codec`] to turn the streams of bytes into /// [`Frame`]s. #[derive(Debug, Clone)] -pub(crate) struct DerpCodec { +pub(crate) struct RelayCodec { cache: KeyCache, } -impl DerpCodec { +impl RelayCodec { #[cfg(test)] pub fn test() -> Self { Self { @@ -224,7 +224,7 @@ impl DerpCodec { } } -/// The frames in the [`DerpCodec`]. +/// The frames in the [`RelayCodec`]. #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum Frame { ClientInfo { @@ -495,7 +495,7 @@ impl Frame { const HEADER_LEN: usize = 5; -impl Decoder for DerpCodec { +impl Decoder for RelayCodec { type Item = Frame; type Error = anyhow::Error; @@ -539,7 +539,7 @@ impl Decoder for DerpCodec { } } -impl Encoder for DerpCodec { +impl Encoder for RelayCodec { type Error = std::io::Error; fn encode(&mut self, frame: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -593,8 +593,8 @@ mod tests { #[tokio::test] async fn test_basic_read_write() -> anyhow::Result<()> { let (reader, writer) = tokio::io::duplex(1024); - let mut reader = FramedRead::new(reader, DerpCodec::test()); - let mut writer = FramedWrite::new(writer, DerpCodec::test()); + let mut reader = FramedRead::new(reader, RelayCodec::test()); + let mut writer = FramedWrite::new(writer, RelayCodec::test()); let expect_buf = b"hello world!"; let expected_frame = Frame::Health { @@ -613,8 +613,8 @@ mod tests { #[tokio::test] async fn test_send_recv_client_key() -> anyhow::Result<()> { let (reader, writer) = tokio::io::duplex(1024); - let mut reader = FramedRead::new(reader, DerpCodec::test()); - let mut writer = FramedWrite::new(writer, DerpCodec::test()); + let mut reader = FramedRead::new(reader, RelayCodec::test()); + let mut writer = FramedWrite::new(writer, RelayCodec::test()); let client_key = SecretKey::generate(rand::thread_rng()); let client_info = ClientInfo { @@ -814,7 +814,7 @@ mod proptests { #[test] fn frame_roundtrip(frame in frame()) { let mut buf = BytesMut::new(); - let mut codec = DerpCodec::test(); + let mut codec = RelayCodec::test(); codec.encode(frame.clone(), &mut buf).unwrap(); let decoded = codec.decode(&mut buf).unwrap().unwrap(); prop_assert_eq!(frame, decoded); @@ -831,7 +831,7 @@ mod proptests { #[test] fn broken_frame_handling(frame in frame()) { let mut buf = BytesMut::new(); - let mut codec = DerpCodec::test(); + let mut codec = RelayCodec::test(); codec.encode(frame.clone(), &mut buf).unwrap(); inject_error(&mut buf); let decoded = codec.decode(&mut buf); diff --git a/iroh-relay/src/server/actor.rs b/iroh-relay/src/server/actor.rs index bf596b54f7..9ad0bf7c80 100644 --- a/iroh-relay/src/server/actor.rs +++ b/iroh-relay/src/server/actor.rs @@ -250,7 +250,7 @@ mod tests { use super::*; use crate::{ - protos::relay::{recv_frame, DerpCodec, Frame, FrameType}, + protos::relay::{recv_frame, Frame, FrameType, RelayCodec}, server::{ client_conn::ClientConnConfig, streams::{MaybeTlsStream, RelayedStream}, @@ -260,21 +260,21 @@ mod tests { fn test_client_builder( node_id: NodeId, server_channel: mpsc::Sender, - ) -> (ClientConnConfig, Framed) { + ) -> (ClientConnConfig, Framed) { let (test_io, io) = tokio::io::duplex(1024); ( ClientConnConfig { node_id, stream: RelayedStream::Derp(Framed::new( MaybeTlsStream::Test(io), - DerpCodec::test(), + RelayCodec::test(), )), write_timeout: Duration::from_secs(1), channel_capacity: 10, rate_limit: None, server_channel, }, - Framed::new(test_io, DerpCodec::test()), + Framed::new(test_io, RelayCodec::test()), ) } diff --git a/iroh-relay/src/server/client_conn.rs b/iroh-relay/src/server/client_conn.rs index 9ef4c0ae93..cc71dde43c 100644 --- a/iroh-relay/src/server/client_conn.rs +++ b/iroh-relay/src/server/client_conn.rs @@ -518,7 +518,7 @@ mod tests { use super::*; use crate::{ client::conn, - protos::relay::{recv_frame, DerpCodec, FrameType}, + protos::relay::{recv_frame, FrameType, RelayCodec}, server::streams::MaybeTlsStream, }; @@ -530,9 +530,9 @@ mod tests { let key = SecretKey::generate(rand::thread_rng()).public(); let (io, io_rw) = tokio::io::duplex(1024); - let mut io_rw = Framed::new(io_rw, DerpCodec::test()); + let mut io_rw = Framed::new(io_rw, RelayCodec::test()); let (server_channel_s, mut server_channel_r) = mpsc::channel(10); - let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec::test())); + let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), RelayCodec::test())); let actor = Actor { stream: RateLimitedRelayedStream::unlimited(stream), @@ -670,9 +670,9 @@ mod tests { let key = SecretKey::generate(rand::thread_rng()).public(); let (io, io_rw) = tokio::io::duplex(1024); - let mut io_rw = Framed::new(io_rw, DerpCodec::test()); + let mut io_rw = Framed::new(io_rw, RelayCodec::test()); let (server_channel_s, mut server_channel_r) = mpsc::channel(10); - let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec::test())); + let stream = RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), RelayCodec::test())); println!("-- create client conn"); let actor = Actor { @@ -750,10 +750,10 @@ mod tests { // Build the rate limited stream. let (io_read, io_write) = tokio::io::duplex((LIMIT * MAX_FRAMES) as _); - let mut frame_writer = Framed::new(io_write, DerpCodec::test()); + let mut frame_writer = Framed::new(io_write, RelayCodec::test()); let stream = RelayedStream::Derp(Framed::new( MaybeTlsStream::Test(io_read), - DerpCodec::test(), + RelayCodec::test(), )); let mut stream = RateLimitedRelayedStream::new(stream, limiter); diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 09186e6cb5..e381672f57 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -234,11 +234,13 @@ mod tests { use super::*; use crate::{ - protos::relay::{recv_frame, DerpCodec, Frame, FrameType}, + protos::relay::{recv_frame, Frame, FrameType, RelayCodec}, server::streams::{MaybeTlsStream, RelayedStream}, }; - fn test_client_builder(key: NodeId) -> (ClientConnConfig, FramedRead) { + fn test_client_builder( + key: NodeId, + ) -> (ClientConnConfig, FramedRead) { let (test_io, io) = tokio::io::duplex(1024); let (server_channel, _) = mpsc::channel(10); ( @@ -246,14 +248,14 @@ mod tests { node_id: key, stream: RelayedStream::Derp(Framed::new( MaybeTlsStream::Test(io), - DerpCodec::test(), + RelayCodec::test(), )), write_timeout: Duration::from_secs(1), channel_capacity: 10, rate_limit: None, server_channel, }, - FramedRead::new(test_io, DerpCodec::test()), + FramedRead::new(test_io, RelayCodec::test()), ) } diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index 8db846a905..143016dbf8 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -30,7 +30,7 @@ use tracing::{debug, debug_span, error, info, info_span, trace, warn, Instrument use crate::{ defaults::DEFAULT_KEY_CACHE_CAPACITY, http::{Protocol, LEGACY_RELAY_PATH, RELAY_PATH, SUPPORTED_WEBSOCKET_VERSION}, - protos::relay::{recv_client_key, DerpCodec, PER_CLIENT_SEND_QUEUE_DEPTH, PROTOCOL_VERSION}, + protos::relay::{recv_client_key, RelayCodec, PER_CLIENT_SEND_QUEUE_DEPTH, PROTOCOL_VERSION}, server::{ actor::{Message, ServerActorTask}, client_conn::ClientConnConfig, @@ -504,7 +504,7 @@ impl Inner { let mut io = match protocol { Protocol::Relay => { inc!(Metrics, derp_accepts); - RelayedStream::Derp(Framed::new(io, DerpCodec::new(self.key_cache.clone()))) + RelayedStream::Derp(Framed::new(io, RelayCodec::new(self.key_cache.clone()))) } Protocol::Websocket => { inc!(Metrics, websocket_accepts); @@ -918,8 +918,8 @@ mod tests { let client_reader = MaybeTlsStreamReader::Mem(client_reader); let client_writer = MaybeTlsStreamWriter::Mem(client_writer); - let client_reader = ConnReader::Derp(FramedRead::new(client_reader, DerpCodec::test())); - let client_writer = ConnWriter::Derp(FramedWrite::new(client_writer, DerpCodec::test())); + let client_reader = ConnReader::Derp(FramedRead::new(client_reader, RelayCodec::test())); + let client_writer = ConnWriter::Derp(FramedWrite::new(client_writer, RelayCodec::test())); ( server, diff --git a/iroh-relay/src/server/streams.rs b/iroh-relay/src/server/streams.rs index 84444c84a4..f5e139c7b2 100644 --- a/iroh-relay/src/server/streams.rs +++ b/iroh-relay/src/server/streams.rs @@ -13,7 +13,7 @@ use tokio_tungstenite::{tungstenite, WebSocketStream}; use tokio_util::codec::Framed; use crate::{ - protos::relay::{DerpCodec, Frame}, + protos::relay::{Frame, RelayCodec}, KeyCache, }; @@ -22,7 +22,7 @@ use crate::{ /// The stream receives message from the client while the sink sends them to the client. #[derive(Debug)] pub(crate) enum RelayedStream { - Derp(Framed), + Derp(Framed), Ws(WebSocketStream, KeyCache), }