Skip to content

Commit

Permalink
Merge branch 'main' of github.com:n0-computer/quic-rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
fogodev committed Nov 6, 2024
2 parents 6c09355 + 8e323b5 commit 9df28ec
Show file tree
Hide file tree
Showing 34 changed files with 1,637 additions and 1,160 deletions.
755 changes: 510 additions & 245 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quic-rpc"
version = "0.14.0"
version = "0.15.0"
edition = "2021"
authors = ["Rüdiger Klaehn <[email protected]>", "n0 team"]
keywords = ["api", "protocol", "network", "rpc"]
Expand Down Expand Up @@ -50,13 +50,14 @@ tracing-subscriber = "0.3.16"
tempfile = "3.5.0"
proc-macro2 = "1.0.66"
futures-buffered = "0.2.4"
testresult = "0.4.1"
nested_enum_utils = "0.1.0"

[features]
hyper-transport = ["dep:flume", "dep:hyper", "dep:bincode", "dep:bytes", "dep:tokio-serde", "dep:tokio-util"]
quinn-transport = ["dep:flume", "dep:quinn", "dep:bincode", "dep:tokio-serde", "dep:tokio-util"]
flume-transport = ["dep:flume"]
iroh-net-transport = ["dep:iroh-net", "dep:flume", "dep:quinn", "dep:bincode", "dep:tokio-serde", "dep:tokio-util"]
combined-transport = []
macros = []
default = ["flume-transport"]

Expand Down
4 changes: 2 additions & 2 deletions examples/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ impl Fs {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let fs = Fs;
let (server, client) = quic_rpc::transport::flume::service_connection::<IoService>(1);
let client = RpcClient::new(client);
let (server, client) = quic_rpc::transport::flume::channel(1);
let client = RpcClient::<IoService, _>::new(client);
let server = RpcServer::new(server);
let handle = tokio::task::spawn(async move {
for _ in 0..1 {
Expand Down
2 changes: 1 addition & 1 deletion examples/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ create_store_dispatch!(Store, dispatch_store_request);

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (server, client) = flume::service_connection::<StoreService>(1);
let (server, client) = flume::channel(1);
let server_handle = tokio::task::spawn(async move {
let target = Store;
run_server_loop(StoreService, server, target, dispatch_store_request).await
Expand Down
137 changes: 59 additions & 78 deletions examples/modularize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use anyhow::Result;
use futures_lite::StreamExt;
use futures_util::SinkExt;
use quic_rpc::{transport::flume, RpcClient, RpcServer, ServiceConnection, ServiceEndpoint};
use quic_rpc::{client::BoxedConnector, transport::flume, Listener, RpcClient, RpcServer};
use tracing::warn;

use app::AppService;
Expand All @@ -19,19 +19,19 @@ use app::AppService;
async fn main() -> Result<()> {
// Spawn an inmemory connection.
// Could use quic equally (all code in this example is generic over the transport)
let (server_conn, client_conn) = flume::service_connection::<AppService>(1);
let (server_conn, client_conn) = flume::channel(1);

// spawn the server
let handler = app::Handler::default();
tokio::task::spawn(run_server(server_conn, handler));

// run a client demo
client_demo(client_conn).await?;
client_demo(BoxedConnector::<AppService>::new(client_conn)).await?;

Ok(())
}

async fn run_server<C: ServiceEndpoint<AppService>>(server_conn: C, handler: app::Handler) {
async fn run_server<C: Listener<AppService>>(server_conn: C, handler: app::Handler) {
let server = RpcServer::<AppService, _>::new(server_conn);
loop {
let Ok(accepting) = server.accept().await else {
Expand All @@ -50,8 +50,8 @@ async fn run_server<C: ServiceEndpoint<AppService>>(server_conn: C, handler: app
}
}
}
pub async fn client_demo<C: ServiceConnection<AppService>>(conn: C) -> Result<()> {
let rpc_client = RpcClient::new(conn);
pub async fn client_demo(conn: BoxedConnector<AppService>) -> Result<()> {
let rpc_client = RpcClient::<AppService>::new(conn);
let client = app::Client::new(rpc_client.clone());

// call a method from the top-level app client
Expand Down Expand Up @@ -99,15 +99,12 @@ mod app {
//!
//! It could also easily compose services from other crates or internal modules.
use super::iroh;
use anyhow::Result;
use derive_more::{From, TryInto};
use quic_rpc::{
message::RpcMsg, server::RpcChannel, RpcClient, Service, ServiceConnection, ServiceEndpoint,
};
use quic_rpc::{message::RpcMsg, server::RpcChannel, Listener, RpcClient, Service};
use serde::{Deserialize, Serialize};

use super::iroh;

#[derive(Debug, Serialize, Deserialize, From, TryInto)]
pub enum Request {
Iroh(iroh::Request),
Expand Down Expand Up @@ -153,13 +150,17 @@ mod app {
}

impl Handler {
pub async fn handle_rpc_request<E: ServiceEndpoint<AppService>>(
pub async fn handle_rpc_request<E: Listener<AppService>>(
self,
req: Request,
chan: RpcChannel<AppService, E, AppService>,
chan: RpcChannel<AppService, E>,
) -> Result<()> {
match req {
Request::Iroh(req) => self.iroh.handle_rpc_request(req, chan.map()).await?,
Request::Iroh(req) => {
self.iroh
.handle_rpc_request(req, chan.map().boxed())
.await?
}
Request::AppVersion(req) => chan.rpc(req, self, Self::on_version).await?,
};
Ok(())
Expand All @@ -171,20 +172,16 @@ mod app {
}

#[derive(Debug, Clone)]
pub struct Client<S: Service, C: ServiceConnection<S>> {
pub iroh: iroh::Client<S, C>,
client: RpcClient<AppService, C, S>,
pub struct Client {
pub iroh: iroh::Client,
client: RpcClient<AppService>,
}

impl<S, C> Client<S, C>
where
S: Service,
C: ServiceConnection<S>,
{
pub fn new(client: RpcClient<AppService, C, S>) -> Self {
impl Client {
pub fn new(client: RpcClient<AppService>) -> Self {
Self {
iroh: iroh::Client::new(client.clone().map()),
client,
client: client.clone(),
iroh: iroh::Client::new(client.map().boxed()),
}
}

Expand All @@ -202,7 +199,7 @@ mod iroh {
use anyhow::Result;
use derive_more::{From, TryInto};
use quic_rpc::{server::RpcChannel, RpcClient, Service, ServiceConnection, ServiceEndpoint};
use quic_rpc::{server::RpcChannel, RpcClient, Service};
use serde::{Deserialize, Serialize};

use super::{calc, clock};
Expand Down Expand Up @@ -233,38 +230,38 @@ mod iroh {
}

impl Handler {
pub async fn handle_rpc_request<S, E>(
pub async fn handle_rpc_request(
self,
req: Request,
chan: RpcChannel<IrohService, E, S>,
) -> Result<()>
where
S: Service,
E: ServiceEndpoint<S>,
{
chan: RpcChannel<IrohService>,
) -> Result<()> {
match req {
Request::Calc(req) => self.calc.handle_rpc_request(req, chan.map()).await?,
Request::Clock(req) => self.clock.handle_rpc_request(req, chan.map()).await?,
Request::Calc(req) => {
self.calc
.handle_rpc_request(req, chan.map().boxed())
.await?
}
Request::Clock(req) => {
self.clock
.handle_rpc_request(req, chan.map().boxed())
.await?
}
}
Ok(())
}
}

#[derive(Debug, Clone)]
pub struct Client<S, C> {
pub calc: calc::Client<S, C>,
pub clock: clock::Client<S, C>,
pub struct Client {
pub calc: calc::Client,
pub clock: clock::Client,
}

impl<S, C> Client<S, C>
where
S: Service,
C: ServiceConnection<S>,
{
pub fn new(client: RpcClient<IrohService, C, S>) -> Self {
impl Client {
pub fn new(client: RpcClient<IrohService>) -> Self {
Self {
calc: calc::Client::new(client.clone().map()),
clock: clock::Client::new(client.clone().map()),
calc: calc::Client::new(client.clone().map().boxed()),
clock: clock::Client::new(client.clone().map().boxed()),
}
}
}
Expand All @@ -280,7 +277,7 @@ mod calc {
use quic_rpc::{
message::{ClientStreaming, ClientStreamingMsg, Msg, RpcMsg},
server::RpcChannel,
RpcClient, Service, ServiceConnection, ServiceEndpoint,
RpcClient, Service,
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
Expand Down Expand Up @@ -337,15 +334,11 @@ mod calc {
pub struct Handler;

impl Handler {
pub async fn handle_rpc_request<S, E>(
pub async fn handle_rpc_request(
self,
req: Request,
chan: RpcChannel<CalcService, E, S>,
) -> Result<()>
where
S: Service,
E: ServiceEndpoint<S>,
{
chan: RpcChannel<CalcService>,
) -> Result<()> {
match req {
Request::Add(req) => chan.rpc(req, self, Self::on_add).await?,
Request::Sum(req) => chan.client_streaming(req, self, Self::on_sum).await?,
Expand Down Expand Up @@ -373,16 +366,12 @@ mod calc {
}

#[derive(Debug, Clone)]
pub struct Client<S, C> {
client: RpcClient<CalcService, C, S>,
pub struct Client {
client: RpcClient<CalcService>,
}

impl<S, C> Client<S, C>
where
C: ServiceConnection<S>,
S: Service,
{
pub fn new(client: RpcClient<CalcService, C, S>) -> Self {
impl Client {
pub fn new(client: RpcClient<CalcService>) -> Self {
Self { client }
}
pub async fn add(&self, a: i64, b: i64) -> anyhow::Result<i64> {
Expand All @@ -403,7 +392,7 @@ mod clock {
use quic_rpc::{
message::{Msg, ServerStreaming, ServerStreamingMsg},
server::RpcChannel,
RpcClient, Service, ServiceConnection, ServiceEndpoint,
RpcClient, Service,
};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -475,15 +464,11 @@ mod clock {
h
}

pub async fn handle_rpc_request<S, E>(
pub async fn handle_rpc_request(
self,
req: Request,
chan: RpcChannel<ClockService, E, S>,
) -> Result<()>
where
S: Service,
E: ServiceEndpoint<S>,
{
chan: RpcChannel<ClockService>,
) -> Result<()> {
match req {
Request::Tick(req) => chan.server_streaming(req, self, Self::on_tick).await?,
}
Expand Down Expand Up @@ -517,16 +502,12 @@ mod clock {
}

#[derive(Debug, Clone)]
pub struct Client<S, C> {
client: RpcClient<ClockService, C, S>,
pub struct Client {
client: RpcClient<ClockService>,
}

impl<S, C> Client<S, C>
where
C: ServiceConnection<S>,
S: Service,
{
pub fn new(client: RpcClient<ClockService, C, S>) -> Self {
impl Client {
pub fn new(client: RpcClient<ClockService>) -> Self {
Self { client }
}
pub async fn tick(&self) -> Result<BoxStream<Result<usize>>> {
Expand Down
4 changes: 2 additions & 2 deletions examples/split/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use anyhow::Result;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use quic_rpc::transport::quinn::QuinnConnection;
use quic_rpc::transport::quinn::QuinnConnector;
use quic_rpc::RpcClient;
use quinn::crypto::rustls::QuicClientConfig;
use quinn::{ClientConfig, Endpoint};
Expand All @@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let server_addr: SocketAddr = "127.0.0.1:12345".parse()?;
let endpoint = make_insecure_client_endpoint("0.0.0.0:0".parse()?)?;
let client = QuinnConnection::new(endpoint, server_addr, "localhost".to_string());
let client = QuinnConnector::new(endpoint, server_addr, "localhost".to_string());
let client = RpcClient::new(client);
// let mut client = ComputeClient(client);

Expand Down
4 changes: 2 additions & 2 deletions examples/split/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_stream::stream;
use futures::stream::{Stream, StreamExt};
use quic_rpc::server::run_server_loop;
use quic_rpc::transport::quinn::QuinnServerEndpoint;
use quic_rpc::transport::quinn::QuinnListener;
use quinn::{Endpoint, ServerConfig};
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let server_addr: SocketAddr = "127.0.0.1:12345".parse()?;
let (server, _server_certs) = make_server_endpoint(server_addr)?;
let channel = QuinnServerEndpoint::new(server)?;
let channel = QuinnListener::new(server)?;
let target = Compute;
run_server_loop(
ComputeService,
Expand Down
9 changes: 5 additions & 4 deletions examples/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use quic_rpc::{
server::RpcServerError,
transport::{flume, Connection, ServerEndpoint},
transport::{flume, Connector},
*,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -162,7 +162,7 @@ impl Store {

#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn server_future<C: ServiceEndpoint<StoreService>>(
async fn server_future<C: Listener<StoreService>>(
server: RpcServer<StoreService, C>,
) -> result::Result<(), RpcServerError<C>> {
let s = server;
Expand All @@ -184,7 +184,7 @@ async fn main() -> anyhow::Result<()> {
}
}

let (server, client) = flume::service_connection::<StoreService>(1);
let (server, client) = flume::channel(1);
let client = RpcClient::<StoreService, _>::new(client);
let server = RpcServer::<StoreService, _>::new(server);
let server_handle = tokio::task::spawn(server_future(server));
Expand Down Expand Up @@ -231,13 +231,14 @@ async fn main() -> anyhow::Result<()> {
}

async fn _main_unsugared() -> anyhow::Result<()> {
use transport::Listener;
#[derive(Clone, Debug)]
struct Service;
impl crate::Service for Service {
type Req = u64;
type Res = String;
}
let (server, client) = flume::service_connection::<Service>(1);
let (server, client) = flume::channel::<u64, String>(1);
let to_string_service = tokio::spawn(async move {
let (mut send, mut recv) = server.accept().await?;
while let Some(item) = recv.next().await {
Expand Down
Loading

0 comments on commit 9df28ec

Please sign in to comment.