From 738c7730df2af03747ec4c2c8a51b5da2e173733 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 16 Dec 2024 11:49:54 +0100 Subject: [PATCH] refactor(iroh): remove genawaiter usage from dht discovery (#3048) ## Description One less dependency in the core of iroh ## Breaking Changes ## Notes & open questions ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --- Cargo.lock | 75 ------------- deny.toml | 1 - iroh/Cargo.toml | 5 +- iroh/src/discovery/pkarr/dht.rs | 181 ++++++++++++++++---------------- 4 files changed, 91 insertions(+), 171 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f02841fb4..a76eeec8fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1349,37 +1349,6 @@ dependencies = [ "slab", ] -[[package]] -name = "genawaiter" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" -dependencies = [ - "futures-core", - "genawaiter-macro", - "genawaiter-proc-macro", - "proc-macro-hack", -] - -[[package]] -name = "genawaiter-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" - -[[package]] -name = "genawaiter-proc-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" -dependencies = [ - "proc-macro-error", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "generator" version = "0.7.5" @@ -2101,7 +2070,6 @@ dependencies = [ "futures-lite 2.5.0", "futures-sink", "futures-util", - "genawaiter", "governor 0.7.0", "hickory-resolver", "hostname 0.4.0", @@ -3440,38 +3408,6 @@ dependencies = [ "toml_edit", ] -[[package]] -name = "proc-macro-error" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", - "syn-mid", - "version_check", -] - -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.92" @@ -4533,17 +4469,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn-mid" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea305d57546cc8cd04feb14b62ec84bf17f50e3f7b12560d7bfa9265f39d9ed" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "sync_wrapper" version = "1.0.2" diff --git a/deny.toml b/deny.toml index dd450c447f..9c11d21ce3 100644 --- a/deny.toml +++ b/deny.toml @@ -24,6 +24,5 @@ license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }] [advisories] ignore = [ - "RUSTSEC-2024-0370", # unmaintained, no upgrade available "RUSTSEC-2024-0384", # unmaintained, no upgrade available ] diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 3bc84abd7c..96ef553fa5 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -115,9 +115,6 @@ iroh-metrics = { version = "0.29", default-features = false } # local-swarm-discovery swarm-discovery = { version = "0.3.0-alpha.1", optional = true } -# dht_discovery -genawaiter = { version = "0.99", features = ["futures03"], optional = true } - # Examples clap = { version = "4", features = ["derive"], optional = true } tracing-subscriber = { version = "0.3", features = [ @@ -179,7 +176,7 @@ default = ["metrics", "discovery-pkarr-dht"] metrics = ["iroh-metrics/metrics", "iroh-relay/metrics", "net-report/metrics", "portmapper/metrics"] test-utils = ["iroh-relay/test-utils", "iroh-relay/server", "dep:axum"] discovery-local-network = ["dep:swarm-discovery"] -discovery-pkarr-dht = ["pkarr/dht", "dep:genawaiter"] +discovery-pkarr-dht = ["pkarr/dht"] examples = [ "dep:clap", "dep:tracing-subscriber", diff --git a/iroh/src/discovery/pkarr/dht.rs b/iroh/src/discovery/pkarr/dht.rs index 63ca19a0cb..2729a57270 100644 --- a/iroh/src/discovery/pkarr/dht.rs +++ b/iroh/src/discovery/pkarr/dht.rs @@ -12,12 +12,15 @@ use std::{ time::Duration, }; -use futures_lite::{stream::Boxed, StreamExt}; -use genawaiter::sync::{Co, Gen}; +use anyhow::Result; +use futures_lite::{ + stream::{Boxed, StreamExt}, + FutureExt, +}; use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey}; use pkarr::{ - PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey, - RelaySettings, SignedPacket, + PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, RelaySettings, + SignedPacket, }; use tokio_util::task::AbortOnDropHandle; use url::Url; @@ -89,6 +92,73 @@ struct Inner { republish_delay: Duration, } +impl Inner { + async fn resolve_relay(&self, key: pkarr::PublicKey) -> Option> { + tracing::info!("resolving {} from relay {:?}", key.to_z32(), self.relay_url); + + let maybe_packet = self + .pkarr_relay + .as_ref() + .expect("checked") + .resolve(&key) + .await; + match maybe_packet { + Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) { + Ok(node_info) => { + let node_addr: NodeAddr = node_info.into(); + + tracing::info!("discovered node info from relay {:?}", node_addr); + Some(Ok(DiscoveryItem { + node_addr, + provenance: "relay", + last_updated: None, + })) + } + Err(_err) => { + tracing::debug!("failed to parse signed packet as node info"); + None + } + }, + Ok(None) => { + tracing::debug!("no signed packet found in relay"); + None + } + Err(err) => { + tracing::debug!("failed to get signed packet from relay: {}", err); + Some(Err(err.into())) + } + } + } + async fn resolve_dht(&self, key: pkarr::PublicKey) -> Option> { + tracing::info!("resolving {} from DHT", key.to_z32()); + + let maybe_packet = self.pkarr.resolve(&key).await; + match maybe_packet { + Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) { + Ok(node_info) => { + let node_addr: NodeAddr = node_info.into(); + tracing::info!("discovered node info from DHT {:?}", node_addr); + Some(Ok(DiscoveryItem { + node_addr, + provenance: "mainline", + last_updated: None, + })) + } + Err(_err) => { + tracing::debug!("failed to parse signed packet as node info"); + None + } + }, + Ok(None) => { + // nothing to do + tracing::debug!("no signed packet found in DHT"); + None + } + Err(err) => Some(Err(err.into())), + } + } +} + /// Builder for [`DhtDiscovery`]. /// /// By default, publishing to the DHT is enabled, and relay publishing is disabled. @@ -179,7 +249,7 @@ impl Builder { } /// Builds the discovery mechanism. - pub fn build(self) -> anyhow::Result { + pub fn build(self) -> Result { let pkarr = match self.client { Some(client) => client, None => PkarrClient::new(Default::default())?, @@ -278,90 +348,6 @@ impl DhtDiscovery { tokio::time::sleep(this.0.republish_delay).await; } } - - async fn resolve_relay( - &self, - pkarr_public_key: PublicKey, - co: &Co>, - ) { - let Some(relay) = &self.0.pkarr_relay else { - return; - }; - tracing::info!( - "resolving {} from relay {:?}", - pkarr_public_key.to_z32(), - self.0.relay_url - ); - let response = relay.resolve(&pkarr_public_key).await; - match response { - Ok(Some(signed_packet)) => { - if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) { - let node_addr: NodeAddr = node_info.into(); - - tracing::info!("discovered node info from relay {:?}", node_addr); - co.yield_(Ok(DiscoveryItem { - node_addr, - provenance: "relay", - last_updated: None, - })) - .await; - } else { - tracing::debug!("failed to parse signed packet as node info"); - } - } - Ok(None) => { - tracing::debug!("no signed packet found in relay"); - } - Err(e) => { - tracing::debug!("failed to get signed packet from relay: {}", e); - co.yield_(Err(e.into())).await; - } - } - } - - /// Resolves a node id from the DHT. - async fn resolve_dht( - &self, - pkarr_public_key: PublicKey, - co: &Co>, - ) { - if !self.0.dht { - return; - }; - tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32()); - let response = match self.0.pkarr.resolve(&pkarr_public_key).await { - Ok(r) => r, - Err(e) => { - co.yield_(Err(e.into())).await; - return; - } - }; - let Some(signed_packet) = response else { - tracing::debug!("no signed packet found in DHT"); - return; - }; - if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) { - let node_addr: NodeAddr = node_info.into(); - tracing::info!("discovered node info from DHT {:?}", node_addr); - co.yield_(Ok(DiscoveryItem { - node_addr, - provenance: "mainline", - last_updated: None, - })) - .await; - } else { - tracing::debug!("failed to parse signed packet as node info"); - } - } - - async fn gen_resolve(self, node_id: NodeId, co: Co>) { - let pkarr_public_key = - pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key"); - tokio::join!( - self.resolve_dht(pkarr_public_key.clone(), &co), - self.resolve_relay(pkarr_public_key, &co) - ); - } } impl Discovery for DhtDiscovery { @@ -395,11 +381,24 @@ impl Discovery for DhtDiscovery { _endpoint: Endpoint, node_id: NodeId, ) -> Option>> { - let this = self.clone(); let pkarr_public_key = pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key"); tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32()); - Some(Gen::new(|co| async move { this.gen_resolve(node_id, co).await }).boxed()) + + let mut stream = futures_buffered::FuturesUnorderedBounded::new(2); + if self.0.pkarr_relay.is_some() { + let key = pkarr_public_key.clone(); + let discovery = self.0.clone(); + stream.push(async move { discovery.resolve_relay(key).await }.boxed()); + } + + if self.0.dht { + let key = pkarr_public_key.clone(); + let discovery = self.0.clone(); + stream.push(async move { discovery.resolve_dht(key).await }.boxed()); + } + + Some(stream.filter_map(|t| t).boxed()) } }