Skip to content

Commit

Permalink
feat: add recursive resolvers option
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Apr 24, 2024
1 parent 6571e65 commit d94a8ce
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pkarr/examples/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() -> Result<()> {

println!(
"\nRunning a resolver at {} ...\n",
client.loca_addr().unwrap()
client.local_addr().unwrap()
);

loop {}
Expand Down
6 changes: 3 additions & 3 deletions pkarr/src/async_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl AsyncPkarrClient {
/// Returns the local address of the udp socket this node is listening on.
///
/// Returns `None` if the node is shutdown
pub fn loca_addr(&self) -> Option<SocketAddr> {
pub fn local_addr(&self) -> Option<SocketAddr> {
self.0.address
}

Expand Down Expand Up @@ -154,11 +154,11 @@ mod tests {
.build()
.unwrap();

assert_ne!(a.loca_addr(), None);
assert_ne!(a.local_addr(), None);

a.shutdown().unwrap();

assert_eq!(a.loca_addr(), None);
assert_eq!(a.local_addr(), None);
}

futures::executor::block_on(test());
Expand Down
116 changes: 77 additions & 39 deletions pkarr/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
thread,
time::Instant,
};
use tracing::{debug, instrument};
use tracing::{debug, info, instrument};

use crate::{
cache::PkarrCache, Error, PublicKey, Result, SignedPacket, DEFAULT_MAXIMUM_TTL,
Expand All @@ -29,12 +29,35 @@ pub const DEFAULT_RESOLVERS: [&str; 1] = ["resolver.pkarr.org:7101"];

#[derive(Debug, Clone)]
pub struct Settings {
dht: DhtSettings,
resolver: bool,
resolvers: Vec<String>,
cache_size: NonZeroUsize,
minimum_ttl: u32,
maximum_ttl: u32,
pub dht: DhtSettings,
/// If set to `true`, run as a [resolver](https://pkarr.org/resolvers)s.
///
/// Defaults to `false`
pub resolver: bool,
/// Controls the Resolver behavior when querying the Dht on cache miss.
/// if set to `true`, it will query [Settings::resolvers] alongside the closest nodes.
///
/// If [Settings::resolver] is `false` (default), this will have no effect.
///
/// Defaults to `false`
pub recursive: bool,
/// A set of [resolver](https://pkarr.org/resolvers)s
/// to be queried alongside the Dht routing table, to
/// lower the latency on cold starts, and help if the
/// Dht is missing values not't republished often enough.
///
/// Defaults to [DEFAULT_RESOLVERS]
pub resolvers: Vec<String>,
/// Defaults to [DEFAULT_CACHE_SIZE]
pub cache_size: NonZeroUsize,
/// Used in the `min` parametere in [SignedPacket::ttl].
///
/// Defaults to [DEFAULT_MINIMUM_TTL]
pub minimum_ttl: u32,
/// Used in the `max` parametere in [SignedPacket::ttl].
///
/// Defaults to [DEFAULT_MAXIMUM_TTL]
pub maximum_ttl: u32,
}

impl Default for Settings {
Expand All @@ -43,6 +66,7 @@ impl Default for Settings {
dht: DhtSettings::default(),
cache_size: NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap(),
resolver: false,
recursive: false,
resolvers: DEFAULT_RESOLVERS.map(|s| s.to_string()).to_vec(),
minimum_ttl: DEFAULT_MINIMUM_TTL,
maximum_ttl: DEFAULT_MAXIMUM_TTL,
Expand All @@ -56,20 +80,30 @@ pub struct PkarrClientBuilder {
}

impl PkarrClientBuilder {
/// Run this node as a Resolver.
/// Set [Settings::resolver] to true.
///
/// Run this client as a [resolver](https://pkarr.org/resolvers)
pub fn resolver(mut self) -> Self {
self.settings.resolver = true;
self.settings.dht.read_only = false;
self
}

// Set custom set of resolvers nodes.
/// Set [Settings::recursive] to true.
///
/// Set to true to help your resolver leverage even bigger resolvers.
pub fn recursive(mut self) -> Self {
self.settings.recursive = true;
self
}

/// Set custom set of [resolvers](Settings::resolvers).
pub fn resolvers(mut self, resolvers: Vec<String>) -> Self {
self.settings.resolvers = resolvers;
self
}

/// Set the Dht bootstrapping nodes
/// Set the Dht bootstrapping nodes.
pub fn bootstrap(mut self, bootstrap: &[String]) -> Self {
self.settings.dht.bootstrap = Some(bootstrap.to_vec());
self
Expand All @@ -81,34 +115,26 @@ impl PkarrClientBuilder {
self
}

/// Set the [SignedPacket] cache size.
/// Set the [Settings::cache_size].
///
/// Defaults to [DEFAULT_CACHE_SIZE].
/// Controls the capacity of [PkarrCache].
pub fn cache_size(mut self, cache_size: NonZeroUsize) -> Self {
self.settings.cache_size = cache_size;
self
}

/// Set the minimum ttl for a cached [SignedPacket].
/// Defaults to [crate::DEFAULT_MINIMUM_TTL].
/// Set the [Settings::minimum_ttl] value.
///
/// Internally the cache will expire after the smallest ttl in
/// the resource records, unless it is smaller than this minimum.
///
/// If the `ttl` is bigger than the [Self::maximum_ttl], it will clamp it.
/// Limits how soon a [SignedPacket] is considered expired.
pub fn minimum_ttl(mut self, ttl: u32) -> Self {
self.settings.minimum_ttl = ttl;
self.settings.maximum_ttl = self.settings.maximum_ttl.clamp(ttl, u32::MAX);
self
}

/// Set the maximum ttl for a cached [SignedPacket].
/// Defaults to [crate::DEFAULT_MAXIMUM_TTL].
///
/// Internally the cache will expire after the smallest ttl in
/// the resource records, unless it is bigger than this maximum.
/// Set the [Settings::maximum_ttl] value.
///
/// If the `ttl` is smaller than the [Self::minimum_ttl], it will clamp it.
/// Limits how long it takes before a [SignedPacket] is considered expired.
pub fn maximum_ttl(mut self, ttl: u32) -> Self {
self.settings.maximum_ttl = ttl;
self.settings.minimum_ttl = self.settings.minimum_ttl.clamp(0, ttl);
Expand All @@ -121,7 +147,7 @@ impl PkarrClientBuilder {
}

#[derive(Clone, Debug)]
/// Main client for publishing and resolving [SignedPacket]s.
/// Native Pkarr client for publishing and resolving [SignedPacket]s.
pub struct PkarrClient {
pub(crate) address: Option<SocketAddr>,
pub(crate) sender: Sender<ActorMessage>,
Expand Down Expand Up @@ -157,15 +183,17 @@ impl PkarrClient {
rpc = rpc.with_port(port)?;
}

let loca_addr = rpc.local_addr();
let local_addr = rpc.local_addr();

info!(?local_addr, ?settings, "Running PkarrClient");

let cache = PkarrCache::new(settings.cache_size);

let moved_cache = cache.clone();
thread::spawn(move || run(rpc, moved_cache, settings.dht, resolvers, receiver));
thread::spawn(move || run(rpc, moved_cache, settings.recursive, resolvers, receiver));

let client = PkarrClient {
address: Some(loca_addr),
address: Some(local_addr),
sender,
cache,
minimum_ttl: settings.minimum_ttl,
Expand All @@ -184,7 +212,7 @@ impl PkarrClient {
/// Returns the local address of the udp socket this node is listening on.
///
/// Returns `None` if the node is shutdown
pub fn loca_addr(&self) -> Option<SocketAddr> {
pub fn local_addr(&self) -> Option<SocketAddr> {
self.address
}

Expand Down Expand Up @@ -230,10 +258,12 @@ impl PkarrClient {
}
}

/// Returns the first valid [SignedPacket] available from cache, or the Dht.
/// Returns a [SignedPacket] from cache if it is not expired, otherwise,
/// it will query the Dht, and return the first valid response, which may
/// or may not be expired itself.
///
/// If the Dht was called, in the background, it continues receiving responses
/// and updating the cache.
/// and updating the cache with any more recent valid packets it receives.
///
/// # Errors
/// - Returns a [Error::DhtIsShutdown] if [PkarrClient::shutdown] was called, or
Expand Down Expand Up @@ -295,7 +325,7 @@ impl PkarrClient {
fn run(
mut rpc: Rpc,
cache: PkarrCache,
_settings: DhtSettings,
recursive: bool,
resolvers: Vec<SocketAddr>,
receiver: Receiver<ActorMessage>,
) {
Expand Down Expand Up @@ -421,16 +451,19 @@ fn run(
from,
message: ReceivedMessage::Request((transaction_id, request)),
}) => {
// We shouldn't reach this if settings.resolvers is not set,
// because that sets [DhtSettings::server].
// === Resolver Logic ===

// We shouldn't reach this if [Settings.resolver] is not set to true,
// because that sets [DhtSettings::server].
if let RequestSpecific {
request_type:
RequestTypeSpecific::GetValue(GetValueRequestArguments { target, .. }),
..
} = request
{
if let Some(cached) = cache.get(target) {
// We don't care about expiry, if the client doesn't want it, they
// can discard it. But it is useful to return whatever we have.
let mutable_item = MutableItem::from(&cached);
debug!(?target, "Resolver: cache hit responding with packet!");

Expand All @@ -439,6 +472,9 @@ fn run(
*transaction_id,
ResponseSpecific::GetMutable(messages::GetMutableResponseArguments {
responder_id: *rpc.id(),
// Token doesn't matter much, as we are most likely _not_ the
// closest nodes, so we shouldn't expect an PUT requests based on
// this response.
token: vec![0, 0, 0, 0],
nodes: None,
v: mutable_item.value().to_vec(),
Expand All @@ -457,9 +493,11 @@ fn run(
salt: None,
}),
None,
// Don't call resolvers recursively, that is a bit much to
// be honest.
None,
if recursive {
Some(resolvers.clone())
} else {
None
},
);
}
};
Expand Down Expand Up @@ -493,11 +531,11 @@ mod tests {
.build()
.unwrap();

assert_ne!(a.loca_addr(), None);
assert_ne!(a.local_addr(), None);

a.shutdown().unwrap();

assert_eq!(a.loca_addr(), None);
assert_eq!(a.local_addr(), None);
}

#[test]
Expand Down
14 changes: 6 additions & 8 deletions pkarr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#![doc = include_str!("../README.md")]

// TODO: examine errors (failed to publish, failed to bind socket, unused errors...)
// TODO: logs (info for binding, debug for steps)
// TODO: HTTP relay should return some caching headers.
// TODO: add server settings to mainline DhtSettings
// TODO: better documentation especially resolvers.
// TODO: HTTP relay should return some caching headers.
// TODO: add support for wasm using relays.
// TODO: allow custom Cache with traits.

Expand All @@ -17,12 +15,12 @@ pub use simple_dns as dns;
#[cfg(feature = "async")]
pub mod async_client;
#[cfg(feature = "dht")]
mod cache;
pub mod cache;
#[cfg(feature = "dht")]
mod client;
mod error;
mod keys;
mod signed_packet;
pub mod client;
pub mod error;
pub mod keys;
pub mod signed_packet;

// Exports
#[cfg(feature = "dht")]
Expand Down

0 comments on commit d94a8ce

Please sign in to comment.