From 3d233597f395acc709c5aa80951eb5484b2ebb2c Mon Sep 17 00:00:00 2001 From: Emil Sayahi <97276123+emmyoh@users.noreply.github.com> Date: Mon, 22 Apr 2024 10:20:38 -0400 Subject: [PATCH] feat(discovery): Begin implementing announcement --- .cargo/config.toml | 17 +++ .github/workflows/main.yml | 69 +++++++++++ .gitignore | 5 +- Cargo.toml | 1 + rust-toolchain.toml | 3 + src/discovery.rs | 130 +++++++++++++------- src/error.rs | 7 +- src/fs.rs | 235 +++++++++++++++++++++++++++++++------ src/main.rs | 131 ++++++++++++++++++++- 9 files changed, 521 insertions(+), 77 deletions(-) create mode 100644 .cargo/config.toml create mode 100644 .github/workflows/main.yml create mode 100644 rust-toolchain.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..8fa5432 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,17 @@ +[doc.extern-map.registries] +crates-io = "https://docs.rs/" + +[target.aarch64-apple-darwin] +rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"] + +[target.x86_64-apple-darwin] +rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"] + +[target.x86_64-pc-windows-gnu] +rustflags = ["-C", "link-arg=-lpsapi", "-C", "link-arg=-lbcrypt", "-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"] + +[target.aarch64-unknown-linux-gnu] +rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"] + +[target.x86_64-unknown-linux-gnu] +rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"] \ No newline at end of file diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..e10f8c5 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,69 @@ +name: Document and check codebase +on: + push: + branches: [ "master" ] +env: + CARGO_TERM_COLOR: always +jobs: + build_documentation: + name: Build documentation + runs-on: ubuntu-latest + steps: + - name: Setup Rust toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + target: x86_64-unknown-linux-gnu + default: true + profile: default + - name: Checkout codebase + uses: actions/checkout@v4 + - name: Generate documentation + run: time cargo doc --no-deps -Zrustdoc-map --all-features --release --quiet + - name: Fix permissions + run: | + chmod -c -R +rX "target/doc/" | while read line; do + echo "::warning title=Invalid file permissions automatically fixed::$line" + done + - name: Upload Pages artifact + uses: actions/upload-pages-artifact@v3 + with: + path: "target/doc/" + deploy_documentation: + needs: build_documentation + name: Deploy documentation to GitHub Pages + permissions: + pages: write + id-token: write + environment: + name: github-pages + url: ${{ steps.deployment.outputs.page_url }} + runs-on: ubuntu-latest + steps: + - name: Deploy to GitHub Pages + id: deployment + uses: actions/deploy-pages@v4 + apply_suggestions: + name: Format code, apply compiler suggestions + runs-on: ubuntu-latest + steps: + - name: Checkout codebase + uses: actions/checkout@v4 + - name: Setup Rust toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + components: clippy, rustfmt + profile: minimal + - name: Format + run: cargo fmt + - name: Apply compiler suggestions + run: | + cargo fix --all-features --edition --edition-idioms + cargo clippy --fix -Z unstable-options + - name: Commit changes to code, if any + run: | + git config user.name github-actions + git config user.email github-actions@github.com + git diff --quiet && git diff --staged --quiet || git commit -am "chore: Format and apply compiler suggestions." + git push \ No newline at end of file diff --git a/.gitignore b/.gitignore index dc5f487..381fbdc 100644 --- a/.gitignore +++ b/.gitignore @@ -96,4 +96,7 @@ Cargo.lock **/*.rs.bk # MSVC Windows builds of rustc generate these, which store debugging information -*.pdb \ No newline at end of file +*.pdb + +## Oku +.oku \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index f6e5829..0a752a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ async-trait = "0.1.79" bincode = "1.3.3" bytes = "1.6.0" chrono = "0.4.37" +clap = { version = "4.5.4", features = ["derive"] } derive_more = "0.99.17" flume = "0.11.0" futures = "0.3.30" diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..6679663 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "nightly" +targets = [ "aarch64-unknown-linux-gnu", "x86_64-unknown-linux-gnu", "aarch64-apple-darwin", "x86_64-apple-darwin", "x86_64-pc-windows-gnu" ] \ No newline at end of file diff --git a/src/discovery.rs b/src/discovery.rs index 22cbbc6..e2f9c94 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,16 +1,65 @@ use crate::error::OkuDiscoveryError; - +use futures::StreamExt; use iroh::{ bytes::{Hash, HashAndFormat}, - net::NodeId, + sync::NamespaceId, ticket::BlobTicket, }; -use iroh_mainline_content_discovery::protocol::{Query, QueryFlags}; -use iroh_mainline_content_discovery::to_infohash; -use iroh_mainline_content_discovery::UdpDiscovery; +use iroh_mainline_content_discovery::announce_dht; +use std::{collections::BTreeSet, error::Error, str::FromStr, time::Duration}; + +/// The delay between republishing content to the mainline DHT. +pub const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60); + +/// The initial delay before publishing content to the mainline DHT. +pub const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500); + +/// The port used for communication between other Oku filesystem nodes. +pub const DISCOVERY_PORT: u16 = 4938; +pub const ANNOUNCE_PARALLELISM: usize = 10; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::{error::Error, str::FromStr}; +// pub fn announce_dht( +// dht: mainline::dht::Dht, +// content: BTreeSet, +// port: Option, +// announce_parallelism: usize, +// ) -> impl Stream)> { +// let dht = dht.as_async(); +// futures::stream::iter(content) +// .map(move |content| { +// let dht = dht.clone(); +// async move { +// let info_hash = to_infohash(content); +// println!( +// "announcing content that corresponds to infohash {}", +// info_hash +// ); +// let res = dht.announce_peer(info_hash, port).await; +// (content, res) +// } +// }) +// .buffer_unordered(announce_parallelism) +// } + +/// Announces a local replica to the mainline DHT. +/// +/// # Arguments +/// +/// * `namespace_id` - The ID of the replica to announce. +pub async fn announce_replica(namespace_id: NamespaceId) -> Result<(), Box> { + let mut content = BTreeSet::new(); + content.insert(HashAndFormat::raw(Hash::new(namespace_id.clone()))); + let dht = mainline::Dht::default(); + let announce_stream = announce_dht(dht, content, DISCOVERY_PORT, ANNOUNCE_PARALLELISM); + tokio::pin!(announce_stream); + while let Some((content, res)) = announce_stream.next().await { + match res { + Ok(_) => println!("announced {:?}", content), + Err(e) => eprintln!("error announcing {:?}: {:?}", content, e), + } + } + Ok(()) +} /* The `ContentRequest` enum is derived from the `ContentArg` enum in the `iroh-examples` repository (https://github.com/n0-computer/iroh-examples/blob/6f184933efa72eec1d8cf2e8d07905650c0fdb46/content-discovery/iroh-mainline-content-discovery-cli/src/args.rs#L23). @@ -38,6 +87,7 @@ impl ContentRequest { }, } } + /// Get the hash for this content request. pub fn hash(&self) -> Hash { match self { ContentRequest::Hash(hash) => *hash, @@ -62,38 +112,38 @@ impl FromStr for ContentRequest { } } -pub async fn query_dht( - content: ContentRequest, - partial: bool, - verified: bool, - udp_port: Option, -) -> Result<(), Box> { - let _providers: Vec = Vec::new(); - let bind_addr = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::UNSPECIFIED, - udp_port.unwrap_or_default(), - )); - let discovery = UdpDiscovery::new(bind_addr).await?; - let dht = mainline::Dht::default(); - let q = Query { - content: content.hash_and_format(), - flags: QueryFlags { - complete: !partial, - verified: verified, - }, - }; - println!("content corresponds to infohash {}", to_infohash(q.content)); +// pub async fn query_dht( +// content: ContentRequest, +// partial: bool, +// verified: bool, +// udp_port: Option, +// ) -> Result<(), Box> { +// let _providers: Vec = Vec::new(); +// let bind_addr = SocketAddr::V4(SocketAddrV4::new( +// Ipv4Addr::UNSPECIFIED, +// udp_port.unwrap_or_default(), +// )); +// let discovery = UdpDiscovery::new(bind_addr).await?; +// let dht = mainline::Dht::default(); +// let q = Query { +// content: content.hash_and_format(), +// flags: QueryFlags { +// complete: !partial, +// verified: verified, +// }, +// }; +// println!("content corresponds to infohash {}", to_infohash(q.content)); - let _stream = discovery.query_dht(dht, q).await?; - // while let Some(announce) = stream.next().await { - // if announce.verify().is_ok() { - // println!("found verified provider {}", announce.host); - // providers.push(announce.host); - // } else { - // println!("got wrong signed announce!"); - // } - // } - // Ok(providers) +// let _stream = discovery.query_dht(dht, q).await?; +// // while let Some(announce) = stream.next().await { +// // if announce.verify().is_ok() { +// // println!("found verified provider {}", announce.host); +// // providers.push(announce.host); +// // } else { +// // println!("got wrong signed announce!"); +// // } +// // } +// // Ok(providers) - Ok(()) -} +// Ok(()) +// } diff --git a/src/error.rs b/src/error.rs index a892b5f..366da4b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,17 +43,22 @@ pub enum OkuFsError { } #[derive(Error, Debug, Diagnostic)] +/// Content discovery errors. pub enum OkuDiscoveryError { #[error("Invalid hash and format.")] #[diagnostic(code(discovery::invalid_hash_and_format), url(docsrs))] + /// Invalid hash and format. InvalidHashAndFormat, #[error("Unable to discover node address for node ID.")] #[diagnostic(code(discovery::node_address_discovery_failed), url(docsrs))] + /// Unable to discover node address for node ID. NodeAddressDiscoveryFailed, - #[error("Unable to find nodes able to satisfy query")] + #[error("Unable to find nodes able to satisfy query.")] #[diagnostic(code(discovery::no_nodes_found), url(docsrs))] + /// Unable to find nodes able to satisfy query. NoNodesFound, #[error("Unsupported protocol identifier: {0}")] #[diagnostic(code(discovery::unsupported_alpn), url(docsrs))] + /// Unsupported protocol identifier. UnsupportedALPN(String), } diff --git a/src/fs.rs b/src/fs.rs index d592d74..0a7822c 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,8 +1,11 @@ +use crate::discovery::DISCOVERY_PORT; +use crate::discovery::{announce_replica, INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY}; use crate::error::OkuDiscoveryError; use crate::{discovery::ContentRequest, error::OkuFsError}; use bytes::Bytes; -use futures::StreamExt; +use futures::{pin_mut, StreamExt}; use iroh::base::ticket::Ticket; +use iroh::client::Entry; use iroh::net::magic_endpoint::accept_conn; use iroh::ticket::DocTicket; use iroh::{ @@ -21,31 +24,57 @@ use iroh_mainline_content_discovery::UdpDiscovery; use iroh_pkarr_node_discovery::PkarrNodeDiscovery; use path_clean::PathClean; use rand_core::OsRng; -use sha3::{Digest, Sha3_256}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::{error::Error, path::PathBuf}; +/// The path on disk where the file system is stored. pub const FS_PATH: &str = ".oku"; + +/// The protocol identifier for exchanging document tickets. pub const ALPN_DOCUMENT_TICKET_FETCH: &[u8] = b"oku/document-ticket/fetch/v0"; fn normalise_path(path: PathBuf) -> PathBuf { PathBuf::from("/").join(path).clean() } -pub fn path_to_entry_key(path: PathBuf) -> String { +/// Converts a path to a key for an entry in a file system replica. +/// +/// # Arguments +/// +/// * `path` - The path to convert to a key. +/// +/// # Returns +/// +/// A null-terminated byte string representing the path. +pub fn path_to_entry_key(path: PathBuf) -> Bytes { let path = normalise_path(path.clone()); - let mut hasher = Sha3_256::new(); - hasher.update(path.clone().into_os_string().into_encoded_bytes()); - let path_hash = hasher.finalize(); - format!("{}\u{F0000}{}", path.display(), hex::encode(path_hash)) + // let mut hasher = Sha3_256::new(); + // hasher.update(path.clone().into_os_string().into_encoded_bytes()); + // let path_hash = hasher.finalize(); + // format!("{}\u{F0000}{}", path.display(), hex::encode(path_hash)) + let mut path_bytes = path.into_os_string().into_encoded_bytes(); + path_bytes.push(b'\0'); + path_bytes.into() } +/// An instance of an Oku file system. +/// +/// The `OkuFs` struct is the primary interface for interacting with an Oku file system. pub struct OkuFs { + /// An Iroh node responsible for storing replicas on the local machine, as well as joining swarms to fetch replicas from other nodes. node: FsNode, + /// The public key of the author of the file system. author_id: AuthorId, } impl OkuFs { + /// Starts an instance of an Oku file system. + /// In the background, an Iroh node is started, and the node's address is periodically announced to the mainline DHT. + /// If no author credentials are found on disk, new credentials are generated. + /// + /// # Returns + /// + /// A running instance of an Oku file system. pub async fn start() -> Result> { let node_path = PathBuf::from(FS_PATH).join("node"); let node = FsNode::persistent(node_path).await?.spawn().await?; @@ -55,20 +84,42 @@ impl OkuFs { let author_id = if authors_count == 0 { node.authors.create().await? } else { + let authors = node.authors.list().await?; + futures::pin_mut!(authors); let authors_list: Vec = authors.map(|author| author.unwrap()).collect().await; authors_list[0] }; - let node_addr = node.my_addr().await?; + let oku_fs = OkuFs { node, author_id }; + let node_addr = oku_fs.node.my_addr().await?; let addr_info = node_addr.info; - let magic_endpoint = node.magic_endpoint(); + let magic_endpoint = oku_fs.node.magic_endpoint(); let secret_key = magic_endpoint.secret_key(); let mut discovery_service = ConcurrentDiscovery::new(); let pkarr = PkarrNodeDiscovery::builder().secret_key(secret_key).build(); discovery_service.add(pkarr); discovery_service.publish(&addr_info); - Ok(OkuFs { node, author_id }) + let docs_client = &oku_fs.node.docs; + let docs_client = docs_client.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(INITIAL_PUBLISH_DELAY).await; + let replicas = docs_client.list().await.unwrap(); + pin_mut!(replicas); + while let Some(replica) = replicas.next().await { + let (namespace_id, _) = replica.unwrap(); + announce_replica(namespace_id).await.unwrap(); + } + tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await; + } + }); + Ok(oku_fs) } + /// Create a mechanism for discovering other nodes on the network given their IDs. + /// + /// # Returns + /// + /// A discovery service for finding other node's addresses given their IDs. pub async fn create_discovery_service(&self) -> Result> { let node_addr = self.node.my_addr().await?; let addr_info = node_addr.info; @@ -81,10 +132,16 @@ impl OkuFs { Ok(discovery_service) } + /// Shuts down the Oku file system. pub fn shutdown(self) { self.node.shutdown(); } + /// Creates a new replica in the file system. + /// + /// # Returns + /// + /// The ID of the new replica, being its public key. pub async fn create_replica(&self) -> Result> { let docs_client = &self.node.docs; let new_document = docs_client.create().await?; @@ -93,14 +150,71 @@ impl OkuFs { Ok(document_id) } - pub async fn delete_replica(&self, namespace: NamespaceId) -> Result<(), Box> { + /// Deletes a replica from the file system. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica to delete. + pub async fn delete_replica(&self, namespace_id: NamespaceId) -> Result<(), Box> { let docs_client = &self.node.docs; - Ok(docs_client.drop_doc(namespace).await?) + Ok(docs_client.drop_doc(namespace_id).await?) + } + + /// Lists all replicas in the file system. + /// + /// # Returns + /// + /// A list of all replicas in the file system. + pub async fn list_replicas(&self) -> Result, Box> { + let docs_client = &self.node.docs; + let replicas = docs_client.list().await?; + pin_mut!(replicas); + let replica_ids: Vec = + replicas.map(|replica| replica.unwrap().0).collect().await; + Ok(replica_ids) + } + + /// Lists all files in a replica. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica to list files in. + /// + /// # Returns + /// + /// A list of all files in the replica. + pub async fn list_files( + &self, + namespace_id: NamespaceId, + ) -> Result, Box> { + let docs_client = &self.node.docs; + let document = docs_client + .open(namespace_id) + .await? + .ok_or(OkuFsError::FsEntryNotFound)?; + let query = iroh::sync::store::Query::single_latest_per_key().build(); + let entries = document.get_many(query).await?; + pin_mut!(entries); + let files: Vec = entries.map(|entry| entry.unwrap()).collect().await; + Ok(files) } + /// Creates a file (if it does not exist) or modifies an existing file. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica containing the file to create or modify. + /// + /// * `path` - The path of the file to create or modify. + /// + /// * `data` - The data to write to the file. + /// + /// # Returns + /// + /// The hash of the file. pub async fn create_or_modify_file( &self, - namespace: NamespaceId, + namespace_id: NamespaceId, path: PathBuf, data: impl Into, ) -> Result> { @@ -108,7 +222,7 @@ impl OkuFs { let data_bytes = data.into(); let docs_client = &self.node.docs; let document = docs_client - .open(namespace) + .open(namespace_id) .await? .ok_or(OkuFsError::FsEntryNotFound)?; let entry_hash = document @@ -118,30 +232,52 @@ impl OkuFs { Ok(entry_hash) } + /// Deletes a file. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica containing the file to delete. + /// + /// * `path` - The path of the file to delete. + /// + /// # Returns + /// + /// The number of entries deleted in the replica, which should be 1 if the file was successfully deleted. pub async fn delete_file( &self, - namespace: NamespaceId, + namespace_id: NamespaceId, path: PathBuf, ) -> Result> { let file_key = path_to_entry_key(path); let docs_client = &self.node.docs; let document = docs_client - .open(namespace) + .open(namespace_id) .await? .ok_or(OkuFsError::FsEntryNotFound)?; let entries_deleted = document.del(self.author_id, file_key).await?; Ok(entries_deleted) } + /// Reads a file. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica containing the file to read. + /// + /// * `path` - The path of the file to read. + /// + /// # Returns + /// + /// The data read from the file. pub async fn read_file( &self, - namespace: NamespaceId, + namespace_id: NamespaceId, path: PathBuf, ) -> Result> { let file_key = path_to_entry_key(path); let docs_client = &self.node.docs; let document = docs_client - .open(namespace) + .open(namespace_id) .await? .ok_or(OkuFsError::FsEntryNotFound)?; let entry = document @@ -151,29 +287,53 @@ impl OkuFs { Ok(entry.content_bytes(self.node.client()).await?) } + /// Moves a file by copying it to a new location and deleting the original. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica containing the file to move. + /// + /// * `from` - The path of the file to move. + /// + /// * `to` - The path to move the file to. + /// + /// # Returns + /// + /// A tuple containing the hash of the file at the new destination and the number of replica entries deleted during the operation, which should be 1 if the file at the original path was deleted. pub async fn move_file( &self, - namespace: NamespaceId, + namespace_id: NamespaceId, from: PathBuf, to: PathBuf, ) -> Result<(Hash, usize), Box> { - let data = self.read_file(namespace, from.clone()).await?; + let data = self.read_file(namespace_id, from.clone()).await?; let hash = self - .create_or_modify_file(namespace, to.clone(), data) + .create_or_modify_file(namespace_id, to.clone(), data) .await?; - let entries_deleted = self.delete_file(namespace, from).await?; + let entries_deleted = self.delete_file(namespace_id, from).await?; Ok((hash, entries_deleted)) } + /// Deletes a directory and all its contents. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica containing the directory to delete. + /// + /// * `path` - The path of the directory to delete. + /// + /// # Returns + /// + /// The number of entries deleted. pub async fn delete_directory( &self, - namespace: NamespaceId, + namespace_id: NamespaceId, path: PathBuf, ) -> Result> { let path = normalise_path(path).join(""); // Ensure path ends with a slash let docs_client = &self.node.docs; let document = docs_client - .open(namespace) + .open(namespace_id) .await? .ok_or(OkuFsError::FsEntryNotFound)?; let entries_deleted = document @@ -182,6 +342,8 @@ impl OkuFs { Ok(entries_deleted) } + /// Handles incoming requests for document tickets. + /// This function listens for incoming connections from peers and responds to requests for document tickets. pub async fn listen_for_document_ticket_fetch_requests(&self) -> Result<(), Box> { let mut alpns: Vec> = Vec::new(); alpns.push(ALPN_DOCUMENT_TICKET_FETCH.to_vec()); @@ -218,12 +380,22 @@ impl OkuFs { Ok(()) } + /// Joins a swarm to fetch the latest version of a replica and save it to the local machine. + /// + /// # Arguments + /// + /// * `namespace_id` - The ID of the replica to fetch. + /// + /// * `partial` - Whether to discover peers who claim to only have a partial copy of the replica. + /// + /// * `verified` - Whether to discover peers who have been verified to have the replica. + /// + /// * `udp_port` - Optionally-specified UDP port to use for querying the mainline DHT. pub async fn get_external_replica( &self, - namespace: NamespaceId, + namespace_id: NamespaceId, partial: bool, verified: bool, - udp_port: Option, ) -> Result<(), Box> { // let discovery_items_stream = self // .discovery_service @@ -242,7 +414,7 @@ impl OkuFs { // Some(node_addrs) // } // }; - let content = ContentRequest::Hash(Hash::new(namespace.clone())); + let content = ContentRequest::Hash(Hash::new(namespace_id.clone())); let secret_key = self.node.magic_endpoint().secret_key(); let endpoint = MagicEndpoint::builder() .alpns(vec![]) @@ -250,10 +422,7 @@ impl OkuFs { .discovery(Box::new(self.create_discovery_service().await?)) .bind(0) .await?; - let bind_addr = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::UNSPECIFIED, - udp_port.unwrap_or_default(), - )); + let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DISCOVERY_PORT)); let discovery = UdpDiscovery::new(bind_addr).await?; let dht = mainline::Dht::default(); let q = Query { @@ -292,7 +461,7 @@ impl OkuFs { .await .ok_or(OkuDiscoveryError::NoNodesFound)?; let (mut send, mut recv) = connection.open_bi().await?; - send.write_all(&postcard::to_stdvec(namespace.as_bytes())?) + send.write_all(&postcard::to_stdvec(namespace_id.as_bytes())?) .await?; let ticket = DocTicket::from_bytes(&recv.read_to_end(256).await?)?; let docs_client = &self.node.docs; @@ -300,7 +469,7 @@ impl OkuFs { Ok(()) } - // pub async fn get_external_replica(&self, namespace: NamespaceId) -> Result<(), Box> { + // pub async fn get_external_replica(&self, namespace_id: NamespaceId) -> Result<(), Box> { // // let providers: Vec = // // discovery::query_dht(ContentRequest::Hash(Hash::new(namespace)), true, true, None) // // .await?; diff --git a/src/main.rs b/src/main.rs index cc197b2..ba41e0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,133 @@ -use std::error::Error; +use bytes::Bytes; +use clap::{Parser, Subcommand}; +use iroh::sync::NamespaceId; +use oku_fs::fs::OkuFs; +use std::{error::Error, path::PathBuf}; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Cli { + #[command(subcommand)] + command: Option, +} +#[derive(Subcommand)] +enum Commands { + CreateReplica, + CreateFile { + #[arg(short, long, value_name = "REPLICA_ID")] + replica_id: NamespaceId, + #[arg(short, long, value_name = "PATH")] + path: PathBuf, + #[arg(short, long, value_name = "DATA")] + data: Bytes, + }, + ListFiles { + #[arg(value_name = "REPLICA_ID")] + replica_id: NamespaceId, + }, + ListReplicas, + GetFile { + #[arg(short, long, value_name = "REPLICA_ID")] + replica_id: NamespaceId, + #[arg(short, long, value_name = "PATH")] + path: PathBuf, + }, + RemoveFile { + #[arg(short, long, value_name = "REPLICA_ID")] + replica_id: NamespaceId, + #[arg(short, long, value_name = "PATH")] + path: PathBuf, + }, + RemoveDirectory { + #[arg(short, long, value_name = "REPLICA_ID")] + replica_id: NamespaceId, + #[arg(short, long, value_name = "PATH")] + path: PathBuf, + }, + RemoveReplica { + #[arg(short, long, value_name = "REPLICA_ID")] + replica_id: NamespaceId, + }, + MoveFile { + #[arg(short, long, value_name = "REPLICA_ID")] + replica_id: NamespaceId, + #[arg(short, long, value_name = "OLD_PATH")] + old_path: PathBuf, + #[arg(short, long, value_name = "NEW_PATH")] + new_path: PathBuf, + }, + GetReplica { + #[arg(short, long, value_name = "REPLICA_ID")] + replica_id: NamespaceId, + }, +} + #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { - println!("Hello, world!"); + let cli = Cli::parse(); + let node = OkuFs::start().await?; + match cli.command { + Some(Commands::CreateReplica) => { + let replica_id = node.create_replica().await?; + println!("Created replica with ID: {}", replica_id); + } + Some(Commands::CreateFile { + replica_id, + path, + data, + }) => { + node.create_or_modify_file(replica_id, path.clone(), data) + .await?; + println!("Created file at {:?}", path); + } + Some(Commands::ListFiles { replica_id }) => { + let files = node.list_files(replica_id).await?; + for file in files { + println!("{:#?}", file); + } + } + Some(Commands::ListReplicas) => { + let replicas = node.list_replicas().await?; + for replica in replicas { + println!("{}", replica); + } + } + Some(Commands::GetFile { replica_id, path }) => { + let data = node.read_file(replica_id, path).await?; + println!("{}", String::from_utf8_lossy(&data)); + } + Some(Commands::RemoveFile { replica_id, path }) => { + node.delete_file(replica_id, path.clone()).await?; + println!("Removed file at {:?}", path); + } + Some(Commands::RemoveDirectory { replica_id, path }) => { + node.delete_directory(replica_id, path.clone()).await?; + println!("Removed directory at {:?}", path); + } + Some(Commands::RemoveReplica { replica_id }) => { + node.delete_replica(replica_id).await?; + println!("Removed replica with ID: {}", replica_id); + } + Some(Commands::MoveFile { + replica_id, + old_path, + new_path, + }) => { + node.move_file(replica_id, old_path.clone(), new_path.clone()) + .await?; + println!("Moved file from {:?} to {:?}", old_path, new_path); + } + Some(Commands::GetReplica { replica_id }) => { + node.get_external_replica(replica_id, true, true).await?; + let files = node.list_files(replica_id).await?; + for file in files { + println!("{:#?}", file); + } + } + None => { + println!("Node will listen for incoming connections."); + loop {} + } + } Ok(()) }