Skip to content

Commit

Permalink
feat(discovery): Begin implementing announcement
Browse files Browse the repository at this point in the history
  • Loading branch information
emmyoh committed Apr 22, 2024
1 parent 6060631 commit 3d23359
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 77 deletions.
17 changes: 17 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[doc.extern-map.registries]
crates-io = "https://docs.rs/"

[target.aarch64-apple-darwin]
rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"]

[target.x86_64-apple-darwin]
rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"]

[target.x86_64-pc-windows-gnu]
rustflags = ["-C", "link-arg=-lpsapi", "-C", "link-arg=-lbcrypt", "-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"]

[target.aarch64-unknown-linux-gnu]
rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"]

[target.x86_64-unknown-linux-gnu]
rustflags = ["-C", "target-cpu=native", "-Z", "tune-cpu=native", "-C", "strip=symbols", "-Z", "unstable-options"]
69 changes: 69 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
name: Document and check codebase
on:
push:
branches: [ "master" ]
env:
CARGO_TERM_COLOR: always
jobs:
build_documentation:
name: Build documentation
runs-on: ubuntu-latest
steps:
- name: Setup Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
target: x86_64-unknown-linux-gnu
default: true
profile: default
- name: Checkout codebase
uses: actions/checkout@v4
- name: Generate documentation
run: time cargo doc --no-deps -Zrustdoc-map --all-features --release --quiet
- name: Fix permissions
run: |
chmod -c -R +rX "target/doc/" | while read line; do
echo "::warning title=Invalid file permissions automatically fixed::$line"
done
- name: Upload Pages artifact
uses: actions/upload-pages-artifact@v3
with:
path: "target/doc/"
deploy_documentation:
needs: build_documentation
name: Deploy documentation to GitHub Pages
permissions:
pages: write
id-token: write
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4
apply_suggestions:
name: Format code, apply compiler suggestions
runs-on: ubuntu-latest
steps:
- name: Checkout codebase
uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: clippy, rustfmt
profile: minimal
- name: Format
run: cargo fmt
- name: Apply compiler suggestions
run: |
cargo fix --all-features --edition --edition-idioms
cargo clippy --fix -Z unstable-options
- name: Commit changes to code, if any
run: |
git config user.name github-actions
git config user.email [email protected]
git diff --quiet && git diff --staged --quiet || git commit -am "chore: Format and apply compiler suggestions."
git push
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,7 @@ Cargo.lock
**/*.rs.bk

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
*.pdb

## Oku
.oku
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async-trait = "0.1.79"
bincode = "1.3.3"
bytes = "1.6.0"
chrono = "0.4.37"
clap = { version = "4.5.4", features = ["derive"] }
derive_more = "0.99.17"
flume = "0.11.0"
futures = "0.3.30"
Expand Down
3 changes: 3 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "nightly"
targets = [ "aarch64-unknown-linux-gnu", "x86_64-unknown-linux-gnu", "aarch64-apple-darwin", "x86_64-apple-darwin", "x86_64-pc-windows-gnu" ]
130 changes: 90 additions & 40 deletions src/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,65 @@
use crate::error::OkuDiscoveryError;

use futures::StreamExt;
use iroh::{
bytes::{Hash, HashAndFormat},
net::NodeId,
sync::NamespaceId,
ticket::BlobTicket,
};
use iroh_mainline_content_discovery::protocol::{Query, QueryFlags};
use iroh_mainline_content_discovery::to_infohash;
use iroh_mainline_content_discovery::UdpDiscovery;
use iroh_mainline_content_discovery::announce_dht;
use std::{collections::BTreeSet, error::Error, str::FromStr, time::Duration};

/// The delay between republishing content to the mainline DHT.
pub const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);

/// The initial delay before publishing content to the mainline DHT.
pub const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);

/// The port used for communication between other Oku filesystem nodes.
pub const DISCOVERY_PORT: u16 = 4938;
pub const ANNOUNCE_PARALLELISM: usize = 10;

use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::{error::Error, str::FromStr};
// pub fn announce_dht(
// dht: mainline::dht::Dht,
// content: BTreeSet<HashAndFormat>,
// port: Option<u16>,
// announce_parallelism: usize,
// ) -> impl Stream<Item = (HashAndFormat, mainline::Result<mainline::StoreQueryMetdata>)> {
// let dht = dht.as_async();
// futures::stream::iter(content)
// .map(move |content| {
// let dht = dht.clone();
// async move {
// let info_hash = to_infohash(content);
// println!(
// "announcing content that corresponds to infohash {}",
// info_hash
// );
// let res = dht.announce_peer(info_hash, port).await;
// (content, res)
// }
// })
// .buffer_unordered(announce_parallelism)
// }

/// Announces a local replica to the mainline DHT.
///
/// # Arguments
///
/// * `namespace_id` - The ID of the replica to announce.
pub async fn announce_replica(namespace_id: NamespaceId) -> Result<(), Box<dyn Error>> {
let mut content = BTreeSet::new();
content.insert(HashAndFormat::raw(Hash::new(namespace_id.clone())));
let dht = mainline::Dht::default();
let announce_stream = announce_dht(dht, content, DISCOVERY_PORT, ANNOUNCE_PARALLELISM);
tokio::pin!(announce_stream);
while let Some((content, res)) = announce_stream.next().await {
match res {
Ok(_) => println!("announced {:?}", content),
Err(e) => eprintln!("error announcing {:?}: {:?}", content, e),
}
}
Ok(())
}

/*
The `ContentRequest` enum is derived from the `ContentArg` enum in the `iroh-examples` repository (https://github.com/n0-computer/iroh-examples/blob/6f184933efa72eec1d8cf2e8d07905650c0fdb46/content-discovery/iroh-mainline-content-discovery-cli/src/args.rs#L23).
Expand Down Expand Up @@ -38,6 +87,7 @@ impl ContentRequest {
},
}
}
/// Get the hash for this content request.
pub fn hash(&self) -> Hash {
match self {
ContentRequest::Hash(hash) => *hash,
Expand All @@ -62,38 +112,38 @@ impl FromStr for ContentRequest {
}
}

pub async fn query_dht(
content: ContentRequest,
partial: bool,
verified: bool,
udp_port: Option<u16>,
) -> Result<(), Box<dyn Error>> {
let _providers: Vec<NodeId> = Vec::new();
let bind_addr = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
udp_port.unwrap_or_default(),
));
let discovery = UdpDiscovery::new(bind_addr).await?;
let dht = mainline::Dht::default();
let q = Query {
content: content.hash_and_format(),
flags: QueryFlags {
complete: !partial,
verified: verified,
},
};
println!("content corresponds to infohash {}", to_infohash(q.content));
// pub async fn query_dht(
// content: ContentRequest,
// partial: bool,
// verified: bool,
// udp_port: Option<u16>,
// ) -> Result<(), Box<dyn Error>> {
// let _providers: Vec<NodeId> = Vec::new();
// let bind_addr = SocketAddr::V4(SocketAddrV4::new(
// Ipv4Addr::UNSPECIFIED,
// udp_port.unwrap_or_default(),
// ));
// let discovery = UdpDiscovery::new(bind_addr).await?;
// let dht = mainline::Dht::default();
// let q = Query {
// content: content.hash_and_format(),
// flags: QueryFlags {
// complete: !partial,
// verified: verified,
// },
// };
// println!("content corresponds to infohash {}", to_infohash(q.content));

let _stream = discovery.query_dht(dht, q).await?;
// while let Some(announce) = stream.next().await {
// if announce.verify().is_ok() {
// println!("found verified provider {}", announce.host);
// providers.push(announce.host);
// } else {
// println!("got wrong signed announce!");
// }
// }
// Ok(providers)
// let _stream = discovery.query_dht(dht, q).await?;
// // while let Some(announce) = stream.next().await {
// // if announce.verify().is_ok() {
// // println!("found verified provider {}", announce.host);
// // providers.push(announce.host);
// // } else {
// // println!("got wrong signed announce!");
// // }
// // }
// // Ok(providers)

Ok(())
}
// Ok(())
// }
7 changes: 6 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,22 @@ pub enum OkuFsError {
}

#[derive(Error, Debug, Diagnostic)]
/// Content discovery errors.
pub enum OkuDiscoveryError {
#[error("Invalid hash and format.")]
#[diagnostic(code(discovery::invalid_hash_and_format), url(docsrs))]
/// Invalid hash and format.
InvalidHashAndFormat,
#[error("Unable to discover node address for node ID.")]
#[diagnostic(code(discovery::node_address_discovery_failed), url(docsrs))]
/// Unable to discover node address for node ID.
NodeAddressDiscoveryFailed,
#[error("Unable to find nodes able to satisfy query")]
#[error("Unable to find nodes able to satisfy query.")]
#[diagnostic(code(discovery::no_nodes_found), url(docsrs))]
/// Unable to find nodes able to satisfy query.
NoNodesFound,
#[error("Unsupported protocol identifier: {0}")]
#[diagnostic(code(discovery::unsupported_alpn), url(docsrs))]
/// Unsupported protocol identifier.
UnsupportedALPN(String),
}
Loading

0 comments on commit 3d23359

Please sign in to comment.