diff --git a/Cargo.lock b/Cargo.lock index d8218b8cd6..d3f18730fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -141,9 +141,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" [[package]] name = "arc-swap" @@ -802,6 +802,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +[[package]] +name = "cordyceps" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec10f0a762d93c4498d2e97a333805cb6250d60bead623f71d8034f9a4152ba3" +dependencies = [ + "loom", + "tracing", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -927,7 +937,7 @@ dependencies = [ "bitflags 2.6.0", "crossterm_winapi", "libc", - "mio", + "mio 0.8.11", "parking_lot", "signal-hook", "signal-hook-mio", @@ -1611,10 +1621,11 @@ dependencies = [ [[package]] name = "futures-buffered" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fa130f3777d0d4b0993653c20bc433026d3290627693c4ed1b18dd237357ab" +checksum = "34acda8ae8b63fbe0b2195c998b180cff89a8212fb2622a78b572a9f1c6f7684" dependencies = [ + "cordyceps", "diatomic-waker", "futures-core", "pin-project-lite", @@ -1774,6 +1785,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows 0.48.0", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2446,6 +2470,7 @@ dependencies = [ "iroh-metrics", "iroh-net", "iroh-quinn", + "iroh-router", "iroh-test", "nested_enum_utils", "num_cpus", @@ -2932,6 +2957,22 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "iroh-router" +version = "0.27.0" +dependencies = [ + "anyhow", + "clap", + "futures-buffered", + "futures-lite 2.3.0", + "futures-util", + "iroh-net", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "iroh-test" version = "0.27.0" @@ -3086,6 +3127,19 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru" version = "0.12.3" @@ -3196,6 +3250,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "nanorand" version = "0.7.0" @@ -4808,6 +4874,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -5085,7 +5157,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" dependencies = [ "libc", - "mio", + "mio 0.8.11", "signal-hook", ] @@ -5611,28 +5683,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.1" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -6314,6 +6385,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows" version = "0.51.1" diff --git a/Cargo.toml b/Cargo.toml index 3365113182..6c80d6fe81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "iroh-test", "iroh-net/bench", "iroh-cli", + "iroh-router", ] resolver = "2" @@ -49,3 +50,4 @@ iroh-base = { path = "./iroh-base" } iroh-net = { path = "./iroh-net" } iroh-metrics = { path = "./iroh-metrics" } iroh-test = { path = "./iroh-test" } +iroh-router = { path = "./iroh-router" } diff --git a/iroh-router/Cargo.toml b/iroh-router/Cargo.toml new file mode 100644 index 0000000000..71201e565a --- /dev/null +++ b/iroh-router/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "iroh-router" +version = "0.27.0" +edition = "2021" +readme = "README.md" +description = "protocol router support for iroh" +license = "MIT OR Apache-2.0" +authors = ["dignifiedquire ", "n0 team"] +repository = "https://github.com/n0-computer/iroh" +keywords = ["quic", "networking", "holepunching", "p2p"] + + +[dependencies] +anyhow = "1.0.91" +futures-buffered = "0.2.9" +futures-lite = "2.3.0" +futures-util = "0.3.31" +iroh-net = { version = "0.27.0", path = "../iroh-net" } +tokio = "1.41.0" +tokio-util = "0.7.12" +tracing = "0.1.40" + +# Examples +clap = { version = "4", features = ["derive"], optional = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true } + +[lints] +workspace = true + + +[features] +default = [] +examples = ["dep:clap", "dep:tracing-subscriber"] + +[[example]] +name = "custom-protocol" +required-features = ["examples"] diff --git a/iroh-router/README.md b/iroh-router/README.md new file mode 100644 index 0000000000..54639b5f9f --- /dev/null +++ b/iroh-router/README.md @@ -0,0 +1,20 @@ +# iroh-router + +This crate contains the definitions for custom protocols for `iroh`. + +# License + +This project is licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or + http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or + http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in this project by you, as defined in the Apache-2.0 license, +shall be dual licensed as above, without any additional terms or conditions. diff --git a/iroh-router/examples/custom-protocol.rs b/iroh-router/examples/custom-protocol.rs new file mode 100644 index 0000000000..b4bd6e8b23 --- /dev/null +++ b/iroh-router/examples/custom-protocol.rs @@ -0,0 +1,232 @@ +//! Example for adding a custom protocol. +//! +//! We are building a very simple custom protocol here. +//! +//! Our custom protocol allows querying the text stored on the other node. +//! +//! The example is contrived - we only use memory nodes, and our database is a hashmap in a mutex, +//! and our queries just match if the query string appears as-is. +//! +//! ## Usage +//! +//! In one terminal, run +//! +//! cargo run --example custom-protocol --features=examples -- listen "hello-world" "foo-bar" "hello-moon" +//! +//! This spawns an iroh endpoint with three blobs. It will print the node's node id. +//! +//! In another terminal, run +//! +//! cargo run --example custom-protocol --features=examples -- query hello +//! +//! Replace with the node id from above. This will connect to the listening node with our +//! custom protocol and query for the string `hello`. The listening node will return a number of how many +//! strings match the query. +//! +//! For this example, this will print: +//! +//! Found 2 matches +//! +//! That's it! Follow along in the code below, we added a bunch of comments to explain things. + +use std::{collections::BTreeSet, sync::Arc}; + +use anyhow::Result; +use clap::Parser; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_net::{ + endpoint::{get_remote_node_id, Connecting}, + Endpoint, NodeId, +}; +use iroh_router::{ProtocolHandler, Router}; +use tokio::sync::Mutex; +use tracing_subscriber::{prelude::*, EnvFilter}; + +#[derive(Debug, Parser)] +pub struct Cli { + #[clap(subcommand)] + command: Command, +} + +#[derive(Debug, Parser)] +pub enum Command { + /// Spawn a node in listening mode. + Listen { + /// Each text string will be imported as a blob and inserted into the search database. + text: Vec, + }, + /// Query a remote node for data and print the results. + Query { + /// The node id of the node we want to query. + node_id: NodeId, + /// The text we want to match. + query: String, + }, +} + +/// Each custom protocol is identified by its ALPN string. +/// +/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake, +/// and the connection is aborted unless both nodes pass the same bytestring. +const ALPN: &[u8] = b"iroh-example/text-search/0"; + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); + let args = Cli::parse(); + + // Build an endpoint + let endpoint = Endpoint::builder().discovery_n0().bind().await?; + + // Build our custom protocol handler. The `builder` exposes access to various subsystems in the + // iroh node. In our case, we need a blobs client and the endpoint. + let proto = BlobSearch::new(endpoint.clone()); + + let builder = Router::builder(endpoint); + + // Add our protocol, identified by our ALPN, to the node, and spawn the node. + let router = builder.accept(ALPN.to_vec(), proto.clone()).spawn().await?; + + match args.command { + Command::Listen { text } => { + let node_id = router.endpoint().node_id(); + println!("our node id: {node_id}"); + + // Insert the text strings as blobs and index them. + for text in text.into_iter() { + proto.insert(text).await?; + } + + // Wait for Ctrl-C to be pressed. + tokio::signal::ctrl_c().await?; + } + Command::Query { node_id, query } => { + // Query the remote node. + // This will send the query over our custom protocol, read hashes on the reply stream, + // and download each hash over iroh-blobs. + let num_matches = proto.query_remote(node_id, &query).await?; + + // Print out our query results. + println!("Found {} matches", num_matches); + } + } + + router.shutdown().await?; + + Ok(()) +} + +#[derive(Debug, Clone)] +struct BlobSearch { + endpoint: Endpoint, + blobs: Arc>>, +} + +impl ProtocolHandler for BlobSearch { + /// The `accept` method is called for each incoming connection for our ALPN. + /// + /// The returned future runs on a newly spawned tokio task, so it can run as long as + /// the connection lasts. + fn accept(self: Arc, connecting: Connecting) -> BoxedFuture> { + // We have to return a boxed future from the handler. + Box::pin(async move { + // Wait for the connection to be fully established. + let connection = connecting.await?; + // We can get the remote's node id from the connection. + let node_id = get_remote_node_id(&connection)?; + println!("accepted connection from {node_id}"); + + // Our protocol is a simple request-response protocol, so we expect the + // connecting peer to open a single bi-directional stream. + let (mut send, mut recv) = connection.accept_bi().await?; + + // We read the query from the receive stream, while enforcing a max query length. + let query_bytes = recv.read_to_end(64).await?; + + // Now, we can perform the actual query on our local database. + let query = String::from_utf8(query_bytes)?; + let num_matches = self.query_local(&query).await; + + // We want to return a list of hashes. We do the simplest thing possible, and just send + // one hash after the other. Because the hashes have a fixed size of 32 bytes, this is + // very easy to parse on the other end. + send.write_all(&num_matches.to_le_bytes()).await?; + + // By calling `finish` on the send stream we signal that we will not send anything + // further, which makes the receive stream on the other end terminate. + send.finish()?; + + // Wait until the remote closes the connection, which it does once it + // received the response. + connection.closed().await; + + Ok(()) + }) + } +} + +impl BlobSearch { + /// Create a new protocol handler. + pub fn new(endpoint: Endpoint) -> Arc { + Arc::new(Self { + endpoint, + blobs: Default::default(), + }) + } + + /// Query a remote node, download all matching blobs and print the results. + pub async fn query_remote(&self, node_id: NodeId, query: &str) -> Result { + // Establish a connection to our node. + // We use the default node discovery in iroh, so we can connect by node id without + // providing further information. + let conn = self.endpoint.connect(node_id, ALPN).await?; + + // Open a bi-directional in our connection. + let (mut send, mut recv) = conn.open_bi().await?; + + // Send our query. + send.write_all(query.as_bytes()).await?; + + // Finish the send stream, signalling that no further data will be sent. + // This makes the `read_to_end` call on the accepting side terminate. + send.finish()?; + + // The response is a 64 bit integer + // We simply read it into a byte buffer. + let mut num_matches = [0u8; 8]; + + // Read 8 bytes from the stream. + recv.read_exact(&mut num_matches).await?; + + let num_matches = u64::from_le_bytes(num_matches); + + // Dropping the connection here will close it. + + Ok(num_matches) + } + + /// Query the local database. + /// + /// Returns how many matches were found. + pub async fn query_local(&self, query: &str) -> u64 { + let guard = self.blobs.lock().await; + let count: usize = guard.iter().filter(|text| text.contains(query)).count(); + count as u64 + } + + /// Insert a text string into the database. + pub async fn insert(&self, text: String) -> Result<()> { + let mut guard = self.blobs.lock().await; + guard.insert(text); + Ok(()) + } +} + +/// Set the RUST_LOG env var to one of {debug,info,warn} to see logging. +fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} diff --git a/iroh-router/src/lib.rs b/iroh-router/src/lib.rs new file mode 100644 index 0000000000..6bdd6cd565 --- /dev/null +++ b/iroh-router/src/lib.rs @@ -0,0 +1,5 @@ +mod protocol; +mod router; + +pub use protocol::{ProtocolHandler, ProtocolMap}; +pub use router::{Router, RouterBuilder}; diff --git a/iroh-router/src/protocol.rs b/iroh-router/src/protocol.rs new file mode 100644 index 0000000000..22affa67d9 --- /dev/null +++ b/iroh-router/src/protocol.rs @@ -0,0 +1,77 @@ +use std::{any::Any, collections::BTreeMap, sync::Arc}; + +use anyhow::Result; +use futures_buffered::join_all; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_net::endpoint::Connecting; + +/// Handler for incoming connections. +/// +/// A router accepts connections for arbitrary ALPN protocols. +/// +/// With this trait, you can handle incoming connections for any protocol. +/// +/// Implement this trait on a struct that should handle incoming connections. +/// The protocol handler must then be registered on the node for an ALPN protocol with +/// [`crate::node::builder::ProtocolBuilder::accept`]. +pub trait ProtocolHandler: Send + Sync + IntoArcAny + std::fmt::Debug + 'static { + /// Handle an incoming connection. + /// + /// This runs on a freshly spawned tokio task so this can be long-running. + fn accept(self: Arc, conn: Connecting) -> BoxedFuture>; + + /// Called when the node shuts down. + fn shutdown(self: Arc) -> BoxedFuture<()> { + Box::pin(async move {}) + } +} + +/// Helper trait to facilite casting from `Arc` to `Arc`. +/// +/// This trait has a blanket implementation so there is no need to implement this yourself. +pub trait IntoArcAny { + fn into_arc_any(self: Arc) -> Arc; +} + +impl IntoArcAny for T { + fn into_arc_any(self: Arc) -> Arc { + self + } +} + +/// A typed map of protocol handlers, mapping them from ALPNs. +#[derive(Debug, Clone, Default)] +pub struct ProtocolMap(BTreeMap, Arc>); + +impl ProtocolMap { + /// Returns the registered protocol handler for an ALPN as a concrete type. + pub fn get_typed(&self, alpn: &[u8]) -> Option> { + let protocol: Arc = self.0.get(alpn)?.clone(); + let protocol_any: Arc = protocol.into_arc_any(); + let protocol_ref = Arc::downcast(protocol_any).ok()?; + Some(protocol_ref) + } + + /// Returns the registered protocol handler for an ALPN as a [`Arc`]. + pub fn get(&self, alpn: &[u8]) -> Option> { + self.0.get(alpn).cloned() + } + + /// Inserts a protocol handler. + pub fn insert(&mut self, alpn: Vec, handler: Arc) { + self.0.insert(alpn, handler); + } + + /// Returns an iterator of all registered ALPN protocol identifiers. + pub fn alpns(&self) -> impl Iterator> { + self.0.keys() + } + + /// Shuts down all protocol handlers. + /// + /// Calls and awaits [`ProtocolHandler::shutdown`] for all registered handlers concurrently. + pub async fn shutdown(&self) { + let handlers = self.0.values().cloned().map(ProtocolHandler::shutdown); + join_all(handlers).await; + } +} diff --git a/iroh-router/src/router.rs b/iroh-router/src/router.rs new file mode 100644 index 0000000000..bf4d290623 --- /dev/null +++ b/iroh-router/src/router.rs @@ -0,0 +1,211 @@ +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +use futures_util::{ + future::{MapErr, Shared}, + FutureExt, TryFutureExt, +}; +use iroh_net::Endpoint; +use tokio::task::{JoinError, JoinSet}; +use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; +use tracing::{debug, error, warn}; + +use crate::{ProtocolHandler, ProtocolMap}; + +#[derive(Clone, Debug)] +pub struct Router { + endpoint: Endpoint, + protocols: Arc, + // `Router` needs to be `Clone + Send`, and we need to `task.await` in its `shutdown()` impl. + // So we need + // - `Shared` so we can `task.await` from all `Node` clones + // - `MapErr` to map the `JoinError` to a `String`, because `JoinError` is `!Clone` + // - `AbortOnDropHandle` to make sure that the `task` is cancelled when all `Node`s are dropped + // (`Shared` acts like an `Arc` around its inner future). + task: Shared, JoinErrToStr>>, + cancel_token: CancellationToken, +} + +type JoinErrToStr = Box String + Send + Sync + 'static>; + +impl Router { + pub fn builder(endpoint: Endpoint) -> RouterBuilder { + RouterBuilder::new(endpoint) + } + + /// Returns a protocol handler for an ALPN. + /// + /// This downcasts to the concrete type and returns `None` if the handler registered for `alpn` + /// does not match the passed type. + pub fn get_protocol(&self, alpn: &[u8]) -> Option> { + self.protocols.get_typed(alpn) + } + + pub fn endpoint(&self) -> &Endpoint { + &self.endpoint + } + + pub async fn shutdown(self) -> Result<()> { + // Trigger shutdown of the main run task by activating the cancel token. + self.cancel_token.cancel(); + + // Wait for the main task to terminate. + self.task.await.map_err(|err| anyhow!(err))?; + + Ok(()) + } +} + +#[derive(Debug)] +pub struct RouterBuilder { + endpoint: Endpoint, + protocols: ProtocolMap, +} + +impl RouterBuilder { + pub fn new(endpoint: Endpoint) -> Self { + Self { + endpoint, + protocols: ProtocolMap::default(), + } + } + + pub fn accept(mut self, alpn: Vec, handler: Arc) -> Self { + self.protocols.insert(alpn, handler); + self + } + + /// Returns the [`Endpoint`] of the node. + pub fn endpoint(&self) -> &Endpoint { + &self.endpoint + } + + /// Returns a protocol handler for an ALPN. + /// + /// This downcasts to the concrete type and returns `None` if the handler registered for `alpn` + /// does not match the passed type. + pub fn get_protocol(&self, alpn: &[u8]) -> Option> { + self.protocols.get_typed(alpn) + } + + pub async fn spawn(self) -> Result { + // Update the endpoint with our alpns. + let alpns = self + .protocols + .alpns() + .map(|alpn| alpn.to_vec()) + .collect::>(); + + let protocols = Arc::new(self.protocols); + if let Err(err) = self.endpoint.set_alpns(alpns) { + shutdown(&self.endpoint, protocols.clone()).await; + return Err(err); + } + + let mut join_set = JoinSet::new(); + let endpoint = self.endpoint.clone(); + let protos = protocols.clone(); + let cancel = CancellationToken::new(); + let cancel_token = cancel.clone(); + + let run_loop_fut = async move { + let protocols = protos; + loop { + tokio::select! { + biased; + _ = cancel_token.cancelled() => { + break; + }, + // handle incoming p2p connections. + Some(incoming) = endpoint.accept() => { + let protocols = protocols.clone(); + join_set.spawn(async move { + handle_connection(incoming, protocols).await; + anyhow::Ok(()) + }); + }, + // handle task terminations and quit on panics. + res = join_set.join_next(), if !join_set.is_empty() => { + match res { + Some(Err(outer)) => { + if outer.is_panic() { + error!("Task panicked: {outer:?}"); + break; + } else if outer.is_cancelled() { + debug!("Task cancelled: {outer:?}"); + } else { + error!("Task failed: {outer:?}"); + break; + } + } + Some(Ok(Err(inner))) => { + debug!("Task errored: {inner:?}"); + } + _ => {} + } + }, + else => break, + } + } + + shutdown(&endpoint, protocols).await; + + // Abort remaining tasks. + tracing::info!("Shutting down remaining tasks"); + join_set.shutdown().await; + }; + let task = tokio::task::spawn(run_loop_fut); + let task = AbortOnDropHandle::new(task) + .map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr) + .shared(); + + Ok(Router { + endpoint: self.endpoint, + protocols, + task, + cancel_token: cancel, + }) + } +} + +/// Shutdown the different parts of the router concurrently. +async fn shutdown(endpoint: &Endpoint, protocols: Arc) { + let error_code = 1u16; + + // We ignore all errors during shutdown. + let _ = tokio::join!( + // Close the endpoint. + // Closing the Endpoint is the equivalent of calling Connection::close on all + // connections: Operations will immediately fail with ConnectionError::LocallyClosed. + // All streams are interrupted, this is not graceful. + endpoint + .clone() + .close(error_code.into(), b"provider terminating"), + // Shutdown protocol handlers. + protocols.shutdown(), + ); +} + +async fn handle_connection(incoming: iroh_net::endpoint::Incoming, protocols: Arc) { + let mut connecting = match incoming.accept() { + Ok(conn) => conn, + Err(err) => { + warn!("Ignoring connection: accepting failed: {err:#}"); + return; + } + }; + let alpn = match connecting.alpn().await { + Ok(alpn) => alpn, + Err(err) => { + warn!("Ignoring connection: invalid handshake: {err:#}"); + return; + } + }; + let Some(handler) = protocols.get(&alpn) else { + warn!("Ignoring connection: unsupported ALPN protocol"); + return; + }; + if let Err(err) = handler.accept(connecting).await { + warn!("Handling incoming connection ended with error: {err}"); + } +} diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 401f8cf700..05b22e193c 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -31,6 +31,7 @@ iroh-base = { version = "0.27.0", features = ["key"] } iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.27.0", optional = true } iroh-net = { version = "0.27.0", features = ["discovery-local-network"] } +iroh-router = { version = "0.27.0" } nested_enum_utils = "0.1.0" num_cpus = { version = "1.15.0" } portable-atomic = "1" diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index f5c224e148..c6aeae5a5f 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -50,7 +50,7 @@ use iroh::{ endpoint::{get_remote_node_id, Connecting}, Endpoint, NodeId, }, - node::ProtocolHandler, + router::ProtocolHandler, }; use tracing_subscriber::{prelude::*, EnvFilter}; diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 90f372c3ba..d60f62f851 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -102,6 +102,8 @@ pub use iroh_docs as docs; pub use iroh_gossip as gossip; #[doc(inline)] pub use iroh_net as net; +#[doc(inline)] +pub use iroh_router as router; pub mod client; pub mod node; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 3209f8b9fa..73767fe24b 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -50,7 +50,6 @@ use futures_lite::StreamExt; use futures_util::future::{MapErr, Shared}; use iroh_base::key::PublicKey; use iroh_blobs::{ - protocol::Closed, store::Store as BaoStore, util::local_pool::{LocalPool, LocalPoolHandle}, }; @@ -59,16 +58,14 @@ use iroh_net::{ endpoint::{DirectAddrsStream, RemoteInfo}, AddrInfo, Endpoint, NodeAddr, }; +use iroh_router::{ProtocolHandler, Router}; use protocol::blobs::BlobsProtocol; use quic_rpc::{transport::ServerEndpoint as _, RpcServer}; use tokio::task::{JoinError, JoinSet}; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; -use crate::node::{ - nodes_storage::store_node_addrs, - protocol::{docs::DocsProtocol, ProtocolMap}, -}; +use crate::node::{nodes_storage::store_node_addrs, protocol::docs::DocsProtocol}; mod builder; mod nodes_storage; @@ -76,8 +73,6 @@ mod protocol; mod rpc; mod rpc_status; -pub use protocol::ProtocolHandler; - pub use self::{ builder::{ Builder, DiscoveryConfig, DocsStorage, GcPolicy, ProtocolBuilder, StorageConfig, @@ -117,7 +112,7 @@ pub struct Node { // - `AbortOnDropHandle` to make sure that the `task` is cancelled when all `Node`s are dropped // (`Shared` acts like an `Arc` around its inner future). task: Shared, JoinErrToStr>>, - protocols: Arc, + router: Router, } pub(crate) type JoinErrToStr = Box String + Send + Sync + 'static>; @@ -246,7 +241,7 @@ impl Node { /// This downcasts to the concrete type and returns `None` if the handler registered for `alpn` /// does not match the passed type. pub fn get_protocol(&self, alpn: &[u8]) -> Option> { - self.protocols.get_typed(alpn) + self.router.get_protocol(alpn) } } @@ -274,7 +269,7 @@ impl NodeInner { self: Arc, external_rpc: IrohServerEndpoint, internal_rpc: IrohServerEndpoint, - protocols: Arc, + router: Router, gc_policy: GcPolicy, gc_done_callback: Option>, nodes_data_path: Option, @@ -296,11 +291,11 @@ impl NodeInner { // Spawn a task for the garbage collection. if let GcPolicy::Interval(gc_period) = gc_policy { - let protocols = protocols.clone(); + let router = router.clone(); let handle = local_pool.spawn(move || async move { - let docs_engine = protocols.get_typed::(DOCS_ALPN); - let blobs = protocols - .get_typed::>(iroh_blobs::protocol::ALPN) + let docs_engine = router.get_protocol::(DOCS_ALPN); + let blobs = router + .get_protocol::>(iroh_blobs::protocol::ALPN) .expect("missing blobs"); blobs @@ -400,7 +395,7 @@ impl NodeInner { request = external_rpc.accept() => { match request { Ok(accepting) => { - rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, protocols.clone()); + rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, router.clone()); } Err(e) => { info!("rpc request error: {:?}", e); @@ -411,21 +406,13 @@ impl NodeInner { request = internal_rpc.accept() => { match request { Ok(accepting) => { - rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, protocols.clone()); + rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, router.clone()); } Err(e) => { info!("internal rpc request error: {:?}", e); } } }, - // handle incoming p2p connections. - Some(incoming) = self.endpoint.accept() => { - let protocols = protocols.clone(); - join_set.spawn(async move { - handle_connection(incoming, protocols).await; - Ok(()) - }); - }, // handle task terminations and quit on panics. res = join_set.join_next(), if !join_set.is_empty() => { match res { @@ -450,7 +437,9 @@ impl NodeInner { } } - self.shutdown(protocols).await; + if let Err(err) = router.shutdown().await { + tracing::warn!("Error when shutting down router: {:?}", err); + }; // Abort remaining tasks. join_set.shutdown().await; @@ -460,48 +449,6 @@ impl NodeInner { tracing::info!("Shutting down local pool"); local_pool.shutdown().await; } - - /// Shutdown the different parts of the node concurrently. - async fn shutdown(&self, protocols: Arc) { - let error_code = Closed::ProviderTerminating; - - // We ignore all errors during shutdown. - let _ = tokio::join!( - // Close the endpoint. - // Closing the Endpoint is the equivalent of calling Connection::close on all - // connections: Operations will immediately fail with ConnectionError::LocallyClosed. - // All streams are interrupted, this is not graceful. - self.endpoint - .clone() - .close(error_code.into(), error_code.reason()), - // Shutdown protocol handlers. - protocols.shutdown(), - ); - } -} - -async fn handle_connection(incoming: iroh_net::endpoint::Incoming, protocols: Arc) { - let mut connecting = match incoming.accept() { - Ok(conn) => conn, - Err(err) => { - warn!("Ignoring connection: accepting failed: {err:#}"); - return; - } - }; - let alpn = match connecting.alpn().await { - Ok(alpn) => alpn, - Err(err) => { - warn!("Ignoring connection: invalid handshake: {err:#}"); - return; - } - }; - let Some(handler) = protocols.get(&alpn) else { - warn!("Ignoring connection: unsupported ALPN protocol"); - return; - }; - if let Err(err) = handler.accept(connecting).await { - warn!("Handling incoming connection ended with error: {err}"); - } } fn node_addresses_for_storage(ep: &Endpoint) -> Vec { diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 9ed124c0ac..a431084f0b 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -26,19 +26,22 @@ use iroh_net::{ relay::{force_staging_infra, RelayMode}, Endpoint, }; +use iroh_router::{ProtocolHandler, RouterBuilder}; use quic_rpc::transport::{boxed::BoxableServerEndpoint, quinn::QuinnServerEndpoint}; use serde::{Deserialize, Serialize}; use tokio::task::JoinError; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; use tracing::{debug, error_span, trace, Instrument}; -use super::{rpc_status::RpcStatus, IrohServerEndpoint, JoinErrToStr, Node, NodeInner}; +use super::{ + protocol::gossip::GossipProtocol, rpc_status::RpcStatus, IrohServerEndpoint, JoinErrToStr, + Node, NodeInner, +}; use crate::{ client::RPC_ALPN, node::{ nodes_storage::load_node_addrs, - protocol::{blobs::BlobsProtocol, docs::DocsProtocol, ProtocolMap}, - ProtocolHandler, + protocol::{blobs::BlobsProtocol, docs::DocsProtocol}, }, rpc_protocol::RpcService, util::{fs::load_secret_key, path::IrohPaths}, @@ -674,7 +677,7 @@ where let inner = Arc::new(NodeInner { rpc_addr: self.rpc_addr, db: Default::default(), - endpoint, + endpoint: endpoint.clone(), client, cancel_token: CancellationToken::new(), local_pool_handle: lp.handle().clone(), @@ -682,7 +685,7 @@ where let protocol_builder = ProtocolBuilder { inner, - protocols: Default::default(), + router: RouterBuilder::new(endpoint), internal_rpc, external_rpc: self.rpc_endpoint, gc_policy: self.gc_policy, @@ -716,7 +719,7 @@ pub struct ProtocolBuilder { inner: Arc>, internal_rpc: IrohServerEndpoint, external_rpc: IrohServerEndpoint, - protocols: ProtocolMap, + router: RouterBuilder, #[debug("callback")] gc_done_callback: Option>, gc_policy: GcPolicy, @@ -738,7 +741,7 @@ impl ProtocolBuilder { /// # use std::sync::Arc; /// # use anyhow::Result; /// # use futures_lite::future::Boxed as BoxedFuture; - /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::Iroh}; + /// # use iroh::{node::{Node}, net::endpoint::Connecting, client::Iroh, router::ProtocolHandler}; /// # /// # #[tokio::main] /// # async fn main() -> Result<()> { @@ -774,7 +777,7 @@ impl ProtocolBuilder { /// /// pub fn accept(mut self, alpn: Vec, handler: Arc) -> Self { - self.protocols.insert(alpn, handler); + self.router = self.router.accept(alpn, handler); self } @@ -801,7 +804,7 @@ impl ProtocolBuilder { /// This downcasts to the concrete type and returns `None` if the handler registered for `alpn` /// does not match the passed type. pub fn get_protocol(&self, alpn: &[u8]) -> Option> { - self.protocols.get_typed(alpn) + self.router.get_protocol::

(alpn) } /// Registers the core iroh protocols (blobs, gossip, docs). @@ -823,7 +826,7 @@ impl ProtocolBuilder { self = self.accept(iroh_blobs::protocol::ALPN.to_vec(), Arc::new(blobs_proto)); // Register gossip. - self = self.accept(GOSSIP_ALPN.to_vec(), Arc::new(gossip)); + self = self.accept(GOSSIP_ALPN.to_vec(), Arc::new(GossipProtocol(gossip))); // Register docs, if enabled. if let Some(docs) = docs { @@ -839,24 +842,15 @@ impl ProtocolBuilder { inner, internal_rpc, external_rpc, - protocols, + router, gc_done_callback, gc_policy, nodes_data_path, local_pool: rt, } = self; - let protocols = Arc::new(protocols); let node_id = inner.endpoint.node_id(); - // Update the endpoint with our alpns. - let alpns = protocols - .alpns() - .map(|alpn| alpn.to_vec()) - .collect::>(); - if let Err(err) = inner.endpoint.set_alpns(alpns) { - inner.shutdown(protocols).await; - return Err(err); - } + let router = router.spawn().await?; // Spawn the main task and store it in the node for structured termination in shutdown. let fut = inner @@ -864,7 +858,7 @@ impl ProtocolBuilder { .run( external_rpc, internal_rpc, - protocols.clone(), + router.clone(), gc_policy, gc_done_callback, nodes_data_path, @@ -875,7 +869,7 @@ impl ProtocolBuilder { let node = Node { inner, - protocols, + router, task: AbortOnDropHandle::new(task) .map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr) .shared(), diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 2669d9ba1e..1744a49b41 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -1,81 +1,3 @@ -use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; - -use anyhow::Result; -use futures_lite::future::Boxed as BoxedFuture; -use futures_util::future::join_all; -use iroh_net::endpoint::Connecting; - pub(crate) mod blobs; pub(crate) mod docs; pub(crate) mod gossip; - -/// Handler for incoming connections. -/// -/// An iroh node can accept connections for arbitrary ALPN protocols. By default, the iroh node -/// only accepts connections for the ALPNs of the core iroh protocols (blobs, gossip, docs). -/// -/// With this trait, you can handle incoming connections for custom protocols. -/// -/// Implement this trait on a struct that should handle incoming connections. -/// The protocol handler must then be registered on the node for an ALPN protocol with -/// [`crate::node::builder::ProtocolBuilder::accept`]. -pub trait ProtocolHandler: Send + Sync + IntoArcAny + fmt::Debug + 'static { - /// Handle an incoming connection. - /// - /// This runs on a freshly spawned tokio task so this can be long-running. - fn accept(self: Arc, conn: Connecting) -> BoxedFuture>; - - /// Called when the node shuts down. - fn shutdown(self: Arc) -> BoxedFuture<()> { - Box::pin(async move {}) - } -} - -/// Helper trait to facilite casting from `Arc` to `Arc`. -/// -/// This trait has a blanket implementation so there is no need to implement this yourself. -pub trait IntoArcAny { - fn into_arc_any(self: Arc) -> Arc; -} - -impl IntoArcAny for T { - fn into_arc_any(self: Arc) -> Arc { - self - } -} - -#[derive(Debug, Clone, Default)] -pub(super) struct ProtocolMap(BTreeMap, Arc>); - -impl ProtocolMap { - /// Returns the registered protocol handler for an ALPN as a concrete type. - pub(super) fn get_typed(&self, alpn: &[u8]) -> Option> { - let protocol: Arc = self.0.get(alpn)?.clone(); - let protocol_any: Arc = protocol.into_arc_any(); - let protocol_ref = Arc::downcast(protocol_any).ok()?; - Some(protocol_ref) - } - - /// Returns the registered protocol handler for an ALPN as a [`Arc`]. - pub(super) fn get(&self, alpn: &[u8]) -> Option> { - self.0.get(alpn).cloned() - } - - /// Inserts a protocol handler. - pub(super) fn insert(&mut self, alpn: Vec, handler: Arc) { - self.0.insert(alpn, handler); - } - - /// Returns an iterator of all registered ALPN protocol identifiers. - pub(super) fn alpns(&self) -> impl Iterator> { - self.0.keys() - } - - /// Shuts down all protocol handlers. - /// - /// Calls and awaits [`ProtocolHandler::shutdown`] for all registered handlers concurrently. - pub(super) async fn shutdown(&self) { - let handlers = self.0.values().cloned().map(ProtocolHandler::shutdown); - join_all(handlers).await; - } -} diff --git a/iroh/src/node/protocol/blobs.rs b/iroh/src/node/protocol/blobs.rs index f385c8361f..9cca5f14c3 100644 --- a/iroh/src/node/protocol/blobs.rs +++ b/iroh/src/node/protocol/blobs.rs @@ -17,9 +17,9 @@ use iroh_blobs::{ HashAndFormat, TempTag, }; use iroh_net::{endpoint::Connecting, Endpoint, NodeAddr}; +use iroh_router::ProtocolHandler; use tracing::{debug, warn}; -use super::ProtocolHandler; use crate::{ client::blobs::DownloadMode, rpc_protocol::blobs::{BatchId, DownloadRequest as BlobDownloadRequest}, diff --git a/iroh/src/node/protocol/docs.rs b/iroh/src/node/protocol/docs.rs index 597e5ee864..07884f3e99 100644 --- a/iroh/src/node/protocol/docs.rs +++ b/iroh/src/node/protocol/docs.rs @@ -6,8 +6,9 @@ use iroh_blobs::downloader::Downloader; use iroh_docs::engine::{DefaultAuthorStorage, Engine}; use iroh_gossip::net::Gossip; use iroh_net::{endpoint::Connecting, Endpoint}; +use iroh_router::ProtocolHandler; -use crate::node::{DocsStorage, ProtocolHandler}; +use crate::node::DocsStorage; /// Wrapper around [`Engine`] so that we can implement our RPC methods directly. #[derive(Debug, Clone)] diff --git a/iroh/src/node/protocol/gossip.rs b/iroh/src/node/protocol/gossip.rs index 980a9868bb..f1f907fd35 100644 --- a/iroh/src/node/protocol/gossip.rs +++ b/iroh/src/node/protocol/gossip.rs @@ -3,10 +3,21 @@ use std::sync::Arc; use anyhow::Result; use futures_lite::future::Boxed as BoxedFuture; use iroh_net::endpoint::Connecting; +use iroh_router::ProtocolHandler; -use super::ProtocolHandler; +/// [`ProtocolHandler`] implementation for `iroh_gossip`. +#[derive(Debug)] +pub(crate) struct GossipProtocol(pub(crate) iroh_gossip::net::Gossip); -impl ProtocolHandler for iroh_gossip::net::Gossip { +impl std::ops::Deref for GossipProtocol { + type Target = iroh_gossip::net::Gossip; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ProtocolHandler for GossipProtocol { fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { Box::pin(async move { self.handle_connection(conn.await?).await }) } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index cb8b346b0a..868b168e7b 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -28,15 +28,16 @@ use iroh_blobs::{ BlobFormat, HashAndFormat, Tag, }; use iroh_docs::net::DOCS_ALPN; -use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; +use iroh_gossip::net::GOSSIP_ALPN; use iroh_io::AsyncSliceReader; use iroh_net::{relay::RelayUrl, NodeAddr, NodeId}; +use iroh_router::Router; use quic_rpc::server::{RpcChannel, RpcServerError}; use tokio::task::JoinSet; use tokio_util::either::Either; use tracing::{debug, info, warn}; -use super::{protocol::ProtocolMap, IrohServerEndpoint}; +use super::IrohServerEndpoint; use crate::{ client::{ blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, @@ -44,14 +45,14 @@ use crate::{ NodeStatus, }, node::{ - protocol::{blobs::BlobsProtocol, docs::DocsProtocol}, + protocol::{blobs::BlobsProtocol, docs::DocsProtocol, gossip::GossipProtocol}, NodeInner, }, rpc_protocol::{ - authors, blobs, + authors, blobs::{ - AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate, - BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, + self, AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, + AddStreamUpdate, BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, @@ -63,16 +64,14 @@ use crate::{ ExportFileRequest, ExportFileResponse, ImportFileRequest, ImportFileResponse, Request as DocsRequest, SetHashRequest, }, - gossip, net, + gossip, net::{ - AddAddrRequest, AddrRequest, IdRequest, NodeWatchRequest, RelayRequest, + self, AddAddrRequest, AddrRequest, IdRequest, NodeWatchRequest, RelayRequest, RemoteInfoRequest, RemoteInfoResponse, RemoteInfosIterRequest, RemoteInfosIterResponse, WatchResponse, }, - node, - node::{ShutdownRequest, StatsRequest, StatsResponse, StatusRequest}, - tags, - tags::{DeleteRequest as TagDeleteRequest, ListRequest as ListTagsRequest, SyncMode}, + node::{self, ShutdownRequest, StatsRequest, StatsResponse, StatusRequest}, + tags::{self, DeleteRequest as TagDeleteRequest, ListRequest as ListTagsRequest, SyncMode}, Request, RpcService, }, }; @@ -88,23 +87,23 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; #[derive(Debug, Clone)] pub(crate) struct Handler { pub(crate) inner: Arc>, - pub(crate) protocols: Arc, + pub(crate) router: Router, } impl Handler { - pub fn new(inner: Arc>, protocols: Arc) -> Self { - Self { inner, protocols } + pub fn new(inner: Arc>, router: Router) -> Self { + Self { inner, router } } } impl Handler { fn docs(&self) -> Option> { - self.protocols.get_typed::(DOCS_ALPN) + self.router.get_protocol::(DOCS_ALPN) } fn blobs(&self) -> Arc> { - self.protocols - .get_typed::>(iroh_blobs::protocol::ALPN) + self.router + .get_protocol::>(iroh_blobs::protocol::ALPN) .expect("missing blobs") } @@ -142,9 +141,9 @@ impl Handler { inner: Arc>, join_set: &mut JoinSet>, accepting: quic_rpc::server::Accepting, - protocols: Arc, + router: Router, ) { - let handler = Self::new(inner, protocols); + let handler = Self::new(inner, router); join_set.spawn(async move { let (msg, chan) = accepting.read_first().await?; if let Err(err) = handler.handle_rpc_request(msg, chan).await { @@ -255,8 +254,8 @@ impl Handler { Subscribe(msg) => { chan.bidi_streaming(msg, self, |handler, req, updates| { let stream = handler - .protocols - .get_typed::(GOSSIP_ALPN) + .router + .get_protocol::(GOSSIP_ALPN) .expect("missing gossip") .join_with_stream( req.topic, @@ -780,8 +779,8 @@ impl Handler { let progress = AsyncChannelProgressSender::new(sender); let blobs_protocol = self - .protocols - .get_typed::>(iroh_blobs::protocol::ALPN) + .router + .get_protocol::>(iroh_blobs::protocol::ALPN) .expect("missing blobs"); self.local_pool_handle().spawn_detached(move || async move {