From d743f299af701d4195398e478aa4e42f5cab2738 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 15:31:00 +0200 Subject: [PATCH] Add a way to get the authors client --- src/engine.rs | 6 +++- src/rpc.rs | 80 +++++++++++++++++++++++++++++++++++++++++- src/rpc/client/docs.rs | 7 +++- 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index b9727e2..e596fc1 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -6,7 +6,7 @@ use std::{ io, path::PathBuf, str::FromStr, - sync::{Arc, RwLock}, + sync::{Arc, OnceLock, RwLock}, }; use anyhow::{bail, Context, Result}; @@ -56,6 +56,8 @@ pub struct Engine { content_status_cb: ContentStatusCallback, local_pool_handle: LocalPoolHandle, blob_store: D, + #[cfg(feature = "rpc")] + pub(crate) rpc_handler: Arc>, } impl Engine { @@ -118,6 +120,8 @@ impl Engine { default_author: Arc::new(default_author), local_pool_handle, blob_store: bao_store, + #[cfg(feature = "rpc")] + rpc_handler: Default::default(), }) } diff --git a/src/rpc.rs b/src/rpc.rs index 8e196f9..c836a02 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,7 +1,14 @@ //! Quic RPC implementation for docs. use proto::RpcService; -use quic_rpc::server::{ChannelTypes, RpcChannel}; +use quic_rpc::{ + server::{ChannelTypes, RpcChannel}, + transport::flume::FlumeConnector, + RpcClient, RpcServer, +}; +use tokio::task::JoinSet; +use tokio_util::task::AbortOnDropHandle; +use tracing::{error, warn}; use crate::engine::Engine; @@ -14,6 +21,18 @@ type RpcError = serde_error::Error; type RpcResult = std::result::Result; impl Engine { + /// Get an in memory client to interact with the docs engine. + pub fn client( + &self, + ) -> &crate::rpc::client::docs::Client< + FlumeConnector, + > { + &self + .rpc_handler + .get_or_init(|| RpcHandler::new(self)) + .client + } + /// Handle a docs request from the RPC server. pub async fn handle_rpc_request>( &self, @@ -65,3 +84,62 @@ impl Engine { } } } + +#[derive(Debug)] +pub(crate) struct RpcHandler { + /// Client to hand out + client: crate::rpc::client::docs::Client< + FlumeConnector, + >, + /// Handler task + _handler: AbortOnDropHandle<()>, +} + +impl RpcHandler { + fn new(engine: &Engine) -> Self { + let engine = engine.clone(); + let (listener, connector) = quic_rpc::transport::flume::channel(1); + let listener = RpcServer::new(listener); + let client = crate::rpc::client::docs::Client::new(RpcClient::new(connector)); + let task = tokio::spawn(async move { + let mut tasks = JoinSet::new(); + loop { + tokio::select! { + Some(res) = tasks.join_next(), if !tasks.is_empty() => { + if let Err(e) = res { + if e.is_panic() { + error!("Panic handling RPC request: {e}"); + } + } + } + req = listener.accept() => { + let req = match req { + Ok(req) => req, + Err(e) => { + warn!("Error accepting RPC request: {e}"); + continue; + } + }; + let engine = engine.clone(); + tasks.spawn(async move { + let (req, client) = match req.read_first().await { + Ok((req, client)) => (req, client), + Err(e) => { + warn!("Error reading first message: {e}"); + return; + } + }; + if let Err(cause) = engine.handle_rpc_request(req, client).await { + warn!("Error handling RPC request: {:?}", cause); + } + }); + } + } + } + }); + Self { + client, + _handler: AbortOnDropHandle::new(task), + } + } +} diff --git a/src/rpc/client/docs.rs b/src/rpc/client/docs.rs index 540defe..f27bfdb 100644 --- a/src/rpc/client/docs.rs +++ b/src/rpc/client/docs.rs @@ -19,7 +19,7 @@ use portable_atomic::{AtomicBool, Ordering}; use quic_rpc::{client::BoxedConnector, message::RpcMsg, Connector}; use serde::{Deserialize, Serialize}; -use super::flatten; +use super::{authors, flatten}; use crate::{ actor::OpenState, rpc::proto::{ @@ -50,6 +50,11 @@ impl> Client { Self { rpc } } + /// Returns an authors client. + pub fn authors(&self) -> authors::Client { + authors::Client::new(self.rpc.clone()) + } + /// Creates a client. pub async fn create(&self) -> Result> { let res = self.rpc.rpc(CreateRequest {}).await??;