From 0919e957bcb95686b4de3c179c9908743c43076f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Fri, 13 Dec 2024 20:10:11 -0500 Subject: [PATCH] refactor: remove magicsock / quinn::Endpoint circular dependency `MagicSock::spawn` creates a `quinn::Endpoint`. `MagicSock` is now an `AsyncUdpSocket`, and can be passed into the `quinn::Endpoint`. `magicsock::Handle` now owns the `quinn::Endpoint`, and `iroh::Endpoint` interacts with the `quinn::Endpoint` through `Handle::endpoint()`. This allows us to pass the `quinn::Endpoint` to the `magicsock::Actor` for use in QAD, without any circular dependencies. --- iroh/examples/listen.rs | 2 +- iroh/src/endpoint.rs | 49 ++------- iroh/src/magicsock.rs | 228 ++++++++++++++++++++++------------------ 3 files changed, 139 insertions(+), 140 deletions(-) diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index a27b5f3067..17426f8d54 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -30,7 +30,7 @@ async fn main() -> anyhow::Result<()> { // Use `RelayMode::Custom` to pass in a `RelayMap` with custom relay urls. // Use `RelayMode::Disable` to disable holepunching and relaying over HTTPS // If you want to experiment with relaying using your own relay server, you must pass in the same custom relay url to both the `listen` code AND the `connect` code - .relay_mode(RelayMode::Default) + .relay_mode(RelayMode::Staging) // you can choose a port to bind to, but passing in `0` will bind the socket to a random available port .bind() .await?; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 2ff5031c6c..94d5808965 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -149,6 +149,8 @@ impl Builder { 1 => Some(discovery.into_iter().next().unwrap()), _ => Some(Box::new(ConcurrentDiscovery::from_services(discovery))), }; + let server_config = static_config.create_server_config(self.alpn_protocols)?; + let msock_opts = magicsock::Options { addr_v4: self.addr_v4, addr_v6: self.addr_v6, @@ -158,10 +160,11 @@ impl Builder { discovery, proxy_url: self.proxy_url, dns_resolver, + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, }; - Endpoint::bind(static_config, msock_opts, self.alpn_protocols).await + Endpoint::bind(static_config, msock_opts).await } // # The very common methods everyone basically needs. @@ -414,7 +417,6 @@ impl Builder { self } } - /// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime. #[derive(Debug)] struct StaticConfig { @@ -425,7 +427,7 @@ struct StaticConfig { impl StaticConfig { /// Create a [`quinn::ServerConfig`] with the specified ALPN protocols. - fn create_server_config(&self, alpn_protocols: Vec>) -> Result { + fn create_server_config(&self, alpn_protocols: Vec>) -> Result { let server_config = make_server_config( &self.secret_key, alpn_protocols, @@ -478,7 +480,6 @@ pub fn make_server_config( #[derive(Clone, Debug)] pub struct Endpoint { msock: Handle, - endpoint: quinn::Endpoint, rtt_actor: Arc, cancel_token: CancellationToken, static_config: Arc, @@ -501,40 +502,17 @@ impl Endpoint { /// This is for internal use, the public interface is the [`Builder`] obtained from /// [Self::builder]. See the methods on the builder for documentation of the parameters. #[instrument("ep", skip_all, fields(me = %static_config.secret_key.public().fmt_short()))] - async fn bind( - static_config: StaticConfig, - msock_opts: magicsock::Options, - initial_alpns: Vec>, - ) -> Result { + async fn bind(static_config: StaticConfig, msock_opts: magicsock::Options) -> Result { let msock = magicsock::MagicSock::spawn(msock_opts).await?; trace!("created magicsock"); - - let server_config = static_config.create_server_config(initial_alpns)?; - - let mut endpoint_config = quinn::EndpointConfig::default(); - // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit - // set to 0. The fixed bit is the 3rd bit of the first byte of a packet. - // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight - // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore - // the packet if grease_quic_bit is set to false. - endpoint_config.grease_quic_bit(false); - - let endpoint = quinn::Endpoint::new_with_abstract_socket( - endpoint_config, - Some(server_config), - Arc::new(msock.clone()), - Arc::new(quinn::TokioRuntime), - )?; trace!("created quinn endpoint"); debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created"); let ep = Self { msock: msock.clone(), - endpoint: endpoint.clone(), rtt_actor: Arc::new(rtt_actor::RttHandle::new()), cancel_token: CancellationToken::new(), static_config: Arc::new(static_config), }; - msock.set_quic_endpoint(Some(endpoint)); Ok(ep) } @@ -544,7 +522,7 @@ impl Endpoint { /// Note that this *overrides* the current list of ALPNs. pub fn set_alpns(&self, alpns: Vec>) -> Result<()> { let server_config = self.static_config.create_server_config(alpns)?; - self.endpoint.set_server_config(Some(server_config)); + self.msock.endpoint().set_server_config(Some(server_config)); Ok(()) } @@ -648,7 +626,8 @@ impl Endpoint { // TODO: We'd eventually want to replace "localhost" with something that makes more sense. let connect = self - .endpoint + .msock + .endpoint() .connect_with(client_config, addr.0, "localhost")?; let connection = connect @@ -678,7 +657,7 @@ impl Endpoint { /// [`Endpoint::close`]. pub fn accept(&self) -> Accept<'_> { Accept { - inner: self.endpoint.accept(), + inner: self.msock.endpoint().accept(), ep: self.clone(), } } @@ -958,13 +937,7 @@ impl Endpoint { if self.is_closed() { return Ok(()); } - self.cancel_token.cancel(); - tracing::debug!("Closing connections"); - self.endpoint.close(0u16.into(), b""); - self.endpoint.wait_idle().await; - - tracing::debug!("Connections closed"); self.msock.close().await?; Ok(()) } @@ -1049,7 +1022,7 @@ impl Endpoint { } #[cfg(test)] pub(crate) fn endpoint(&self) -> &quinn::Endpoint { - &self.endpoint + self.msock.endpoint() } } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 737651c38a..9291c1277b 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -39,7 +39,7 @@ use iroh_metrics::{inc, inc_by}; use iroh_relay::protos::stun; use net_report::{Options as ReportOptions, ProbeProtocol, QuicConfig}; use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket}; -use quinn::AsyncUdpSocket; +use quinn::{AsyncUdpSocket, ServerConfig}; use rand::{seq::SliceRandom, Rng, SeedableRng}; use smallvec::{smallvec, SmallVec}; use tokio::{ @@ -124,6 +124,9 @@ pub(crate) struct Options { /// Proxy configuration. pub(crate) proxy_url: Option, + /// ServerConfig for the internal QUIC endpoint + pub(crate) server_config: ServerConfig, + /// Skip verification of SSL certificates from relay servers /// /// May only be used in tests. @@ -133,21 +136,34 @@ pub(crate) struct Options { impl Default for Options { fn default() -> Self { + let secret_key = SecretKey::generate(); + let server_config = make_default_server_config(&secret_key); Options { addr_v4: None, addr_v6: None, - secret_key: SecretKey::generate(), + secret_key, relay_map: RelayMap::empty(), node_map: None, discovery: None, proxy_url: None, dns_resolver: crate::dns::default_resolver().clone(), + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, } } } +/// Generate a server config with no ALPNS and a default +/// transport configuration +fn make_default_server_config(secret_key: &SecretKey) -> ServerConfig { + let quic_server_config = crate::tls::make_server_config(secret_key, vec![], false) + .expect("should generate valid config"); + let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config)); + server_config.transport_config(Arc::new(quinn::TransportConfig::default())); + server_config +} + /// Contents of a relay message. Use a SmallVec to avoid allocations for the very /// common case of a single packet. type RelayContents = SmallVec<[Bytes; 1]>; @@ -161,6 +177,8 @@ pub(crate) struct Handle { msock: Arc, // Empty when closed actor_tasks: Arc>>, + // quinn endpoint + endpoint: quinn::Endpoint, } /// Iroh connectivity layer. @@ -226,10 +244,6 @@ pub(crate) struct MagicSock { pconn6: Option, /// NetReport client net_reporter: net_report::Addr, - /// Handle to the underlying quinn::Endpoint. - /// - /// Used in netcheck for QUIC address discovery. - quic_endpoint: Arc>>, /// The state for an active DiscoKey. disco_secrets: DiscoSecrets, @@ -281,16 +295,6 @@ impl MagicSock { self.my_relay.replace(my_relay) } - /// Sets the internal `quinn::Endpoint` that is used for QUIC address - /// discovery. - pub(crate) fn set_quic_endpoint(&self, endpoint: Option) { - let mut ep = self - .quic_endpoint - .write() - .expect("MagicSock::endpoint RwLock is poisoned"); - *ep = endpoint; - } - fn is_closing(&self) -> bool { self.closing.load(Ordering::Relaxed) } @@ -447,32 +451,6 @@ impl MagicSock { Ok(addr) } - fn create_io_poller(&self) -> Pin> { - // To do this properly the MagicSock would need a registry of pollers. For each - // node we would look up the poller or create one. Then on each try_send we can - // look up the correct poller and configure it to poll the paths it needs. - // - // Note however that the current quinn impl calls UdpPoller::poll_writable() - // **before** it calls try_send(), as opposed to how it is documented. That is a - // problem as we would not yet know the path that needs to be polled. To avoid such - // ambiguity the API could be changed to a .poll_send(&self, cx: &mut Context, - // io_poller: Pin<&mut dyn UdpPoller>, transmit: &Transmit) -> Poll> - // instead of the existing .try_send() because then we would have control over this. - // - // Right now however we have one single poller behaving the same for each - // connection. It checks all paths and returns Poll::Ready as soon as any path is - // ready. - let ipv4_poller = self.pconn4.create_io_poller(); - let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); - let relay_sender = self.relay_actor_sender.clone(); - Box::pin(IoPoller { - ipv4_poller, - ipv6_poller, - relay_sender, - relay_send_waker: self.relay_send_waker.clone(), - }) - } - /// Implementation for AsyncUdpSocket::try_send #[instrument(skip_all)] fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { @@ -1571,6 +1549,7 @@ impl Handle { discovery, dns_resolver, proxy_url, + server_config, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, } = opts; @@ -1633,9 +1612,23 @@ impl Handle { dns_resolver, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify, - quic_endpoint: Arc::new(RwLock::new(None)), }); + let mut endpoint_config = quinn::EndpointConfig::default(); + // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit + // set to 0. The fixed bit is the 3rd bit of the first byte of a packet. + // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight + // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore + // the packet if grease_quic_bit is set to false. + endpoint_config.grease_quic_bit(false); + + let endpoint = quinn::Endpoint::new_with_abstract_socket( + endpoint_config, + Some(server_config), + inner.clone(), + Arc::new(quinn::TokioRuntime), + )?; + let mut actor_tasks = JoinSet::default(); let relay_actor = RelayActor::new(inner.clone(), relay_datagrams_queue); @@ -1658,6 +1651,7 @@ impl Handle { let inner2 = inner.clone(); let network_monitor = netmon::Monitor::new().await?; + let qad_endpoint = endpoint.clone(); actor_tasks.spawn( async move { let actor = Actor { @@ -1674,6 +1668,7 @@ impl Handle { no_v4_send: false, net_reporter, network_monitor, + qad_endpoint, }; if let Err(err) = actor.run().await { @@ -1686,11 +1681,17 @@ impl Handle { let c = Handle { msock: inner, actor_tasks: Arc::new(Mutex::new(actor_tasks)), + endpoint, }; Ok(c) } + /// The underlying [`quinn::Endpoint`] + pub fn endpoint(&self) -> &quinn::Endpoint { + &self.endpoint + } + /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. @@ -1698,6 +1699,10 @@ impl Handle { /// indefinitely after this call. #[instrument(skip_all, fields(me = %self.msock.me))] pub(crate) async fn close(&self) -> Result<()> { + tracing::debug!("Closing connections"); + self.endpoint.close(0u16.into(), b""); + self.endpoint.wait_idle().await; + tracing::debug!("Connections closed"); if self.msock.is_closed() { return Ok(()); } @@ -1850,13 +1855,35 @@ impl RelayDatagramsQueue { } } -impl AsyncUdpSocket for Handle { +impl AsyncUdpSocket for MagicSock { fn create_io_poller(self: Arc) -> Pin> { - self.msock.create_io_poller() + // To do this properly the MagicSock would need a registry of pollers. For each + // node we would look up the poller or create one. Then on each try_send we can + // look up the correct poller and configure it to poll the paths it needs. + // + // Note however that the current quinn impl calls UdpPoller::poll_writable() + // **before** it calls try_send(), as opposed to how it is documented. That is a + // problem as we would not yet know the path that needs to be polled. To avoid such + // ambiguity the API could be changed to a .poll_send(&self, cx: &mut Context, + // io_poller: Pin<&mut dyn UdpPoller>, transmit: &Transmit) -> Poll> + // instead of the existing .try_send() because then we would have control over this. + // + // Right now however we have one single poller behaving the same for each + // connection. It checks all paths and returns Poll::Ready as soon as any path is + // ready. + let ipv4_poller = self.pconn4.create_io_poller(); + let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller()); + let relay_sender = self.relay_actor_sender.clone(); + Box::pin(IoPoller { + ipv4_poller, + ipv6_poller, + relay_sender, + relay_send_waker: self.relay_send_waker.clone(), + }) } fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { - self.msock.try_send(transmit) + self.try_send(transmit) } /// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely. @@ -1866,11 +1893,11 @@ impl AsyncUdpSocket for Handle { bufs: &mut [io::IoSliceMut<'_>], metas: &mut [quinn_udp::RecvMeta], ) -> Poll> { - self.msock.poll_recv(cx, bufs, metas) + self.poll_recv(cx, bufs, metas) } fn local_addr(&self) -> io::Result { - match &*self.msock.local_addrs.read().expect("not poisoned") { + match &*self.local_addrs.read().expect("not poisoned") { (ipv4, None) => { // Pretend to be IPv6, because our QuinnMappedAddrs // need to be IPv6. @@ -1990,6 +2017,11 @@ struct Actor { net_reporter: net_report::Client, network_monitor: netmon::Monitor, + + /// The internal quinn::Endpoint + /// + /// Needed for Quic Address Discovery + qad_endpoint: quinn::Endpoint, } impl Actor { @@ -2401,27 +2433,23 @@ impl Actor { let pconn4 = Some(self.pconn4.clone()); let pconn6 = self.pconn6.clone(); - let quic_endpoint = self.msock.quic_endpoint.read().expect("poisoned").clone(); - - let quic_config = match quic_endpoint { - Some(ep) => { - // Need to add Quic Mapped Addrs for the relay nodes to use for - // QUIC Address Discovery - let mapped_addrs = self.resolve_qad_addrs(Duration::from_millis(500)).await; - let root_store = rustls::RootCertStore::from_iter( - webpki_roots::TLS_SERVER_ROOTS.iter().cloned(), - ); - let client_config = rustls::ClientConfig::builder() - .with_root_certificates(root_store) - .with_no_client_auth(); - Some(QuicConfig { - ep, - client_config, - mapped_addrs, - }) - } - None => None, - }; + // Need to add Quic Mapped Addrs for the relay nodes to use for + // QUIC Address Discovery + let mapped_addrs = self + .resolve_qad_addrs(std::time::Duration::from_millis(500)) + .await; + // create a client config for the endpoint to + // use for QUIC address discovery + let root_store = + rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + let client_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + let quic_config = Some(QuicConfig { + ep: self.qad_endpoint.clone(), + client_config, + mapped_addrs, + }); debug!("requesting net_report report"); match self @@ -3945,7 +3973,13 @@ mod tests { /// /// Use [`magicsock_connect`] to establish connections. #[instrument(name = "ep", skip_all, fields(me = secret_key.public().fmt_short()))] - async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result<(quinn::Endpoint, Handle)> { + async fn magicsock_ep(secret_key: SecretKey) -> anyhow::Result { + let server_config = crate::endpoint::make_server_config( + &secret_key, + vec![ALPN.to_vec()], + Arc::new(quinn::TransportConfig::default()), + true, + )?; let opts = Options { addr_v4: None, addr_v6: None, @@ -3955,24 +3989,11 @@ mod tests { discovery: None, dns_resolver: crate::dns::default_resolver().clone(), proxy_url: None, + server_config, insecure_skip_relay_cert_verify: true, }; let msock = MagicSock::spawn(opts).await?; - let server_config = crate::endpoint::make_server_config( - &secret_key, - vec![ALPN.to_vec()], - Arc::new(quinn::TransportConfig::default()), - true, - )?; - let mut endpoint_config = quinn::EndpointConfig::default(); - endpoint_config.grease_quic_bit(false); - let endpoint = quinn::Endpoint::new_with_abstract_socket( - endpoint_config, - Some(server_config), - Arc::new(msock.clone()), - Arc::new(quinn::TokioRuntime), - )?; - Ok((endpoint, msock)) + Ok(msock) } /// Connects from `ep` returned by [`magicsock_ep`] to the `node_id`. @@ -3989,7 +4010,7 @@ mod tests { let mut transport_config = quinn::TransportConfig::default(); transport_config.keep_alive_interval(Some(Duration::from_secs(1))); - magicsock_connet_with_transport_config( + magicsock_connect_with_transport_config( ep, ep_secret_key, addr, @@ -4005,7 +4026,7 @@ mod tests { /// /// Uses [`ALPN`], `node_id`, must match `addr`. #[instrument(name = "connect", skip_all, fields(me = ep_secret_key.public().fmt_short()))] - async fn magicsock_connet_with_transport_config( + async fn magicsock_connect_with_transport_config( ep: &quinn::Endpoint, ep_secret_key: SecretKey, addr: QuicMappedAddr, @@ -4034,7 +4055,7 @@ mod tests { let secret_key_missing_node = SecretKey::from_bytes(&[255u8; 32]); let node_id_missing_node = secret_key_missing_node.public(); - let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap(); + let msock_1 = magicsock_ep(secret_key_1.clone()).await.unwrap(); // Generate an address not present in the NodeMap. let bad_addr = QuicMappedAddr::generate(); @@ -4045,14 +4066,19 @@ mod tests { // this speeds up the test. let res = tokio::time::timeout( Duration::from_millis(500), - magicsock_connect(&ep_1, secret_key_1.clone(), bad_addr, node_id_missing_node), + magicsock_connect( + msock_1.endpoint(), + secret_key_1.clone(), + bad_addr, + node_id_missing_node, + ), ) .await; assert!(res.is_err(), "expecting timeout"); // Now check we can still create another connection with this endpoint. - let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap(); - + let msock_2 = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let ep_2 = msock_2.endpoint().clone(); // This needs an accept task let accept_task = tokio::spawn({ async fn accept(ep: quinn::Endpoint) -> Result<()> { @@ -4064,7 +4090,6 @@ mod tests { info!("accept finished"); Ok(()) } - let ep_2 = ep_2.clone(); async move { if let Err(err) = accept(ep_2).await { error!("{err:#}"); @@ -4097,7 +4122,7 @@ mod tests { let addr = msock_1.get_mapping_addr(node_id_2).unwrap(); let res = tokio::time::timeout( Duration::from_secs(10), - magicsock_connect(&ep_1, secret_key_1.clone(), addr, node_id_2), + magicsock_connect(msock_1.endpoint(), secret_key_1.clone(), addr, node_id_2), ) .await .expect("timeout while connecting"); @@ -4119,8 +4144,9 @@ mod tests { let secret_key_2 = SecretKey::from_bytes(&[2u8; 32]); let node_id_2 = secret_key_2.public(); - let (ep_1, msock_1) = magicsock_ep(secret_key_1.clone()).await.unwrap(); - let (ep_2, msock_2) = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let msock_1 = magicsock_ep(secret_key_1.clone()).await.unwrap(); + let msock_2 = magicsock_ep(secret_key_2.clone()).await.unwrap(); + let ep_2 = msock_2.endpoint().clone(); // We need a task to accept the connection. let accept_task = tokio::spawn({ @@ -4132,7 +4158,6 @@ mod tests { info!("accept finished"); Ok(()) } - let ep_2 = ep_2.clone(); async move { if let Err(err) = accept(ep_2).await { error!("{err:#}"); @@ -4167,8 +4192,8 @@ mod tests { // little slower though. let mut transport_config = quinn::TransportConfig::default(); transport_config.max_idle_timeout(Some(Duration::from_millis(200).try_into().unwrap())); - let res = magicsock_connet_with_transport_config( - &ep_1, + let res = magicsock_connect_with_transport_config( + msock_1.endpoint(), secret_key_1.clone(), addr_2, node_id_2, @@ -4200,9 +4225,10 @@ mod tests { // We can now connect tokio::time::timeout(Duration::from_secs(10), async move { info!("establishing new connection"); - let conn = magicsock_connect(&ep_1, secret_key_1.clone(), addr_2, node_id_2) - .await - .unwrap(); + let conn = + magicsock_connect(msock_1.endpoint(), secret_key_1.clone(), addr_2, node_id_2) + .await + .unwrap(); info!("have connection"); let mut stream = conn.open_uni().await.unwrap(); stream.write_all(b"hello").await.unwrap();