Skip to content

Commit

Permalink
refactor(iroh): remove genawaiter usage from dht discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Dec 13, 2024
1 parent 218aad3 commit 8ef29c3
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 169 deletions.
75 changes: 0 additions & 75 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ iroh-metrics = { version = "0.29", default-features = false }
data-encoding = { version = "2.2", optional = true }
swarm-discovery = { version = "0.3.0-alpha.1", optional = true }

# dht_discovery
genawaiter = { version = "0.99", features = ["futures03"], optional = true }

# Examples
clap = { version = "4", features = ["derive"], optional = true }
tracing-subscriber = { version = "0.3", features = [
Expand Down Expand Up @@ -179,7 +176,7 @@ default = ["metrics", "discovery-pkarr-dht"]
metrics = ["iroh-metrics/metrics", "iroh-relay/metrics", "net-report/metrics", "portmapper/metrics"]
test-utils = ["iroh-relay/test-utils", "iroh-relay/server", "dep:axum"]
discovery-local-network = ["dep:data-encoding", "dep:swarm-discovery"]
discovery-pkarr-dht = ["pkarr/dht", "dep:genawaiter"]
discovery-pkarr-dht = ["pkarr/dht"]
examples = [
"dep:clap",
"dep:tracing-subscriber",
Expand Down
193 changes: 103 additions & 90 deletions iroh/src/discovery/pkarr/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use std::{
time::Duration,
};

use futures_lite::{stream::Boxed, StreamExt};
use genawaiter::sync::{Co, Gen};
use futures_lite::{
stream::{Boxed, StreamExt},
FutureExt,
};
use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey};
use pkarr::{
PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey,
RelaySettings, SignedPacket,
PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, RelaySettings,
SignedPacket,
};
use tokio_util::task::AbortOnDropHandle;
use url::Url;
Expand Down Expand Up @@ -278,90 +280,6 @@ impl DhtDiscovery {
tokio::time::sleep(this.0.republish_delay).await;
}
}

async fn resolve_relay(
&self,
pkarr_public_key: PublicKey,
co: &Co<anyhow::Result<DiscoveryItem>>,
) {
let Some(relay) = &self.0.pkarr_relay else {
return;
};
tracing::info!(
"resolving {} from relay {:?}",
pkarr_public_key.to_z32(),
self.0.relay_url
);
let response = relay.resolve(&pkarr_public_key).await;
match response {
Ok(Some(signed_packet)) => {
if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
let node_addr: NodeAddr = node_info.into();

tracing::info!("discovered node info from relay {:?}", node_addr);
co.yield_(Ok(DiscoveryItem {
node_addr,
provenance: "relay",
last_updated: None,
}))
.await;
} else {
tracing::debug!("failed to parse signed packet as node info");
}
}
Ok(None) => {
tracing::debug!("no signed packet found in relay");
}
Err(e) => {
tracing::debug!("failed to get signed packet from relay: {}", e);
co.yield_(Err(e.into())).await;
}
}
}

/// Resolves a node id from the DHT.
async fn resolve_dht(
&self,
pkarr_public_key: PublicKey,
co: &Co<anyhow::Result<DiscoveryItem>>,
) {
if !self.0.dht {
return;
};
tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32());
let response = match self.0.pkarr.resolve(&pkarr_public_key).await {
Ok(r) => r,
Err(e) => {
co.yield_(Err(e.into())).await;
return;
}
};
let Some(signed_packet) = response else {
tracing::debug!("no signed packet found in DHT");
return;
};
if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
let node_addr: NodeAddr = node_info.into();
tracing::info!("discovered node info from DHT {:?}", node_addr);
co.yield_(Ok(DiscoveryItem {
node_addr,
provenance: "mainline",
last_updated: None,
}))
.await;
} else {
tracing::debug!("failed to parse signed packet as node info");
}
}

async fn gen_resolve(self, node_id: NodeId, co: Co<anyhow::Result<DiscoveryItem>>) {
let pkarr_public_key =
pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
tokio::join!(
self.resolve_dht(pkarr_public_key.clone(), &co),
self.resolve_relay(pkarr_public_key, &co)
);
}
}

impl Discovery for DhtDiscovery {
Expand Down Expand Up @@ -395,11 +313,106 @@ impl Discovery for DhtDiscovery {
_endpoint: Endpoint,
node_id: NodeId,
) -> Option<Boxed<anyhow::Result<DiscoveryItem>>> {
let this = self.clone();
let pkarr_public_key =
pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32());
Some(Gen::new(|co| async move { this.gen_resolve(node_id, co).await }).boxed())

let mut stream = futures_buffered::FuturesUnorderedBounded::new(2);
if self.0.pkarr_relay.is_some() {
tracing::info!(
"resolving {} from relay {:?}",
pkarr_public_key.to_z32(),
self.0.relay_url
);
let key = pkarr_public_key.clone();
let discovery = self.0.clone();
stream.push(
async move {
let maybe_packet = discovery
.pkarr_relay
.as_ref()
.expect("checked")
.resolve(&key)
.await;
match maybe_packet {
Ok(Some(signed_packet)) => {
match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();

tracing::info!(
"discovered node info from relay {:?}",
node_addr
);
Some(anyhow::Ok(DiscoveryItem {
node_addr,
provenance: "relay",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
}
}
Ok(None) => {
tracing::debug!("no signed packet found in relay");
None
}
Err(err) => {
tracing::debug!("failed to get signed packet from relay: {}", err);
Some(Err(err.into()))
}
}
}
.boxed(),
);
}

if self.0.dht {
tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32());

let key = pkarr_public_key.clone();
let discovery = self.0.clone();
stream.push(
async move {
let maybe_packet = discovery.pkarr.resolve(&key).await;
match maybe_packet {
Ok(Some(signed_packet)) => {
match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();
tracing::info!("discovered node info from DHT {:?}", node_addr);
Some(Ok(DiscoveryItem {
node_addr,
provenance: "mainline",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
}
}
Ok(None) => {
// nothing to do
tracing::debug!("no signed packet found in DHT");
None
}
Err(err) => Some(Err(err.into())),
}
}
.boxed(),
);
}

Some(stream.filter_map(|t| t).boxed())
// Some(Box::pin(ResolveStream {
// dht_resolve,
// relay_resolve,
// }))
}
}

Expand Down

0 comments on commit 8ef29c3

Please sign in to comment.