From d743f299af701d4195398e478aa4e42f5cab2738 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 15:31:00 +0200 Subject: [PATCH 1/7] Add a way to get the authors client --- src/engine.rs | 6 +++- src/rpc.rs | 80 +++++++++++++++++++++++++++++++++++++++++- src/rpc/client/docs.rs | 7 +++- 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index b9727e2..e596fc1 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -6,7 +6,7 @@ use std::{ io, path::PathBuf, str::FromStr, - sync::{Arc, RwLock}, + sync::{Arc, OnceLock, RwLock}, }; use anyhow::{bail, Context, Result}; @@ -56,6 +56,8 @@ pub struct Engine { content_status_cb: ContentStatusCallback, local_pool_handle: LocalPoolHandle, blob_store: D, + #[cfg(feature = "rpc")] + pub(crate) rpc_handler: Arc>, } impl Engine { @@ -118,6 +120,8 @@ impl Engine { default_author: Arc::new(default_author), local_pool_handle, blob_store: bao_store, + #[cfg(feature = "rpc")] + rpc_handler: Default::default(), }) } diff --git a/src/rpc.rs b/src/rpc.rs index 8e196f9..c836a02 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,7 +1,14 @@ //! Quic RPC implementation for docs. use proto::RpcService; -use quic_rpc::server::{ChannelTypes, RpcChannel}; +use quic_rpc::{ + server::{ChannelTypes, RpcChannel}, + transport::flume::FlumeConnector, + RpcClient, RpcServer, +}; +use tokio::task::JoinSet; +use tokio_util::task::AbortOnDropHandle; +use tracing::{error, warn}; use crate::engine::Engine; @@ -14,6 +21,18 @@ type RpcError = serde_error::Error; type RpcResult = std::result::Result; impl Engine { + /// Get an in memory client to interact with the docs engine. + pub fn client( + &self, + ) -> &crate::rpc::client::docs::Client< + FlumeConnector, + > { + &self + .rpc_handler + .get_or_init(|| RpcHandler::new(self)) + .client + } + /// Handle a docs request from the RPC server. pub async fn handle_rpc_request>( &self, @@ -65,3 +84,62 @@ impl Engine { } } } + +#[derive(Debug)] +pub(crate) struct RpcHandler { + /// Client to hand out + client: crate::rpc::client::docs::Client< + FlumeConnector, + >, + /// Handler task + _handler: AbortOnDropHandle<()>, +} + +impl RpcHandler { + fn new(engine: &Engine) -> Self { + let engine = engine.clone(); + let (listener, connector) = quic_rpc::transport::flume::channel(1); + let listener = RpcServer::new(listener); + let client = crate::rpc::client::docs::Client::new(RpcClient::new(connector)); + let task = tokio::spawn(async move { + let mut tasks = JoinSet::new(); + loop { + tokio::select! { + Some(res) = tasks.join_next(), if !tasks.is_empty() => { + if let Err(e) = res { + if e.is_panic() { + error!("Panic handling RPC request: {e}"); + } + } + } + req = listener.accept() => { + let req = match req { + Ok(req) => req, + Err(e) => { + warn!("Error accepting RPC request: {e}"); + continue; + } + }; + let engine = engine.clone(); + tasks.spawn(async move { + let (req, client) = match req.read_first().await { + Ok((req, client)) => (req, client), + Err(e) => { + warn!("Error reading first message: {e}"); + return; + } + }; + if let Err(cause) = engine.handle_rpc_request(req, client).await { + warn!("Error handling RPC request: {:?}", cause); + } + }); + } + } + } + }); + Self { + client, + _handler: AbortOnDropHandle::new(task), + } + } +} diff --git a/src/rpc/client/docs.rs b/src/rpc/client/docs.rs index 540defe..f27bfdb 100644 --- a/src/rpc/client/docs.rs +++ b/src/rpc/client/docs.rs @@ -19,7 +19,7 @@ use portable_atomic::{AtomicBool, Ordering}; use quic_rpc::{client::BoxedConnector, message::RpcMsg, Connector}; use serde::{Deserialize, Serialize}; -use super::flatten; +use super::{authors, flatten}; use crate::{ actor::OpenState, rpc::proto::{ @@ -50,6 +50,11 @@ impl> Client { Self { rpc } } + /// Returns an authors client. + pub fn authors(&self) -> authors::Client { + authors::Client::new(self.rpc.clone()) + } + /// Creates a client. pub async fn create(&self) -> Result> { let res = self.rpc.rpc(CreateRequest {}).await??; From 169789cdb36fb1f9b52b1df2e69bd9f5eee583b3 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 15:48:59 +0200 Subject: [PATCH 2/7] Prevent unused import without rpc feature --- src/engine.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index e596fc1..d930e01 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -6,7 +6,7 @@ use std::{ io, path::PathBuf, str::FromStr, - sync::{Arc, OnceLock, RwLock}, + sync::{Arc, RwLock}, }; use anyhow::{bail, Context, Result}; @@ -57,7 +57,7 @@ pub struct Engine { local_pool_handle: LocalPoolHandle, blob_store: D, #[cfg(feature = "rpc")] - pub(crate) rpc_handler: Arc>, + pub(crate) rpc_handler: Arc>, } impl Engine { From bf58139c9df4270a5ab6ea9d968169beb959e984 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 16:09:16 +0200 Subject: [PATCH 3/7] add repr transparent --- src/rpc/client/authors.rs | 1 + src/rpc/client/docs.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/rpc/client/authors.rs b/src/rpc/client/authors.rs index 3529abb..18a154f 100644 --- a/src/rpc/client/authors.rs +++ b/src/rpc/client/authors.rs @@ -19,6 +19,7 @@ use crate::{ /// Iroh docs client. #[derive(Debug, Clone)] +#[repr(transparent)] pub struct Client> { pub(super) rpc: quic_rpc::RpcClient, } diff --git a/src/rpc/client/docs.rs b/src/rpc/client/docs.rs index f27bfdb..a6e5d00 100644 --- a/src/rpc/client/docs.rs +++ b/src/rpc/client/docs.rs @@ -40,6 +40,7 @@ pub use crate::{ /// Iroh docs client. #[derive(Debug, Clone)] +#[repr(transparent)] pub struct Client> { pub(super) rpc: quic_rpc::RpcClient, } From 8cfd2f729ba21b8dba8bc8ca850c90e6511a4dbc Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 18:27:25 +0200 Subject: [PATCH 4/7] use spawn_accept_loop --- Cargo.lock | 4 ++-- Cargo.toml | 1 + src/rpc.rs | 50 +++++--------------------------------------------- 3 files changed, 8 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ffa716e..081b196 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3394,8 +3394,7 @@ dependencies = [ [[package]] name = "quic-rpc" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e131f594054d27d077162815db3b5e9ddd76a28fbb9091b68095971e75c286" +source = "git+https://github.com/n0-computer/quic-rpc?branch=main#32d5bc1a08609f4f0b5650980088f07d81971a55" dependencies = [ "anyhow", "derive_more", @@ -3409,6 +3408,7 @@ dependencies = [ "serde", "slab", "tokio", + "tokio-util", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 12edbf0..29597c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,3 +105,4 @@ iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" } iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "main" } +quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" } diff --git a/src/rpc.rs b/src/rpc.rs index c836a02..1b01e60 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -6,9 +6,7 @@ use quic_rpc::{ transport::flume::FlumeConnector, RpcClient, RpcServer, }; -use tokio::task::JoinSet; use tokio_util::task::AbortOnDropHandle; -use tracing::{error, warn}; use crate::engine::Engine; @@ -35,13 +33,12 @@ impl Engine { /// Handle a docs request from the RPC server. pub async fn handle_rpc_request>( - &self, + self, msg: crate::rpc::proto::Request, chan: RpcChannel, ) -> Result<(), quic_rpc::server::RpcServerError> { use crate::rpc::proto::Request::*; - - let this = self.clone(); + let this = self; match msg { Open(msg) => chan.rpc(msg, this, Self::doc_open).await, Close(msg) => chan.rpc(msg, this, Self::doc_close).await, @@ -101,45 +98,8 @@ impl RpcHandler { let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); let client = crate::rpc::client::docs::Client::new(RpcClient::new(connector)); - let task = tokio::spawn(async move { - let mut tasks = JoinSet::new(); - loop { - tokio::select! { - Some(res) = tasks.join_next(), if !tasks.is_empty() => { - if let Err(e) = res { - if e.is_panic() { - error!("Panic handling RPC request: {e}"); - } - } - } - req = listener.accept() => { - let req = match req { - Ok(req) => req, - Err(e) => { - warn!("Error accepting RPC request: {e}"); - continue; - } - }; - let engine = engine.clone(); - tasks.spawn(async move { - let (req, client) = match req.read_first().await { - Ok((req, client)) => (req, client), - Err(e) => { - warn!("Error reading first message: {e}"); - return; - } - }; - if let Err(cause) = engine.handle_rpc_request(req, client).await { - warn!("Error handling RPC request: {:?}", cause); - } - }); - } - } - } - }); - Self { - client, - _handler: AbortOnDropHandle::new(task), - } + let _handler = listener + .spawn_accept_loop(move |req, chan| engine.clone().handle_rpc_request(req, chan)); + Self { client, _handler } } } From 0f9249c64fb277251e6a2921a36bd83a8a66305e Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 19:05:18 +0200 Subject: [PATCH 5/7] use released quic-rpc --- Cargo.lock | 5 +++-- Cargo.toml | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 081b196..5a0a796 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3393,8 +3393,9 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.15.0" -source = "git+https://github.com/n0-computer/quic-rpc?branch=main#32d5bc1a08609f4f0b5650980088f07d81971a55" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc623a188942fc875926f7baeb2cb08ed4288b64f29072656eb051e360ee7623" dependencies = [ "anyhow", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index 29597c2..6a0485c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ tracing = "0.1" # rpc nested_enum_utils = { version = "0.1.0", optional = true } -quic-rpc = { version = "0.15", optional = true } +quic-rpc = { version = "0.15.1", optional = true } quic-rpc-derive = { version = "0.15", optional = true } serde-error = { version = "0.1.3", optional = true } portable-atomic = { version = "1.9.0", optional = true } @@ -105,4 +105,3 @@ iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" } iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "main" } -quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" } From 17aba72712d7eb68a8bd2bb1519a98b7e8712657 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 19:11:40 +0200 Subject: [PATCH 6/7] more imports to make the types less ugly --- src/rpc.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/rpc.rs b/src/rpc.rs index 1b01e60..46f10dc 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,6 +1,6 @@ //! Quic RPC implementation for docs. -use proto::RpcService; +use proto::{Request, Response, RpcService}; use quic_rpc::{ server::{ChannelTypes, RpcChannel}, transport::flume::FlumeConnector, @@ -20,11 +20,7 @@ type RpcResult = std::result::Result; impl Engine { /// Get an in memory client to interact with the docs engine. - pub fn client( - &self, - ) -> &crate::rpc::client::docs::Client< - FlumeConnector, - > { + pub fn client(&self) -> &client::docs::Client> { &self .rpc_handler .get_or_init(|| RpcHandler::new(self)) @@ -34,10 +30,10 @@ impl Engine { /// Handle a docs request from the RPC server. pub async fn handle_rpc_request>( self, - msg: crate::rpc::proto::Request, + msg: Request, chan: RpcChannel, ) -> Result<(), quic_rpc::server::RpcServerError> { - use crate::rpc::proto::Request::*; + use Request::*; let this = self; match msg { Open(msg) => chan.rpc(msg, this, Self::doc_open).await, @@ -85,9 +81,7 @@ impl Engine { #[derive(Debug)] pub(crate) struct RpcHandler { /// Client to hand out - client: crate::rpc::client::docs::Client< - FlumeConnector, - >, + client: client::docs::Client>, /// Handler task _handler: AbortOnDropHandle<()>, } @@ -97,7 +91,7 @@ impl RpcHandler { let engine = engine.clone(); let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); - let client = crate::rpc::client::docs::Client::new(RpcClient::new(connector)); + let client = client::docs::Client::new(RpcClient::new(connector)); let _handler = listener .spawn_accept_loop(move |req, chan| engine.clone().handle_rpc_request(req, chan)); Self { client, _handler } From 3cf208a7ec59214c0bbeb6ba2429db40a2af5d17 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 19:30:24 +0200 Subject: [PATCH 7/7] Add MemClient type alias --- src/rpc.rs | 9 ++++----- src/rpc/client/docs.rs | 8 +++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/rpc.rs b/src/rpc.rs index 46f10dc..d1e974a 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,9 +1,8 @@ //! Quic RPC implementation for docs. -use proto::{Request, Response, RpcService}; +use proto::{Request, RpcService}; use quic_rpc::{ server::{ChannelTypes, RpcChannel}, - transport::flume::FlumeConnector, RpcClient, RpcServer, }; use tokio_util::task::AbortOnDropHandle; @@ -20,7 +19,7 @@ type RpcResult = std::result::Result; impl Engine { /// Get an in memory client to interact with the docs engine. - pub fn client(&self) -> &client::docs::Client> { + pub fn client(&self) -> &client::docs::MemClient { &self .rpc_handler .get_or_init(|| RpcHandler::new(self)) @@ -81,7 +80,7 @@ impl Engine { #[derive(Debug)] pub(crate) struct RpcHandler { /// Client to hand out - client: client::docs::Client>, + client: client::docs::MemClient, /// Handler task _handler: AbortOnDropHandle<()>, } @@ -91,7 +90,7 @@ impl RpcHandler { let engine = engine.clone(); let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); - let client = client::docs::Client::new(RpcClient::new(connector)); + let client = client::docs::MemClient::new(RpcClient::new(connector)); let _handler = listener .spawn_accept_loop(move |req, chan| engine.clone().handle_rpc_request(req, chan)); Self { client, _handler } diff --git a/src/rpc/client/docs.rs b/src/rpc/client/docs.rs index a6e5d00..6f40409 100644 --- a/src/rpc/client/docs.rs +++ b/src/rpc/client/docs.rs @@ -16,7 +16,9 @@ use iroh_base::node_addr::AddrInfoOptions; use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash}; use iroh_net::NodeAddr; use portable_atomic::{AtomicBool, Ordering}; -use quic_rpc::{client::BoxedConnector, message::RpcMsg, Connector}; +use quic_rpc::{ + client::BoxedConnector, message::RpcMsg, transport::flume::FlumeConnector, Connector, +}; use serde::{Deserialize, Serialize}; use super::{authors, flatten}; @@ -38,6 +40,10 @@ pub use crate::{ Entry, }; +/// Type alias for a memory-backed client. +pub type MemClient = + Client>; + /// Iroh docs client. #[derive(Debug, Clone)] #[repr(transparent)]