Skip to content

Commit

Permalink
feat(pkarr): add wasm support for relay::Client
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Sep 21, 2024
1 parent 848c96c commit 3a607ae
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkarr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rand = { version = "0.8.5", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.69"
futures = "0.3.30"

[dev-dependencies]
postcard = { version = "1.0.10", features = ["alloc"] }
Expand Down
88 changes: 62 additions & 26 deletions pkarr/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use std::num::NonZeroUsize;
use reqwest::{Response, StatusCode};
use tracing::debug;

#[cfg(target_arch = "wasm32")]
use futures::future::select_ok;

#[cfg(not(target_arch = "wasm32"))]
use tokio::task::JoinSet;

Expand Down Expand Up @@ -182,7 +185,7 @@ impl Client {
let signed_packet = signed_packet.clone();
let this = self.clone();

futures.spawn(async move { this.publish_to_relay(relay, signed_packet).await });
futures.spawn(async move { this.publish_to_relay(&relay, signed_packet).await });
}

let mut last_error = Error::EmptyListOfRelays;
Expand Down Expand Up @@ -213,7 +216,7 @@ impl Client {
let cached = cached_packet.clone();
let this = self.clone();

futures.spawn(async move { this.resolve_from_relay(relay, public_key, cached).await });
futures.spawn(async move { this.resolve_from_relay(&relay, public_key, cached).await });
}

let mut result: Result<Option<SignedPacket>> = Ok(None);
Expand All @@ -237,13 +240,56 @@ impl Client {
result
}

// === Private Methods ===
// === Wasm ===

#[cfg(target_arch = "wasm32")]
async fn race_publish(&self, signed_packet: &SignedPacket) -> Result<()> {
let futures = self.relays.iter().map(|relay| {
let signed_packet = signed_packet.clone();
let this = self.clone();

Box::pin(async move { this.publish_to_relay(relay, signed_packet).await })
});

match select_ok(futures).await {
Ok((_, _)) => Ok(()),
Err(e) => Err(e),
}
}

async fn publish_to_relay(
#[cfg(target_arch = "wasm32")]
async fn race_resolve(
&self,
relay: String,
signed_packet: SignedPacket,
) -> Result<Response> {
public_key: &PublicKey,
cached_packet: Option<SignedPacket>,
) -> Result<Option<SignedPacket>> {
let futures = self.relays.iter().map(|relay| {
let public_key = public_key.clone();
let cached = cached_packet.clone();
let this = self.clone();

Box::pin(async move { this.resolve_from_relay(relay, public_key, cached).await })
});

let mut result: Result<Option<SignedPacket>> = Ok(None);

match select_ok(futures).await {
Ok((Some(signed_packet), _)) => {
self.cache
.put(&signed_packet.public_key().as_ref().into(), &signed_packet);

return Ok(Some(signed_packet));
}
Err(error) => result = Err(error),
Ok(_) => {}
}

result
}

// === Private Methods ===

async fn publish_to_relay(&self, relay: &str, signed_packet: SignedPacket) -> Result<Response> {
let url = format!("{relay}/{}", signed_packet.public_key());

self.http_client
Expand Down Expand Up @@ -280,22 +326,27 @@ impl Client {

async fn resolve_from_relay(
&self,
relay: String,
relay: &str,
public_key: PublicKey,
cached_packet: Option<SignedPacket>,
) -> Result<Option<SignedPacket>> {
let url = format!("{relay}/{public_key}");

match self.http_client.get(&url).send().await {
Ok(mut response) => {
Ok(response) => {
if response.status() == StatusCode::NOT_FOUND {
debug!(?url, "SignedPacket not found");
return Ok(None);
}
if response.content_length().unwrap_or_default() > SignedPacket::MAX_BYTES {
debug!(?url, "Response too large");

let payload = read(&mut response).await;
return Ok(None);
}

match SignedPacket::from_relay_payload(&public_key, &payload.into()) {
let payload = response.bytes().await?;

match SignedPacket::from_relay_payload(&public_key, &payload) {
Ok(signed_packet) => Ok(choose_most_recent(signed_packet, cached_packet)),
Err(error) => {
debug!(?url, ?error, "Invalid signed_packet");
Expand All @@ -313,21 +364,6 @@ impl Client {
}
}

#[cfg(not(target_arch = "wasm32"))]
async fn read(response: &mut Response) -> Vec<u8> {
let mut total_size = 0;
let mut payload = Vec::new();

while let Ok(Some(chunk)) = response.chunk().await {
total_size += chunk.len();
payload.extend_from_slice(&chunk);
if total_size >= SignedPacket::MAX_BYTES {
break;
}
}
payload
}

fn choose_most_recent(
signed_packet: SignedPacket,
cached_packet: Option<SignedPacket>,
Expand Down
4 changes: 2 additions & 2 deletions pkarr/src/signed_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub struct SignedPacket {
}

impl SignedPacket {
pub const MAX_BYTES: usize = 1104;
pub const MAX_BYTES: u64 = 1104;

/// Creates a [Self] from the serialized representation:
/// `<32 bytes public_key><64 bytes signature><8 bytes big-endian timestamp in microseconds><encoded DNS packet>`
Expand All @@ -90,7 +90,7 @@ impl SignedPacket {
if bytes.len() < 104 {
return Err(Error::InvalidSignedPacketBytesLength(bytes.len()));
}
if bytes.len() > SignedPacket::MAX_BYTES {
if (bytes.len() as u64) > SignedPacket::MAX_BYTES {
return Err(Error::PacketTooLarge(bytes.len()));
}
let public_key = PublicKey::try_from(&bytes[..32])?;
Expand Down

0 comments on commit 3a607ae

Please sign in to comment.