Skip to content

Commit

Permalink
refactor(iroh): remove genawaiter usage from dht discovery (#3048)
Browse files Browse the repository at this point in the history
## Description

One less dependency in the core of iroh

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.
  • Loading branch information
dignifiedquire authored Dec 16, 2024
1 parent 4a774f1 commit 738c773
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 171 deletions.
75 changes: 0 additions & 75 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]

[advisories]
ignore = [
"RUSTSEC-2024-0370", # unmaintained, no upgrade available
"RUSTSEC-2024-0384", # unmaintained, no upgrade available
]
5 changes: 1 addition & 4 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ iroh-metrics = { version = "0.29", default-features = false }
# local-swarm-discovery
swarm-discovery = { version = "0.3.0-alpha.1", optional = true }

# dht_discovery
genawaiter = { version = "0.99", features = ["futures03"], optional = true }

# Examples
clap = { version = "4", features = ["derive"], optional = true }
tracing-subscriber = { version = "0.3", features = [
Expand Down Expand Up @@ -179,7 +176,7 @@ default = ["metrics", "discovery-pkarr-dht"]
metrics = ["iroh-metrics/metrics", "iroh-relay/metrics", "net-report/metrics", "portmapper/metrics"]
test-utils = ["iroh-relay/test-utils", "iroh-relay/server", "dep:axum"]
discovery-local-network = ["dep:swarm-discovery"]
discovery-pkarr-dht = ["pkarr/dht", "dep:genawaiter"]
discovery-pkarr-dht = ["pkarr/dht"]
examples = [
"dep:clap",
"dep:tracing-subscriber",
Expand Down
181 changes: 90 additions & 91 deletions iroh/src/discovery/pkarr/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ use std::{
time::Duration,
};

use futures_lite::{stream::Boxed, StreamExt};
use genawaiter::sync::{Co, Gen};
use anyhow::Result;
use futures_lite::{
stream::{Boxed, StreamExt},
FutureExt,
};
use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey};
use pkarr::{
PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey,
RelaySettings, SignedPacket,
PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, RelaySettings,
SignedPacket,
};
use tokio_util::task::AbortOnDropHandle;
use url::Url;
Expand Down Expand Up @@ -89,6 +92,73 @@ struct Inner {
republish_delay: Duration,
}

impl Inner {
async fn resolve_relay(&self, key: pkarr::PublicKey) -> Option<Result<DiscoveryItem>> {
tracing::info!("resolving {} from relay {:?}", key.to_z32(), self.relay_url);

let maybe_packet = self
.pkarr_relay
.as_ref()
.expect("checked")
.resolve(&key)
.await;
match maybe_packet {
Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();

tracing::info!("discovered node info from relay {:?}", node_addr);
Some(Ok(DiscoveryItem {
node_addr,
provenance: "relay",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
},
Ok(None) => {
tracing::debug!("no signed packet found in relay");
None
}
Err(err) => {
tracing::debug!("failed to get signed packet from relay: {}", err);
Some(Err(err.into()))
}
}
}
async fn resolve_dht(&self, key: pkarr::PublicKey) -> Option<Result<DiscoveryItem>> {
tracing::info!("resolving {} from DHT", key.to_z32());

let maybe_packet = self.pkarr.resolve(&key).await;
match maybe_packet {
Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
Ok(node_info) => {
let node_addr: NodeAddr = node_info.into();
tracing::info!("discovered node info from DHT {:?}", node_addr);
Some(Ok(DiscoveryItem {
node_addr,
provenance: "mainline",
last_updated: None,
}))
}
Err(_err) => {
tracing::debug!("failed to parse signed packet as node info");
None
}
},
Ok(None) => {
// nothing to do
tracing::debug!("no signed packet found in DHT");
None
}
Err(err) => Some(Err(err.into())),
}
}
}

/// Builder for [`DhtDiscovery`].
///
/// By default, publishing to the DHT is enabled, and relay publishing is disabled.
Expand Down Expand Up @@ -179,7 +249,7 @@ impl Builder {
}

/// Builds the discovery mechanism.
pub fn build(self) -> anyhow::Result<DhtDiscovery> {
pub fn build(self) -> Result<DhtDiscovery> {
let pkarr = match self.client {
Some(client) => client,
None => PkarrClient::new(Default::default())?,
Expand Down Expand Up @@ -278,90 +348,6 @@ impl DhtDiscovery {
tokio::time::sleep(this.0.republish_delay).await;
}
}

async fn resolve_relay(
&self,
pkarr_public_key: PublicKey,
co: &Co<anyhow::Result<DiscoveryItem>>,
) {
let Some(relay) = &self.0.pkarr_relay else {
return;
};
tracing::info!(
"resolving {} from relay {:?}",
pkarr_public_key.to_z32(),
self.0.relay_url
);
let response = relay.resolve(&pkarr_public_key).await;
match response {
Ok(Some(signed_packet)) => {
if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
let node_addr: NodeAddr = node_info.into();

tracing::info!("discovered node info from relay {:?}", node_addr);
co.yield_(Ok(DiscoveryItem {
node_addr,
provenance: "relay",
last_updated: None,
}))
.await;
} else {
tracing::debug!("failed to parse signed packet as node info");
}
}
Ok(None) => {
tracing::debug!("no signed packet found in relay");
}
Err(e) => {
tracing::debug!("failed to get signed packet from relay: {}", e);
co.yield_(Err(e.into())).await;
}
}
}

/// Resolves a node id from the DHT.
async fn resolve_dht(
&self,
pkarr_public_key: PublicKey,
co: &Co<anyhow::Result<DiscoveryItem>>,
) {
if !self.0.dht {
return;
};
tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32());
let response = match self.0.pkarr.resolve(&pkarr_public_key).await {
Ok(r) => r,
Err(e) => {
co.yield_(Err(e.into())).await;
return;
}
};
let Some(signed_packet) = response else {
tracing::debug!("no signed packet found in DHT");
return;
};
if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
let node_addr: NodeAddr = node_info.into();
tracing::info!("discovered node info from DHT {:?}", node_addr);
co.yield_(Ok(DiscoveryItem {
node_addr,
provenance: "mainline",
last_updated: None,
}))
.await;
} else {
tracing::debug!("failed to parse signed packet as node info");
}
}

async fn gen_resolve(self, node_id: NodeId, co: Co<anyhow::Result<DiscoveryItem>>) {
let pkarr_public_key =
pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
tokio::join!(
self.resolve_dht(pkarr_public_key.clone(), &co),
self.resolve_relay(pkarr_public_key, &co)
);
}
}

impl Discovery for DhtDiscovery {
Expand Down Expand Up @@ -395,11 +381,24 @@ impl Discovery for DhtDiscovery {
_endpoint: Endpoint,
node_id: NodeId,
) -> Option<Boxed<anyhow::Result<DiscoveryItem>>> {
let this = self.clone();
let pkarr_public_key =
pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32());
Some(Gen::new(|co| async move { this.gen_resolve(node_id, co).await }).boxed())

let mut stream = futures_buffered::FuturesUnorderedBounded::new(2);
if self.0.pkarr_relay.is_some() {
let key = pkarr_public_key.clone();
let discovery = self.0.clone();
stream.push(async move { discovery.resolve_relay(key).await }.boxed());
}

if self.0.dht {
let key = pkarr_public_key.clone();
let discovery = self.0.clone();
stream.push(async move { discovery.resolve_dht(key).await }.boxed());
}

Some(stream.filter_map(|t| t).boxed())
}
}

Expand Down

0 comments on commit 738c773

Please sign in to comment.