From ccea806a4ae10ba978a14a9cec76c38041b293d2 Mon Sep 17 00:00:00 2001 From: Emil Sayahi <97276123+emmyoh@users.noreply.github.com> Date: Mon, 25 Nov 2024 14:42:55 -0500 Subject: [PATCH] feat: Announcing immutable replicas --- src/database/core.rs | 39 ++++ src/database/dht.rs | 134 ++++++++++++++ src/database/mod.rs | 8 + src/{database.rs => database/posts.rs} | 235 +------------------------ src/database/users.rs | 217 +++++++++++++++++++++++ src/discovery.rs | 129 ++++++++++++-- src/fs/net.rs | 11 +- src/fs/replica.rs | 6 + 8 files changed, 534 insertions(+), 245 deletions(-) create mode 100644 src/database/core.rs create mode 100644 src/database/dht.rs create mode 100644 src/database/mod.rs rename src/{database.rs => database/posts.rs} (65%) create mode 100644 src/database/users.rs diff --git a/src/database/core.rs b/src/database/core.rs new file mode 100644 index 0000000..9751970 --- /dev/null +++ b/src/database/core.rs @@ -0,0 +1,39 @@ +use super::dht::*; +use super::posts::*; +use super::users::*; +use crate::fs::FS_PATH; +use miette::IntoDiagnostic; +use native_db::*; +use std::{path::PathBuf, sync::LazyLock}; + +pub(crate) static DATABASE_PATH: LazyLock = + LazyLock::new(|| PathBuf::from(FS_PATH).join("OKU_FS_DATABASE")); +/// An Oku node's database. +pub static DATABASE: LazyLock = LazyLock::new(|| OkuDatabase::new().unwrap()); +pub(crate) static MODELS: LazyLock = LazyLock::new(|| { + let mut models = Models::new(); + models.define::().unwrap(); + models.define::().unwrap(); + models.define::().unwrap(); + models +}); + +/// The database used by Oku's protocol. +pub struct OkuDatabase { + pub(crate) database: Database<'static>, +} + +impl OkuDatabase { + /// Open an existing Oku database, or create one if it does not exist. + /// + /// # Returns + /// + /// An Oku database. + pub fn new() -> miette::Result { + Ok(Self { + database: native_db::Builder::new() + .create(&MODELS, &*DATABASE_PATH) + .into_diagnostic()?, + }) + } +} diff --git a/src/database/dht.rs b/src/database/dht.rs new file mode 100644 index 0000000..ad89ffe --- /dev/null +++ b/src/database/dht.rs @@ -0,0 +1,134 @@ +use super::core::*; +use miette::IntoDiagnostic; +use native_db::*; +use native_model::{native_model, Model}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[native_model(id = 3, version = 1)] +#[native_db( + primary_key(primary_key -> (Vec, Vec)) +)] +/// A record of a replica announcement on the DHT. +pub struct ReplicaAnnouncement { + /// The public key of the announcement. + #[primary_key] + pub key: Vec, + /// The signature of the announcement. + pub signature: Vec, +} + +impl OkuDatabase { + /// Insert or update a replica announcement record. + /// + /// # Arguments + /// + /// * `announcement` - A replica announcement record to upsert. + /// + /// # Returns + /// + /// The previous record of the announcement, if one existed. + pub fn upsert_announcement( + &self, + announcement: ReplicaAnnouncement, + ) -> miette::Result> { + let rw = self.database.rw_transaction().into_diagnostic()?; + let old_value: Option = rw.upsert(announcement).into_diagnostic()?; + rw.commit().into_diagnostic()?; + Ok(old_value) + } + + /// Insert or update multiple replica announcement records. + /// + /// # Arguments + /// + /// * `announcements` - A list of replica announcement records to upsert. + /// + /// # Returns + /// + /// A list containing the previous record of each announcement, if one existed. + pub fn upsert_announcements( + &self, + announcements: Vec, + ) -> miette::Result>> { + let rw = self.database.rw_transaction().into_diagnostic()?; + let old_announcements: Vec<_> = announcements + .clone() + .into_iter() + .filter_map(|announcement| rw.upsert(announcement).ok()) + .collect(); + rw.commit().into_diagnostic()?; + Ok(old_announcements) + } + + /// Delete a replica announcement record. + /// + /// # Arguments + /// + /// * `announcement` - A replica announcement record to delete. + /// + /// # Returns + /// + /// The deleted replica announcement record. + pub fn delete_announcement( + &self, + announcement: ReplicaAnnouncement, + ) -> miette::Result { + let rw = self.database.rw_transaction().into_diagnostic()?; + let removed_announcement = rw.remove(announcement).into_diagnostic()?; + rw.commit().into_diagnostic()?; + Ok(removed_announcement) + } + + /// Delete multiple replica announcement records. + /// + /// # Arguments + /// + /// * `announcements` - A list of replica announcement records to delete. + /// + /// # Returns + /// + /// A list containing the deleted replica announcement records. + pub fn delete_announcements( + &self, + announcements: Vec, + ) -> miette::Result> { + let rw = self.database.rw_transaction().into_diagnostic()?; + let removed_announcements = announcements + .into_iter() + .filter_map(|announcement| rw.remove(announcement).ok()) + .collect(); + rw.commit().into_diagnostic()?; + Ok(removed_announcements) + } + + /// Gets the replica announcements recorded by this node. + /// + /// # Returns + /// + /// The replica announcements recorded by this node. + pub fn get_announcements(&self) -> miette::Result> { + let r = self.database.r_transaction().into_diagnostic()?; + r.scan() + .primary() + .into_diagnostic()? + .all() + .into_diagnostic()? + .collect::, _>>() + .into_diagnostic() + } + + /// Gets a replica announcement record by its public key. + /// + /// # Arguments + /// + /// * `key` - The public key of the DHT announcement. + /// + /// # Returns + /// + /// A replica announcement record. + pub fn get_announcement(&self, key: Vec) -> miette::Result> { + let r = self.database.r_transaction().into_diagnostic()?; + r.get().primary(key).into_diagnostic() + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 0000000..7a469fb --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1,8 @@ +/// Core functionality of an OkuNet node's database. +pub mod core; +/// Database functionality relating to the DHT. +pub mod dht; +/// Database functionality relating to OkuNet posts. +pub mod posts; +/// Database functionality relating to OkuNet users. +pub mod users; diff --git a/src/database.rs b/src/database/posts.rs similarity index 65% rename from src/database.rs rename to src/database/posts.rs index c3c5324..9319f1b 100644 --- a/src/database.rs +++ b/src/database/posts.rs @@ -1,6 +1,7 @@ +use super::core::*; +use super::users::*; use crate::fs::{path_to_entry_key, FS_PATH}; use iroh::{client::docs::Entry, docs::AuthorId}; -use log::error; use miette::IntoDiagnostic; use native_db::*; use native_model::{native_model, Model}; @@ -24,18 +25,8 @@ use tantivy::{ use tokio::sync::Mutex; use url::Url; -pub(crate) static DATABASE_PATH: LazyLock = - LazyLock::new(|| PathBuf::from(FS_PATH).join("OKU_FS_DATABASE")); -/// An Oku node's database. -pub static DATABASE: LazyLock = LazyLock::new(|| OkuDatabase::new().unwrap()); pub(crate) static POST_INDEX_PATH: LazyLock = LazyLock::new(|| PathBuf::from(FS_PATH).join("POST_INDEX")); -pub(crate) static MODELS: LazyLock = LazyLock::new(|| { - let mut models = Models::new(); - models.define::().unwrap(); - models.define::().unwrap(); - models -}); pub(crate) static POST_SCHEMA: LazyLock<(Schema, HashMap<&str, Field>)> = LazyLock::new(|| { let mut schema_builder = Schema::builder(); let fields = HashMap::from([ @@ -71,54 +62,6 @@ pub(crate) static POST_INDEX_READER: LazyLock = pub(crate) static POST_INDEX_WRITER: LazyLock>> = LazyLock::new(|| Arc::new(Mutex::new(POST_INDEX.writer(50_000_000).unwrap()))); -#[derive(Serialize, Deserialize, Debug, Clone)] -#[native_model(id = 1, version = 1)] -#[native_db( - primary_key(author_id -> Vec) -)] -/// An Oku user. -pub struct OkuUser { - /// The content authorship identifier associated with the Oku user. - pub author_id: AuthorId, - /// The system time of when this user's content was last retrieved from OkuNet. - pub last_fetched: SystemTime, - /// The posts made by this user on OkuNet. - pub posts: Vec, - /// The OkuNet identity of the user. - pub identity: Option, -} - -impl PartialEq for OkuUser { - fn eq(&self, other: &Self) -> bool { - self.author_id == other.author_id - } -} -impl Eq for OkuUser {} -impl Hash for OkuUser { - fn hash(&self, state: &mut H) { - self.author_id.hash(state); - } -} - -impl OkuUser { - fn author_id(&self) -> Vec { - self.author_id.as_bytes().to_vec() - } -} - -#[derive(Serialize, Deserialize, PartialEq, Debug, Clone, Default)] -/// An OkuNet identity for an Oku user. -pub struct OkuIdentity { - /// The display name of the Oku user. - pub name: String, - /// The content authors followed by the Oku user. - /// OkuNet content is retrieved from followed users and the users those users follow. - pub following: HashSet, - /// The content authors blocked by the Oku user. - /// Blocked authors are ignored when fetching new OkuNet posts. - pub blocked: HashSet, -} - #[derive(Serialize, Deserialize, Debug, Clone)] #[native_model(id = 2, version = 1)] #[native_db( @@ -252,25 +195,7 @@ impl OkuNote { } } -/// The database used by Oku's protocol. -pub struct OkuDatabase { - database: Database<'static>, -} - impl OkuDatabase { - /// Open an existing Oku database, or create one if it does not exist. - /// - /// # Returns - /// - /// An Oku database. - pub fn new() -> miette::Result { - Ok(Self { - database: native_db::Builder::new() - .create(&MODELS, &*DATABASE_PATH) - .into_diagnostic()?, - }) - } - /// Search OkuNet posts with a query string. /// /// # Arguments @@ -509,160 +434,4 @@ impl OkuDatabase { ); r.get().primary(entry_key).into_diagnostic() } - - /// Insert or update an OkuNet user. - /// - /// # Arguments - /// - /// * `user` - An OkuNet user to upsert. - /// - /// # Returns - /// - /// The previous version of the user, if one existed. - pub fn upsert_user(&self, user: OkuUser) -> miette::Result> { - let rw = self.database.rw_transaction().into_diagnostic()?; - let old_value: Option = rw.upsert(user).into_diagnostic()?; - rw.commit().into_diagnostic()?; - Ok(old_value) - } - - /// Delete an OkuNet user. - /// - /// # Arguments - /// - /// * `user` - An OkuNet user to delete. - /// - /// # Returns - /// - /// The deleted user. - pub fn delete_user(&self, user: OkuUser) -> miette::Result { - let rw = self.database.rw_transaction().into_diagnostic()?; - let removed_user = rw.remove(user).into_diagnostic()?; - rw.commit().into_diagnostic()?; - Ok(removed_user) - } - - /// Delete multiple OkuNet users. - /// - /// # Arguments - /// - /// * `users` - A list of OkuNet users to delete. - /// - /// # Returns - /// - /// A list containing the deleted users. - pub fn delete_users(&self, users: Vec) -> miette::Result> { - let rw = self.database.rw_transaction().into_diagnostic()?; - let removed_users = users - .into_iter() - .filter_map(|user| rw.remove(user).ok()) - .collect(); - rw.commit().into_diagnostic()?; - Ok(removed_users) - } - - /// Delete multiple OkuNet users and their posts. - /// - /// # Arguments - /// - /// * `users` - A list of OkuNet users to delete. - /// - /// # Returns - /// - /// A list containing the deleted posts. - pub fn delete_users_with_posts(&self, users: Vec) -> miette::Result> { - Ok(self - .delete_users(users)? - .par_iter() - .filter_map(|x| self.get_posts_by_author(x.author_id).ok()) - .collect::>() - .into_par_iter() - .flat_map(|x| self.delete_posts(x).ok()) - .collect::>() - .concat()) - } - - /// Deletes OkuNet users by their author IDs and posts by authors with those IDs. - /// - /// Differs from [`Self::delete_users_with_posts`] as a post will still be deleted even if a record for the authoring user is not found. - /// - /// # Arguments - /// - /// * `author_ids` - A list of content authorship IDs. - pub fn delete_by_author_ids(&self, author_ids: Vec) -> miette::Result<()> { - let users: Vec<_> = author_ids - .par_iter() - .filter_map(|x| self.get_user(*x).ok().flatten()) - .collect(); - let posts: Vec<_> = author_ids - .into_par_iter() - .filter_map(|x| self.get_posts_by_author(x).ok()) - .flatten() - .collect(); - if let Err(e) = self.delete_users(users) { - error!("{}", e); - } - if let Err(e) = self.delete_posts(posts) { - error!("{}", e); - } - Ok(()) - } - - /// Gets the content authorship IDs of all locally-known users. - /// - /// This differs from [`Self::get_users`] as IDs of authors with posts but no user records are included. - /// - /// # Returns - /// - /// A list of IDs for all users that have content in the local database. - pub fn all_local_users(&self) -> Vec { - let user_records: HashSet<_> = self - .get_users() - .unwrap_or_default() - .par_iter() - .map(|x| x.author_id) - .collect(); - let post_record_users: HashSet<_> = self - .get_posts() - .unwrap_or_default() - .par_iter() - .map(|x| x.entry.author()) - .collect(); - user_records - .union(&post_record_users) - .map(|x| x.to_owned()) - .collect() - } - - /// Gets the OkuNet content of all known users. - /// - /// # Returns - /// - /// The OkuNet content of all users known to this node. - pub fn get_users(&self) -> miette::Result> { - let r = self.database.r_transaction().into_diagnostic()?; - r.scan() - .primary() - .into_diagnostic()? - .all() - .into_diagnostic()? - .collect::, _>>() - .into_diagnostic() - } - - /// Gets an OkuNet user's content by their content authorship ID. - /// - /// # Arguments - /// - /// * `author_id` - A content authorship ID. - /// - /// # Returns - /// - /// An OkuNet user's content. - pub fn get_user(&self, author_id: AuthorId) -> miette::Result> { - let r = self.database.r_transaction().into_diagnostic()?; - r.get() - .primary(author_id.as_bytes().to_vec()) - .into_diagnostic() - } } diff --git a/src/database/users.rs b/src/database/users.rs new file mode 100644 index 0000000..af680cc --- /dev/null +++ b/src/database/users.rs @@ -0,0 +1,217 @@ +use super::core::*; +use super::posts::*; +use iroh::{client::docs::Entry, docs::AuthorId}; +use log::error; +use miette::IntoDiagnostic; +use native_db::*; +use native_model::{native_model, Model}; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use serde::{Deserialize, Serialize}; +use std::hash::{Hash, Hasher}; +use std::{collections::HashSet, time::SystemTime}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[native_model(id = 1, version = 1)] +#[native_db( + primary_key(author_id -> Vec) +)] +/// An Oku user. +pub struct OkuUser { + /// The content authorship identifier associated with the Oku user. + pub author_id: AuthorId, + /// The system time of when this user's content was last retrieved from OkuNet. + pub last_fetched: SystemTime, + /// The posts made by this user on OkuNet. + pub posts: Vec, + /// The OkuNet identity of the user. + pub identity: Option, +} + +impl PartialEq for OkuUser { + fn eq(&self, other: &Self) -> bool { + self.author_id == other.author_id + } +} +impl Eq for OkuUser {} +impl Hash for OkuUser { + fn hash(&self, state: &mut H) { + self.author_id.hash(state); + } +} + +impl OkuUser { + fn author_id(&self) -> Vec { + self.author_id.as_bytes().to_vec() + } +} + +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone, Default)] +/// An OkuNet identity for an Oku user. +pub struct OkuIdentity { + /// The display name of the Oku user. + pub name: String, + /// The content authors followed by the Oku user. + /// OkuNet content is retrieved from followed users and the users those users follow. + pub following: HashSet, + /// The content authors blocked by the Oku user. + /// Blocked authors are ignored when fetching new OkuNet posts. + pub blocked: HashSet, +} + +impl OkuDatabase { + /// Insert or update an OkuNet user. + /// + /// # Arguments + /// + /// * `user` - An OkuNet user to upsert. + /// + /// # Returns + /// + /// The previous version of the user, if one existed. + pub fn upsert_user(&self, user: OkuUser) -> miette::Result> { + let rw = self.database.rw_transaction().into_diagnostic()?; + let old_value: Option = rw.upsert(user).into_diagnostic()?; + rw.commit().into_diagnostic()?; + Ok(old_value) + } + + /// Delete an OkuNet user. + /// + /// # Arguments + /// + /// * `user` - An OkuNet user to delete. + /// + /// # Returns + /// + /// The deleted user. + pub fn delete_user(&self, user: OkuUser) -> miette::Result { + let rw = self.database.rw_transaction().into_diagnostic()?; + let removed_user = rw.remove(user).into_diagnostic()?; + rw.commit().into_diagnostic()?; + Ok(removed_user) + } + + /// Delete multiple OkuNet users. + /// + /// # Arguments + /// + /// * `users` - A list of OkuNet users to delete. + /// + /// # Returns + /// + /// A list containing the deleted users. + pub fn delete_users(&self, users: Vec) -> miette::Result> { + let rw = self.database.rw_transaction().into_diagnostic()?; + let removed_users = users + .into_iter() + .filter_map(|user| rw.remove(user).ok()) + .collect(); + rw.commit().into_diagnostic()?; + Ok(removed_users) + } + + /// Delete multiple OkuNet users and their posts. + /// + /// # Arguments + /// + /// * `users` - A list of OkuNet users to delete. + /// + /// # Returns + /// + /// A list containing the deleted posts. + pub fn delete_users_with_posts(&self, users: Vec) -> miette::Result> { + Ok(self + .delete_users(users)? + .par_iter() + .filter_map(|x| self.get_posts_by_author(x.author_id).ok()) + .collect::>() + .into_par_iter() + .flat_map(|x| self.delete_posts(x).ok()) + .collect::>() + .concat()) + } + + /// Deletes OkuNet users by their author IDs and posts by authors with those IDs. + /// + /// Differs from [`Self::delete_users_with_posts`] as a post will still be deleted even if a record for the authoring user is not found. + /// + /// # Arguments + /// + /// * `author_ids` - A list of content authorship IDs. + pub fn delete_by_author_ids(&self, author_ids: Vec) -> miette::Result<()> { + let users: Vec<_> = author_ids + .par_iter() + .filter_map(|x| self.get_user(*x).ok().flatten()) + .collect(); + let posts: Vec<_> = author_ids + .into_par_iter() + .filter_map(|x| self.get_posts_by_author(x).ok()) + .flatten() + .collect(); + if let Err(e) = self.delete_users(users) { + error!("{}", e); + } + if let Err(e) = self.delete_posts(posts) { + error!("{}", e); + } + Ok(()) + } + + /// Gets the content authorship IDs of all locally-known users. + /// + /// This differs from [`Self::get_users`] as IDs of authors with posts but no user records are included. + /// + /// # Returns + /// + /// A list of IDs for all users that have content in the local database. + pub fn all_local_users(&self) -> Vec { + let user_records: HashSet<_> = self + .get_users() + .unwrap_or_default() + .par_iter() + .map(|x| x.author_id) + .collect(); + let post_record_users: HashSet<_> = self + .get_posts() + .unwrap_or_default() + .par_iter() + .map(|x| x.entry.author()) + .collect(); + user_records + .union(&post_record_users) + .map(|x| x.to_owned()) + .collect() + } + + /// Gets the OkuNet content of all known users. + /// + /// # Returns + /// + /// The OkuNet content of all users known to this node. + pub fn get_users(&self) -> miette::Result> { + let r = self.database.r_transaction().into_diagnostic()?; + r.scan() + .primary() + .into_diagnostic()? + .all() + .into_diagnostic()? + .collect::, _>>() + .into_diagnostic() + } + + /// Gets an OkuNet user's content by their content authorship ID. + /// + /// # Arguments + /// + /// * `author_id` - A content authorship ID. + /// + /// # Returns + /// + /// An OkuNet user's content. + pub fn get_user(&self, author_id: AuthorId) -> miette::Result> { + let r = self.database.r_transaction().into_diagnostic()?; + r.get() + .primary(author_id.as_bytes().to_vec()) + .into_diagnostic() + } +} diff --git a/src/discovery.rs b/src/discovery.rs index 6f41010..ff507aa 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,10 +1,13 @@ +use crate::database::core::DATABASE; use crate::{error::OkuDiscoveryError, fs::OkuFs}; use iroh::base::ticket::Ticket; +use iroh::blobs::HashAndFormat; use iroh::docs::CapabilityKind; use iroh::{client::docs::ShareMode, docs::NamespaceId}; use log::{error, info}; use miette::IntoDiagnostic; use std::{path::PathBuf, time::Duration}; +use tokio::task::JoinSet; /// The delay between republishing content to the Mainline DHT. pub const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60); @@ -18,7 +21,10 @@ impl OkuFs { /// # Arguments /// /// * `namespace_id` - The ID of the replica to announce. - pub async fn announce_replica(&self, namespace_id: NamespaceId) -> miette::Result<()> { + pub async fn announce_mutable_replica( + &self, + namespace_id: NamespaceId, + ) -> miette::Result { let ticket = mainline::Bytes::from( self.create_document_ticket(namespace_id, ShareMode::Read) .await? @@ -39,7 +45,7 @@ impl OkuFs { let mutable_item = mainline::MutableItem::new(replica_private_key, ticket, newest_timestamp, None); match self.dht.put_mutable(mutable_item).await { - Ok(_) => info!("Announced replica {} … ", namespace_id.to_string()), + Ok(_) => info!("Announced mutable replica {} … ", namespace_id.to_string()), Err(e) => error!( "{}", OkuDiscoveryError::ProblemAnnouncingContent( @@ -48,7 +54,83 @@ impl OkuFs { ) ), } - Ok(()) + Ok(namespace_id) + } + + /// Announces a read-only replica to the Mainline DHT. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica to announce. + pub async fn announce_immutable_replica( + &self, + namespace_id: NamespaceId, + ) -> miette::Result { + let public_key_bytes = namespace_id + .into_public_key() + .map_err(|e| miette::miette!("{}", e))? + .as_bytes() + .to_vec(); + let announcement = DATABASE + .get_announcement(public_key_bytes) + .ok() + .flatten() + .ok_or(miette::miette!( + "Prior announcement not found in database for replica {:?} … ", + namespace_id + ))?; + + let ticket = mainline::Bytes::from( + self.create_document_ticket(namespace_id, ShareMode::Read) + .await? + .to_bytes(), + ); + let newest_timestamp = self + .get_newest_timestamp_in_folder(namespace_id, PathBuf::from("/")) + .await? as i64; + let mutable_item = mainline::MutableItem::new_signed_unchecked( + announcement.key.try_into().map_err(|_e| { + miette::miette!("Replica announcement key does not fit into 32 bytes … ") + })?, + announcement.signature.try_into().map_err(|_e| { + miette::miette!("Replica announcement signature does not fit into 64 bytes … ") + })?, + ticket, + newest_timestamp, + None, + ); + match self.dht.put_mutable(mutable_item).await { + Ok(_) => info!( + "Announced immutable replica {} … ", + namespace_id.to_string() + ), + Err(e) => error!( + "{}", + OkuDiscoveryError::ProblemAnnouncingContent( + namespace_id.to_string(), + e.to_string() + ) + ), + } + Ok(namespace_id) + } + + /// Announces a replica to the Mainline DHT. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica to announce. + /// + /// * `capability_kind` - Whether the replica is writable by the current node or read-only. + pub async fn announce_replica( + &self, + namespace_id: NamespaceId, + capability_kind: CapabilityKind, + ) -> miette::Result { + match capability_kind { + CapabilityKind::Read => self.announce_immutable_replica(namespace_id).await, + CapabilityKind::Write => self.announce_mutable_replica(namespace_id).await, + } } /// Announce the home replica @@ -89,17 +171,42 @@ impl OkuFs { /// Announces all writable replicas to the Mainline DHT. pub async fn announce_replicas(&self) -> miette::Result<()> { - let mut replicas = self.list_replicas().await?; - replicas - .retain(|(_replica, capability_kind)| matches!(capability_kind, CapabilityKind::Write)); + let mut future_set = JoinSet::new(); - if let Ok(home_replica) = self.announce_home_replica().await { - replicas.retain(|(replica, _capability_kind)| *replica != home_replica); - } + // Prepare to announce home replica + let self_clone = self.clone(); + future_set.spawn(async move { self_clone.announce_home_replica().await }); - for (replica, _capability_kind) in replicas { - self.announce_replica(replica).await?; + // Prepare to announce all replicas + let replicas = self.list_replicas().await?; + for (replica, capability_kind) in replicas { + let self_clone = self.clone(); + future_set + .spawn(async move { self_clone.announce_replica(replica, capability_kind).await }); + } + info!("Pending announcements: {} … ", future_set.len()); + // Execute announcements in parallel + while let Some(res) = future_set.join_next().await { + match res { + Ok(result) => match result { + Ok(_) => (), + Err(e) => error!("{}", e), + }, + Err(e) => error!("{}", e), + } } + Ok(()) } } + +/// From: https://github.com/n0-computer/iroh-experiments/blob/4e052c6b34720e26683083270706926a84e49411/content-discovery/iroh-mainline-content-discovery/src/client.rs#L53 +/// +/// The mapping from an iroh [HashAndFormat] to a bittorrent infohash, aka [mainline::Id]. +/// +/// Since an infohash is just 20 bytes, this can not be a bidirectional mapping. +pub fn to_infohash(haf: HashAndFormat) -> mainline::Id { + let mut data = [0u8; 20]; + data.copy_from_slice(&haf.hash.as_bytes()[..20]); + mainline::Id::from_bytes(data).unwrap() +} diff --git a/src/fs/net.rs b/src/fs/net.rs index 00d2677..c8004c0 100644 --- a/src/fs/net.rs +++ b/src/fs/net.rs @@ -2,7 +2,12 @@ use std::{collections::HashSet, path::PathBuf, time::SystemTime}; use crate::{ config::OkuFsConfig, - database::{OkuIdentity, OkuNote, OkuPost, OkuUser, DATABASE}, + database::{ + core::DATABASE, + dht::ReplicaAnnouncement, + posts::{OkuNote, OkuPost}, + users::{OkuIdentity, OkuUser}, + }, discovery::REPUBLISH_DELAY, fs::{merge_tickets, OkuFs}, }; @@ -598,6 +603,10 @@ impl OkuFs { tokio::pin!(get_stream); let mut tickets = Vec::new(); while let Some(mutable_item) = get_stream.next().await { + let _ = DATABASE.upsert_announcement(ReplicaAnnouncement { + key: mutable_item.key().to_vec(), + signature: mutable_item.signature().to_vec(), + }); tickets.push(DocTicket::from_bytes(mutable_item.value())?) } merge_tickets(tickets).ok_or(anyhow!( diff --git a/src/fs/replica.rs b/src/fs/replica.rs index 4b0bd1d..66e5f8a 100644 --- a/src/fs/replica.rs +++ b/src/fs/replica.rs @@ -1,5 +1,7 @@ use super::*; use crate::config::OkuFsConfig; +use crate::database::core::DATABASE; +use crate::database::dht::ReplicaAnnouncement; use crate::error::{OkuDiscoveryError, OkuFsError, OkuFuseError}; use anyhow::anyhow; use futures::{pin_mut, StreamExt}; @@ -309,6 +311,10 @@ impl OkuFs { tokio::pin!(get_stream); let mut tickets = Vec::new(); while let Some(mutable_item) = get_stream.next().await { + let _ = DATABASE.upsert_announcement(ReplicaAnnouncement { + key: mutable_item.key().to_vec(), + signature: mutable_item.signature().to_vec(), + }); tickets.push(DocTicket::from_bytes(mutable_item.value())?) } merge_tickets(tickets).ok_or(anyhow!(