Skip to content

Commit

Permalink
Add a way to get the authors client
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Nov 14, 2024
1 parent 9166b22 commit d743f29
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 3 deletions.
6 changes: 5 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
io,
path::PathBuf,
str::FromStr,
sync::{Arc, RwLock},
sync::{Arc, OnceLock, RwLock},
};

use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -56,6 +56,8 @@ pub struct Engine<D> {
content_status_cb: ContentStatusCallback,
local_pool_handle: LocalPoolHandle,
blob_store: D,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
}

impl<D: iroh_blobs::store::Store> Engine<D> {
Expand Down Expand Up @@ -118,6 +120,8 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
default_author: Arc::new(default_author),
local_pool_handle,
blob_store: bao_store,
#[cfg(feature = "rpc")]
rpc_handler: Default::default(),
})
}

Expand Down
80 changes: 79 additions & 1 deletion src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
//! Quic RPC implementation for docs.
use proto::RpcService;
use quic_rpc::server::{ChannelTypes, RpcChannel};
use quic_rpc::{
server::{ChannelTypes, RpcChannel},
transport::flume::FlumeConnector,
RpcClient, RpcServer,
};
use tokio::task::JoinSet;
use tokio_util::task::AbortOnDropHandle;
use tracing::{error, warn};

use crate::engine::Engine;

Expand All @@ -14,6 +21,18 @@ type RpcError = serde_error::Error;
type RpcResult<T> = std::result::Result<T, RpcError>;

impl<D: iroh_blobs::store::Store> Engine<D> {
/// Get an in memory client to interact with the docs engine.
pub fn client(
&self,
) -> &crate::rpc::client::docs::Client<
FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>,
> {
&self
.rpc_handler
.get_or_init(|| RpcHandler::new(self))
.client
}

/// Handle a docs request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
&self,
Expand Down Expand Up @@ -65,3 +84,62 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
}
}
}

#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: crate::rpc::client::docs::Client<
FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>,
>,
/// Handler task
_handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
let engine = engine.clone();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = crate::rpc::client::docs::Client::new(RpcClient::new(connector));
let task = tokio::spawn(async move {
let mut tasks = JoinSet::new();
loop {
tokio::select! {
Some(res) = tasks.join_next(), if !tasks.is_empty() => {
if let Err(e) = res {
if e.is_panic() {
error!("Panic handling RPC request: {e}");
}
}
}
req = listener.accept() => {
let req = match req {
Ok(req) => req,
Err(e) => {
warn!("Error accepting RPC request: {e}");
continue;
}
};
let engine = engine.clone();
tasks.spawn(async move {
let (req, client) = match req.read_first().await {
Ok((req, client)) => (req, client),
Err(e) => {
warn!("Error reading first message: {e}");
return;
}
};
if let Err(cause) = engine.handle_rpc_request(req, client).await {
warn!("Error handling RPC request: {:?}", cause);
}
});
}
}
}
});
Self {
client,
_handler: AbortOnDropHandle::new(task),
}
}
}
7 changes: 6 additions & 1 deletion src/rpc/client/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use portable_atomic::{AtomicBool, Ordering};
use quic_rpc::{client::BoxedConnector, message::RpcMsg, Connector};
use serde::{Deserialize, Serialize};

use super::flatten;
use super::{authors, flatten};
use crate::{
actor::OpenState,
rpc::proto::{
Expand Down Expand Up @@ -50,6 +50,11 @@ impl<C: Connector<RpcService>> Client<C> {
Self { rpc }
}

/// Returns an authors client.
pub fn authors(&self) -> authors::Client<C> {
authors::Client::new(self.rpc.clone())
}

/// Creates a client.
pub async fn create(&self) -> Result<Doc<C>> {
let res = self.rpc.rpc(CreateRequest {}).await??;
Expand Down

0 comments on commit d743f29

Please sign in to comment.