From f4e71fa2328f41b75f43f7573a2dea7558169591 Mon Sep 17 00:00:00 2001 From: Binarybaron Date: Sun, 24 Nov 2024 15:07:57 +0100 Subject: [PATCH] feat: listening on onion services, dialing onion services --- Cargo.toml | 24 +++- examples/ping-onion.rs | 57 +++++--- src/address.rs | 22 ++- src/lib.rs | 307 +++++++++++++++++++++++++++++++++++++---- 4 files changed, 356 insertions(+), 54 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 88a5097..727b469 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,19 +9,39 @@ repository = "https://github.com/umgefahren/libp2p-tor" authors = ["umgefahren "] [dependencies] -arti-client = { version = "0.24", default-features = false, features = ["tokio", "rustls"] } +thiserror = "1.0" +anyhow = "1.0.93" +tokio = "1.41.1" futures = "0.3" + +arti-client = { version = "0.24", default-features = false, features = ["tokio", "rustls", "onion-service-client"] } libp2p = { version = "^0.53", default-features = false, features = ["tokio", "tcp", "tls"] } + tor-rtcompat = { version = "0.24.0", features = ["tokio", "rustls"] } -tokio = { version = "1.0", features = ["macros"] } tracing = "0.1.40" +tor-hsservice = { version = "0.24.0", optional = true } +tor-cell = { version = "0.24.0", optional = true } +tor-proto = { version = "0.24.0", optional = true } +data-encoding = { version = "2.6.0", optional = true } [dev-dependencies] libp2p = { version = "0.53", default-features = false, features = ["tokio", "noise", "yamux", "ping", "macros", "tcp", "tls"] } tokio-test = "0.4.4" +tokio = { version = "1.41.1", features = ["macros"] } +tracing-subscriber = "0.2" + +[features] +listen-onion-service = [ + "arti-client/onion-service-service", + "dep:tor-hsservice", + "dep:tor-cell", + "dep:tor-proto", + "dep:data-encoding" +] [[example]] name = "ping-onion" +required-features = ["listen-onion-service"] [package.metadata.docs.rs] all-features = true diff --git a/examples/ping-onion.rs b/examples/ping-onion.rs index ab3c294..9896d7c 100644 --- a/examples/ping-onion.rs +++ b/examples/ping-onion.rs @@ -54,48 +54,67 @@ use libp2p::{ }; use libp2p_community_tor::{AddressConversion, TorTransport}; use std::error::Error; +use tor_hsservice::config::OnionServiceConfigBuilder; +/// Create a transport +/// Returns a tuple of the transport and the onion address we can instruct it to listen on async fn onion_transport( keypair: identity::Keypair, ) -> Result< - libp2p::core::transport::Boxed<(PeerId, libp2p::core::muxing::StreamMuxerBox)>, + ( + libp2p::core::transport::Boxed<(PeerId, libp2p::core::muxing::StreamMuxerBox)>, + Multiaddr, + ), Box, > { - let transport = TorTransport::bootstrapped() + let mut transport = TorTransport::bootstrapped() .await? - .with_address_conversion(AddressConversion::IpAndDns) - .boxed(); + .with_address_conversion(AddressConversion::IpAndDns); + + // We derive the nickname for the onion address from the peer id + let svg_cfg = OnionServiceConfigBuilder::default() + .nickname( + keypair + .public() + .to_peer_id() + .to_base58() + .to_ascii_lowercase() + .parse() + .unwrap(), + ) + .num_intro_points(3) + .build() + .unwrap(); + + let onion_listen_address = transport.add_onion_service(svg_cfg, 999).unwrap(); let auth_upgrade = noise::Config::new(&keypair)?; let multiplex_upgrade = yamux::Config::default(); let transport = transport + .boxed() .upgrade(Version::V1) .authenticate(auth_upgrade) .multiplex(multiplex_upgrade) .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .boxed(); - Ok(transport) + Ok((transport, onion_listen_address)) } #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {local_peer_id}"); - let transport = onion_transport(local_key).await?; + let (transport, onion_listen_address) = onion_transport(local_key).await?; let mut swarm = SwarmBuilder::with_new_identity() .with_tokio() - .with_tcp( - Default::default(), - (libp2p::tls::Config::new, libp2p::noise::Config::new), - libp2p::yamux::Config::default, - ) - .unwrap() .with_other_transport(|_| transport) .unwrap() .with_behaviour(|_| Behaviour { @@ -104,18 +123,20 @@ async fn main() -> Result<(), Box> { .unwrap() .build(); - // Tell the swarm to listen on all interfaces and a random, OS-assigned - // port. - swarm - .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) - .unwrap(); - // Dial the peer identified by the multi-address given as the second // command-line argument, if any. if let Some(addr) = std::env::args().nth(1) { let remote: Multiaddr = addr.parse()?; swarm.dial(remote)?; println!("Dialed {addr}") + } else { + // TODO: We need to do this because otherwise the status of the onion service is gonna be [`Shutdown`] + // when we first poll it and then the swarm will not pull it again (?). I don't know why this is the case. + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + + // If we are not dialing, we need to listen + // Tell the swarm to listen on a specific onion address + swarm.listen_on(onion_listen_address).unwrap(); } loop { diff --git a/src/address.rs b/src/address.rs index 30c4f2a..f66fa1a 100644 --- a/src/address.rs +++ b/src/address.rs @@ -17,9 +17,8 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. - use arti_client::{DangerouslyIntoTorAddr, IntoTorAddr, TorAddr}; -use libp2p::{core::multiaddr::Protocol, Multiaddr}; +use libp2p::{core::multiaddr::Protocol, multiaddr::Onion3Addr, Multiaddr}; use std::net::SocketAddr; /// "Dangerously" extract a Tor address from the provided [`Multiaddr`]. @@ -45,22 +44,35 @@ pub fn dangerous_extract(multiaddr: &Multiaddr) -> Option { pub fn safe_extract(multiaddr: &Multiaddr) -> Option { let mut protocols = multiaddr.into_iter(); - let tor_addr = try_to_domain_and_port(&protocols.next()?, &protocols.next()?)? + let tor_addr = try_to_domain_and_port(&protocols.next()?, &protocols.next())? .into_tor_addr() .ok()?; Some(tor_addr) } +fn libp2p_onion_address_to_domain_and_port<'a>( + onion_address: &'a Onion3Addr<'_>, +) -> Option<(&'a str, u16)> { + // Here we convert from Onion3Addr to TorAddr + // We need to leak the string because it's a temporary string that would otherwise be freed + let hash = data_encoding::BASE32.encode(onion_address.hash()); + let onion_domain = format!("{}.onion", hash); + let onion_domain = Box::leak(onion_domain.into_boxed_str()); + + Some((onion_domain, onion_address.port())) +} + fn try_to_domain_and_port<'a>( maybe_domain: &'a Protocol, - maybe_port: &Protocol, + maybe_port: &Option, ) -> Option<(&'a str, u16)> { match (maybe_domain, maybe_port) { ( Protocol::Dns(domain) | Protocol::Dns4(domain) | Protocol::Dns6(domain), - Protocol::Tcp(port), + Some(Protocol::Tcp(port)), ) => Some((domain.as_ref(), *port)), + (Protocol::Onion3(domain), _) => libp2p_onion_address_to_domain_and_port(domain), _ => None, } } diff --git a/src/lib.rs b/src/lib.rs index b403d56..5e2ccc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,13 +53,31 @@ use arti_client::{TorClient, TorClientBuilder}; use futures::future::BoxFuture; +use futures::stream::BoxStream; +use libp2p::multiaddr::Protocol; use libp2p::{ core::transport::{ListenerId, TransportEvent}, Multiaddr, Transport, TransportError, }; +use std::collections::HashMap; +use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; +use std::task::{Context, Poll}; +use thiserror::Error; +use tor_hsservice::handle_rend_requests; +use tor_hsservice::status::OnionServiceStatus; +use tor_hsservice::StreamRequest; use tor_rtcompat::tokio::TokioRustlsRuntime; +// We only need these imports if the `listen-onion-service` feature is enabled +#[cfg(feature = "listen-onion-service")] +use tor_cell::relaycell::msg::{Connected, End, EndReason}; +#[cfg(feature = "listen-onion-service")] +use tor_hsservice::{HsId, OnionServiceConfig, RunningOnionService}; +#[cfg(feature = "listen-onion-service")] +use tor_proto::stream::IncomingStreamRequest; + mod address; mod provider; @@ -68,28 +86,55 @@ pub use provider::TokioTorStream; pub type TorError = arti_client::Error; -#[derive(Clone)] -pub struct TorTransport { - // client is in an Arc, because without it the [`Transport::dial`] method can't be implemented, - // due to lifetime issues. With the, eventual, stabilization of static async traits this issue - // will be resolved. - client: Arc>, - /// The used conversion mode to resolve addresses. One probably shouldn't access this directly. - /// The usage of [`TorTransport::with_address_conversion`] at construction is recommended. - pub conversion_mode: AddressConversion, +type PendingUpgrade = BoxFuture<'static, Result>; +#[cfg(feature = "listen-onion-service")] +type OnionServiceStream = BoxStream<'static, StreamRequest>; +#[cfg(feature = "listen-onion-service")] +type OnionServiceStatusStream = BoxStream<'static, OnionServiceStatus>; + +/// Struct representing an onion address we are listening on for libp2p connections. +#[cfg(feature = "listen-onion-service")] +struct TorListener { + #[allow(dead_code)] // We need to own this to keep the RunningOnionService alive + /// The onion service we are listening on + service: Arc, + /// The stream of status updates for the onion service + status_stream: OnionServiceStatusStream, + /// The stream incoming [`StreamRequest`]s + request_stream: OnionServiceStream, + + /// The port we are listening on + port: u16, + /// The onion address we are listening on + onion_address: Multiaddr, } /// Mode of address conversion. /// Refer tor [arti_client::TorAddr](https://docs.rs/arti-client/latest/arti_client/struct.TorAddr.html) for details #[derive(Debug, Clone, Copy, Hash, Default, PartialEq, Eq, PartialOrd, Ord)] pub enum AddressConversion { - /// Uses only dns for address resolution (default). + /// Uses only DNS for address resolution (default). #[default] DnsOnly, - /// uses ip and dns for addresses + /// Uses IP and DNS for addresses. IpAndDns, } +pub struct TorTransport { + pub conversion_mode: AddressConversion, + + /// The Tor client. + client: Arc>, + + /// Onion services we are listening on. + #[cfg(feature = "listen-onion-service")] + listeners: HashMap, + + /// Onion services we are running but currently not listening on + #[cfg(feature = "listen-onion-service")] + services: Vec<(Arc, OnionServiceStream)>, +} + impl TorTransport { /// Creates a new `TorClientBuilder`. /// @@ -122,10 +167,7 @@ impl TorTransport { ) -> Result { let client = Arc::new(builder.create_unbootstrapped()?); - Ok(Self { - client, - conversion_mode, - }) + Ok(Self::from_client(client, conversion_mode)) } /// Builds a `TorTransport` from an existing Arti `TorClient`. @@ -136,6 +178,10 @@ impl TorTransport { Self { client, conversion_mode, + #[cfg(feature = "listen-onion-service")] + listeners: HashMap::new(), + #[cfg(feature = "listen-onion-service")] + services: Vec::new(), } } @@ -153,25 +199,166 @@ impl TorTransport { self.conversion_mode = conversion_mode; self } + + /// Call this function to instruct the transport to listen on a specific onion address + /// You need to call this function **before** calling `listen_on` + /// + /// # Returns + /// Returns the Multiaddr of the onion address that the transport can be instructed to listen on + /// To actually listen on the address, you need to call [`listen_on`] with the returned address + #[cfg(feature = "listen-onion-service")] + pub fn add_onion_service( + &mut self, + svc_cfg: OnionServiceConfig, + port: u16, + ) -> anyhow::Result { + let (service, request_stream) = self.client.launch_onion_service(svc_cfg)?; + let request_stream = Box::pin(handle_rend_requests(request_stream)); + + let multiaddr = service + .onion_name() + .ok_or_else(|| anyhow::anyhow!("Onion service has no nickname"))? + .to_multiaddr(port); + + self.services.push((service, request_stream)); + + Ok(multiaddr) + } +} + +#[derive(Debug, Error)] +pub enum TorTransportError { + #[error(transparent)] + Client(#[from] TorError), + #[cfg(feature = "listen-onion-service")] + #[error(transparent)] + Service(#[from] tor_hsservice::ClientError), + #[cfg(feature = "listen-onion-service")] + #[error("Stream closed before receiving data")] + StreamClosed, + #[cfg(feature = "listen-onion-service")] + #[error("Stream port does not match listener port")] + StreamPortMismatch, + #[cfg(feature = "listen-onion-service")] + #[error("Onion service is broken")] + Broken, +} + +#[cfg(feature = "listen-onion-service")] +trait HsIdExt { + fn to_multiaddr(&self, port: u16) -> Multiaddr; +} + +#[cfg(feature = "listen-onion-service")] +impl HsIdExt for HsId { + /// Convert an HsId to a Multiaddr + fn to_multiaddr(&self, port: u16) -> Multiaddr { + let onion_domain = self.to_string(); + let onion_without_dot_onion = onion_domain + .split(".") + .nth(0) + .expect("Display formatting of HsId to contain .onion suffix"); + let multiaddress_string = format!("/onion3/{}:{}", onion_without_dot_onion, port); + + Multiaddr::from_str(&multiaddress_string) + .expect("A valid onion address to be convertible to a Multiaddr") + } +} + +trait StatusExt { + fn is_reachable(&self) -> bool; + fn is_broken(&self) -> bool; +} + +impl StatusExt for OnionServiceStatus { + /// Returns true if the onion service is reachable + fn is_reachable(&self) -> bool { + match self.state() { + tor_hsservice::status::State::Running => true, + tor_hsservice::status::State::DegradedReachable => true, + tor_hsservice::status::State::Bootstrapping => true, // TODO: Return false here, only enabled for testing + tor_hsservice::status::State::Shutdown => false, + tor_hsservice::status::State::DegradedUnreachable => false, + tor_hsservice::status::State::Recovering => false, + tor_hsservice::status::State::Broken => false, + // TODO: Why do we need this? + _ => false, + } + } + + fn is_broken(&self) -> bool { + matches!(self.state(), tor_hsservice::status::State::Broken) + } } impl Transport for TorTransport { type Output = TokioTorStream; - type Error = TorError; + type Error = TorTransportError; type Dial = BoxFuture<'static, Result>; - type ListenerUpgrade = futures::future::Pending>; + type ListenerUpgrade = PendingUpgrade; fn listen_on( &mut self, - _id: ListenerId, - addr: Multiaddr, + id: ListenerId, + onion_address: Multiaddr, ) -> Result<(), TransportError> { - // although this address might be supported, this is returned in order to not provoke an - // error when trying to listen on this transport. - Err(TransportError::MultiaddrNotSupported(addr)) + // If the `listen-onion-service` feature is not enabled, immediately return an error + #[cfg(not(feature = "listen-onion-service"))] + return Err(TransportError::MultiaddrNotSupported(onion_address.clone())); + + // If the address is not an onion3 address, return an error + let Some(Protocol::Onion3(address)) = onion_address.into_iter().nth(0) else { + return Err(TransportError::MultiaddrNotSupported(onion_address.clone())); + }; + + // Find the running onion service that matches the requested address + // If we find it, remove it from [`services`] and insert it into [`listeners`] + + // TODO: drain(..) is not a good idea. We should keep the services around so we can reuse them later + let Some((service, request_stream)) = self + .services + .drain(..) + .filter(|(service, _)| { + service + .onion_name() + .and_then(|name| Some(name.to_multiaddr(address.port()))) + == Some(onion_address.clone()) + }) + .next() + else { + return Err(TransportError::MultiaddrNotSupported(onion_address.clone())); + }; + + let status_stream = Box::pin(service.status_events()); + + self.listeners.insert( + id, + TorListener { + service, + request_stream, + onion_address: onion_address.clone(), + port: address.port(), + status_stream, + }, + ); + + return Ok(()); } - fn remove_listener(&mut self, _id: ListenerId) -> bool { + fn remove_listener(&mut self, id: ListenerId) -> bool { + // If the `listen-onion-service` feature is not enabled, we do not support listening + #[cfg(not(feature = "listen-onion-service"))] + return false; + + // Take the listener out of the map. This will stop listening on onion service for libp2p connections (we will not poll it anymore) + // However, we will not stop the onion service itself because we might want to reuse it later + // The onion service will be stopped when the transport is dropped + if let Some(listener) = self.listeners.remove(&id) { + self.services + .push((listener.service, listener.request_stream)); + return true; + } + false } @@ -190,7 +377,7 @@ impl Transport for TorTransport { tracing::debug!(%addr, "Established connection to peer through Tor"); - Ok(stream.into()) + Ok(TokioTorStream::from(stream)) })) } @@ -206,10 +393,72 @@ impl Transport for TorTransport { } fn poll( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - // pending is returned here because this transport doesn't support listening, yet - std::task::Poll::Pending + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // If the `listen-onion-service` feature is not enabled, we do not support listening + #[cfg(not(feature = "listen-onion-service"))] + return Poll::Pending; + + for (listener_id, listener) in self.listeners.iter_mut() { + // Check if the service has any new statuses + if let Poll::Ready(Some(status)) = listener.status_stream.as_mut().poll_next(cx) { + if status.is_reachable() { + // TODO: We might report the address here multiple time to the swarm. Is this a problem? + return Poll::Ready(TransportEvent::NewAddress { + listener_id: *listener_id, + listen_addr: listener.onion_address.clone(), + }); + } + + if status.is_broken() { + return Poll::Ready(TransportEvent::ListenerError { + listener_id: *listener_id, + error: TorTransportError::Broken, + }); + } + } + + match listener.request_stream.as_mut().poll_next(cx) { + Poll::Ready(Some(request)) => { + let port = listener.port; + let upgrade: PendingUpgrade = Box::pin(async move { + // Check if the port matches what we expect + if let IncomingStreamRequest::Begin(begin) = request.request() { + if begin.port() != port { + // Reject the connection with CONNECTREFUSED + request + .reject(End::new_with_reason(EndReason::CONNECTREFUSED)) + .await?; + + return Err(TorTransportError::StreamPortMismatch); + } + } + + // Accept the stream and forward it to the swarm + let data_stream = request.accept(Connected::new_empty()).await?; + Ok(TokioTorStream::from(data_stream)) + }); + + return Poll::Ready(TransportEvent::Incoming { + listener_id: *listener_id, + upgrade, + local_addr: listener.onion_address.clone(), + send_back_addr: listener.onion_address.clone(), + }); + } + + // The stream has ended. Most likely because the service was shut down + Poll::Ready(None) => { + return Poll::Ready(TransportEvent::ListenerClosed { + listener_id: *listener_id, + reason: Ok(()), + }); + } + Poll::Pending => {} + } + } + + Poll::Pending } }