From c1c69c4ee32549c0bb312104e7607734d4352a7d Mon Sep 17 00:00:00 2001 From: nazeh Date: Tue, 30 Apr 2024 15:30:02 +0300 Subject: [PATCH] feat: change SignedPacket::last_seen to be u64 instead of instant for easier serialization --- Cargo.lock | 19 ++------ pkarr/src/client.rs | 3 +- pkarr/src/lib.rs | 1 - pkarr/src/signed_packet.rs | 51 ++++++++++++--------- server/Cargo.toml | 3 +- server/src/cache.rs | 93 ++++++++++++++++---------------------- server/src/main.rs | 4 +- 7 files changed, 76 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26e3760..0b4a6a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -208,15 +208,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bitflags" version = "2.5.0" @@ -705,7 +696,6 @@ dependencies = [ "lmdb-master-sys", "once_cell", "page_size", - "serde", "synchronoise", "url", ] @@ -722,11 +712,8 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3cb0d6ba3700c9a57e83c013693e3eddb68a6d9b6781cacafc62a0d992e8ddb3" dependencies = [ - "bincode", "byteorder", "heed-traits", - "serde", - "serde_json", ] [[package]] @@ -899,8 +886,9 @@ checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" [[package]] name = "mainline" -version = "1.5.0" -source = "git+https://github.com/Nuhvi/mainline.git?branch=v2#97d21073816223d60278dc7b967c8dc4eabd5c70" +version = "2.0.0-rc1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "243e63833b8d5a333f0a2d384eb6eff4f5795ad9a68b1b2b731573d969011014" dependencies = [ "bytes", "crc", @@ -1172,6 +1160,7 @@ dependencies = [ "anyhow", "axum", "axum-server", + "byteorder", "bytes", "governor", "heed", diff --git a/pkarr/src/client.rs b/pkarr/src/client.rs index 84d270f..cb5a3b9 100644 --- a/pkarr/src/client.rs +++ b/pkarr/src/client.rs @@ -16,7 +16,6 @@ use std::{ net::{SocketAddr, ToSocketAddrs}, num::NonZeroUsize, thread, - time::Instant, }; use tracing::{debug, info, instrument, trace}; @@ -438,7 +437,7 @@ fn run( if (*seq as u64) == cached.timestamp() { trace!("Remote node has the a packet with same timestamp, refreshing cached packet."); - cached.set_last_seen(&Instant::now()); + cached.refresh(); cache.put(target, &cached); // Send the found sequence as a timestamp to the caller to decide what to do diff --git a/pkarr/src/lib.rs b/pkarr/src/lib.rs index e2e017d..21756de 100644 --- a/pkarr/src/lib.rs +++ b/pkarr/src/lib.rs @@ -1,7 +1,6 @@ #![doc = include_str!("../README.md")] // TODO: add support for wasm using relays. -// TODO: allow custom Cache with traits. // Rexports pub use bytes; diff --git a/pkarr/src/signed_packet.rs b/pkarr/src/signed_packet.rs index 298a2ea..f95a17e 100644 --- a/pkarr/src/signed_packet.rs +++ b/pkarr/src/signed_packet.rs @@ -14,7 +14,7 @@ use std::{ char, fmt::{self, Display, Formatter}, net::{Ipv4Addr, Ipv6Addr}, - time::{Duration, Instant, SystemTime}, + time::SystemTime, }; const DOT: char = '.'; @@ -61,7 +61,7 @@ impl Inner { /// Signed DNS packet pub struct SignedPacket { inner: Inner, - last_seen: Instant, + last_seen: u64, } impl SignedPacket { @@ -101,13 +101,13 @@ impl SignedPacket { Ok(SignedPacket { inner: Inner::try_from_bytes(bytes)?, - last_seen: Instant::now(), + last_seen: system_time(), }) } /// Useful for cloning a [SignedPacket], or cerating one from a previously checked bytes, /// like ones stored on disk or in a database. - pub fn from_bytes_unchecked(bytes: &Bytes, last_seen: Instant) -> SignedPacket { + pub fn from_bytes_unchecked(bytes: &Bytes, last_seen: u64) -> SignedPacket { SignedPacket { inner: Inner::try_from_bytes(bytes).unwrap(), last_seen, @@ -171,7 +171,7 @@ impl SignedPacket { return Err(Error::PacketTooLarge(encoded_packet.len())); } - let timestamp = system_time().as_micros() as u64; + let timestamp = system_time(); let signature = keypair.sign(&signable(timestamp, &encoded_packet)); @@ -182,7 +182,7 @@ impl SignedPacket { timestamp, &encoded_packet, )?, - last_seen: Instant::now(), + last_seen: system_time(), }) } @@ -234,19 +234,25 @@ impl SignedPacket { self.inner.borrow_dependent() } - pub fn last_seen(&self) -> &Instant { + /// Unix last_seen time in microseconds + pub fn last_seen(&self) -> &u64 { &self.last_seen } // === Setters === /// Set the [Self::last_seen] property - pub fn set_last_seen(&mut self, last_seen: &Instant) { + pub fn set_last_seen(&mut self, last_seen: &u64) { self.last_seen = *last_seen; } // === Public Methods === + /// Set the [Self::last_seen] to the current system time + pub fn refresh(&mut self) { + self.last_seen = system_time(); + } + /// Return whether this [SignedPacket] is more recent than the given one. /// If the timestamps are erqual, the one with the largest value is considered more recent. /// Usefel for determining which packet contains the latest information from the Dht. @@ -285,12 +291,9 @@ impl SignedPacket { let origin = self.public_key().to_z32(); let normalized_name = normalize_name(&origin, name.to_string()); - let elapsed = self.last_seen().elapsed().as_secs() as u32; - - self.packet() - .answers - .iter() - .filter(move |rr| rr.name == Name::new(&normalized_name).unwrap() && rr.ttl > elapsed) + self.packet().answers.iter().filter(move |rr| { + rr.name == Name::new(&normalized_name).unwrap() && rr.ttl > self.elapsed() + }) } /// calculates the remaining seconds by comparing the [Self::ttl] (clamped by `min` and `max`) @@ -300,10 +303,7 @@ impl SignedPacket { /// /// Panics if `min` < `max` pub fn expires_in(&self, min: u32, max: u32) -> u32 { - match self - .ttl(min, max) - .overflowing_sub(self.last_seen.elapsed().as_secs() as u32) - { + match self.ttl(min, max).overflowing_sub(self.elapsed()) { (_, true) => 0, (ttl, false) => ttl, } @@ -323,6 +323,13 @@ impl SignedPacket { .min() .map_or(min, |v| v.clamp(min, max)) } + + // === Private Methods === + + /// Time since the [Self::last_seen] in seconds + fn elapsed(&self) -> u32 { + ((system_time() - self.last_seen) / 1_000_000) as u32 + } } fn signable(timestamp: u64, v: &Bytes) -> Bytes { @@ -331,10 +338,12 @@ fn signable(timestamp: u64, v: &Bytes) -> Bytes { signable.into() } -fn system_time() -> Duration { +/// Return the number of microseconds since [SystemTime::UNIX_EPOCH] +pub fn system_time() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("time drift") + .as_micros() as u64 } #[cfg(feature = "dht")] @@ -364,7 +373,7 @@ impl TryFrom<&MutableItem> for SignedPacket { Ok(Self { inner: Inner::try_from_parts(&public_key, &signature, seq, i.value())?, - last_seen: Instant::now(), + last_seen: system_time(), }) } } @@ -389,7 +398,7 @@ impl Display for SignedPacket { f, "SignedPacket ({}):\n last_seen: {} seconds ago\n timestamp: {},\n signature: {}\n records:\n", &self.public_key(), - &self.last_seen().elapsed().as_secs(), + &self.elapsed(), &self.timestamp(), &self.signature(), )?; diff --git a/server/Cargo.toml b/server/Cargo.toml index 6c2b1d9..ccefc51 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -19,4 +19,5 @@ thiserror = "1.0.49" bytes = "1.6.0" tower_governor = "0.3.2" governor = "0.6.3" -heed = "0.20.0" +heed = { version = "0.20.0", default-features = false } +byteorder = "1.5.0" diff --git a/server/src/cache.rs b/server/src/cache.rs index 6d917f3..ca72351 100644 --- a/server/src/cache.rs +++ b/server/src/cache.rs @@ -1,17 +1,12 @@ -use std::{ - borrow::Cow, - path::Path, - time::{Duration, Instant, SystemTime}, -}; +use std::{borrow::Cow, path::Path}; -use governor::clock::Reference; use pkarr::{ cache::{PkarrCache, PkarrCacheKey}, - SignedPacket, + signed_packet::{system_time, SignedPacket}, }; -use heed::{BoxedError, BytesDecode, BytesEncode, Env}; -use heed::{Database, EnvOpenOptions}; +use byteorder::LittleEndian; +use heed::{types::U64, BoxedError, BytesDecode, BytesEncode, Database, Env, EnvOpenOptions}; use anyhow::Result; use tracing::debug; @@ -21,8 +16,8 @@ const PKARR_CACHE_TABLE_NAME_KEY_TO_TIME: &str = "pkarrcache:key_to_time"; const PKARR_CACHE_TABLE_NAME_TIME_TO_KEY: &str = "pkarrcache:time_to_key"; type PkarrCacheSignedPacketsTable = Database; -type PkarrCacheKeyToTimeTable = Database; -type PkarrCacheTimeToKeyTable = Database; +type PkarrCacheKeyToTimeTable = Database>; +type PkarrCacheTimeToKeyTable = Database, PkarrCacheKeyCodec>; pub struct PkarrCacheKeyCodec; @@ -34,44 +29,11 @@ impl<'a> BytesEncode<'a> for PkarrCacheKeyCodec { } } -pub struct InstantCodec; - -impl<'a> BytesEncode<'a> for InstantCodec { - type EItem = Instant; - - fn bytes_encode(instant: &Self::EItem) -> Result, BoxedError> { - let system_now = SystemTime::now(); - let instant_now = Instant::now(); - let approx = system_now - (instant_now - *instant); - - let vec = (approx - .duration_since(SystemTime::UNIX_EPOCH) - .expect("time drift") - // Casting as u64 is safe for ~500_000 years. - .as_micros() as u64) - .to_be_bytes() - .to_vec(); - - Ok(Cow::Owned(vec)) - } -} - -impl<'a> BytesDecode<'a> for InstantCodec { - type DItem = Instant; +impl<'a> BytesDecode<'a> for PkarrCacheKeyCodec { + type DItem = PkarrCacheKey; fn bytes_decode(bytes: &'a [u8]) -> Result { - let mut first_8_bytes: [u8; 8] = [0; 8]; - first_8_bytes.copy_from_slice(&bytes[0..8]); - let micros = u64::from_be_bytes(first_8_bytes); - - let duration_since_unix = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("time drift"); - let encoded_duration = Duration::from_micros(micros); - - let instant = Instant::now() - duration_since_unix + encoded_duration; - - Ok(instant) + Ok(PkarrCacheKey::from_bytes(bytes)?) } } @@ -81,12 +43,11 @@ impl<'a> BytesEncode<'a> for SignedPacketCodec { type EItem = SignedPacket; fn bytes_encode(signed_packet: &Self::EItem) -> Result, BoxedError> { - let instant = signed_packet.last_seen(); let bytes = signed_packet.as_bytes(); let mut vec = Vec::with_capacity(bytes.len() + 8); - vec.extend(InstantCodec::bytes_encode(instant)?.as_ref()); + vec.extend(>::bytes_encode(signed_packet.last_seen())?.as_ref()); vec.extend(bytes); Ok(Cow::Owned(vec)) @@ -97,7 +58,7 @@ impl<'a> BytesDecode<'a> for SignedPacketCodec { type DItem = SignedPacket; fn bytes_decode(bytes: &'a [u8]) -> Result { - let last_seen = InstantCodec::bytes_decode(&bytes)?; + let last_seen = >::bytes_decode(&bytes)?; Ok(SignedPacket::from_bytes_unchecked( &bytes[8..].to_vec().into(), @@ -124,9 +85,9 @@ impl HeedPkarrCache { let mut wtxn = env.write_txn()?; let _: PkarrCacheSignedPacketsTable = env.create_database(&mut wtxn, Some(PKARR_CACHE_TABLE_NAME_SIGNED_PACKET))?; - let _: Database = + let _: PkarrCacheKeyToTimeTable = env.create_database(&mut wtxn, Some(PKARR_CACHE_TABLE_NAME_KEY_TO_TIME))?; - let _: Database = + let _: PkarrCacheTimeToKeyTable = env.create_database(&mut wtxn, Some(PKARR_CACHE_TABLE_NAME_TIME_TO_KEY))?; wtxn.commit()?; @@ -145,7 +106,10 @@ impl HeedPkarrCache { Ok(db.len(&rtxn)? as usize) } pub fn internal_put(&self, key: &PkarrCacheKey, signed_packet: &SignedPacket) -> Result<()> { - // TODO: delete with capactiy + if self.capacity == 0 { + return Ok(()); + } + let mut wtxn = self.env.write_txn()?; let packets: PkarrCacheSignedPacketsTable = self @@ -163,11 +127,27 @@ impl HeedPkarrCache { .open_database(&wtxn, Some(PKARR_CACHE_TABLE_NAME_TIME_TO_KEY))? .unwrap(); + let len = packets.len(&wtxn)? as usize; + + if len >= self.capacity { + debug!(?len, ?self.capacity, "Reached cache capacity, deleting extra item."); + + let mut iter = time_to_key.rev_iter(&mut wtxn)?; + + if let Some((time, key)) = iter.next().transpose()? { + drop(iter); + + time_to_key.delete(&mut wtxn, &time)?; + key_to_time.delete(&mut wtxn, &key)?; + packets.delete(&mut wtxn, &key)?; + }; + } + if let Some(old_time) = key_to_time.get(&wtxn, &key)? { time_to_key.delete(&mut wtxn, &old_time)?; } - let new_time = Instant::now(); + let new_time = system_time(); time_to_key.put(&mut wtxn, &new_time, &key)?; key_to_time.put(&mut wtxn, &key, &new_time)?; @@ -180,6 +160,9 @@ impl HeedPkarrCache { } pub fn internal_get(&self, key: &PkarrCacheKey) -> Result> { + // TODO: Optimize to use read transaction most of the time, and batch + // updating access times every few seconds. + let mut wtxn = self.env.write_txn()?; let packets: PkarrCacheSignedPacketsTable = self @@ -201,7 +184,7 @@ impl HeedPkarrCache { time_to_key.delete(&mut wtxn, &time)?; }; - let new_time = Instant::now(); + let new_time = system_time(); time_to_key.put(&mut wtxn, &new_time, key)?; key_to_time.put(&mut wtxn, &key, &new_time)?; diff --git a/server/src/main.rs b/server/src/main.rs index 717ebd7..2188a44 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -17,8 +17,6 @@ use server::HttpServer; #[tokio::main] async fn main() -> Result<()> { - // TODO logs with tower - tracing_subscriber::fmt() .with_max_level(Level::DEBUG) .with_env_filter("pkarr=debug") @@ -32,7 +30,7 @@ async fn main() -> Result<()> { let env_path = Path::new(dir_path).join("../storage/pkarr-server/pkarr-cache"); fs::create_dir_all(&env_path)?; - let cache = Box::new(HeedPkarrCache::new(&env_path, 10).unwrap()); + let cache = Box::new(HeedPkarrCache::new(&env_path, 1).unwrap()); let client = PkarrClient::builder() .port(6881)