Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Tokio mem channel #131

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rustls = { version = "0.23", default-features = false, features = ["ring"], opti
slab = "0.4.9" # iroh-quinn
smallvec = "1.13.2"
time = "0.3.36" # serde
tokio-stream = "0.1.17"

[dev-dependencies]
anyhow = "1.0.73"
Expand Down
2 changes: 1 addition & 1 deletion examples/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {
let fs = Fs;
let (server, client) = quic_rpc::transport::flume::channel(1);
let client = RpcClient::<IoService, _>::new(client);
let server = RpcServer::new(server);
let mut server = RpcServer::new(server);
let handle = tokio::task::spawn(async move {
for _ in 0..1 {
let (req, chan) = server.accept().await?.read_first().await?;
Expand Down
4 changes: 2 additions & 2 deletions examples/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn main() -> anyhow::Result<()> {
async fn server_future<C: Listener<StoreService>>(
server: RpcServer<StoreService, C>,
) -> result::Result<(), RpcServerError<C>> {
let s = server;
let mut s = server;
let store = Store;
loop {
let (req, chan) = s.accept().await?.read_first().await?;
Expand Down Expand Up @@ -239,7 +239,7 @@ async fn _main_unsugared() -> anyhow::Result<()> {
type Req = u64;
type Res = String;
}
let (server, client) = flume::channel::<u64, String>(1);
let (mut server, client) = flume::channel::<u64, String>(1);
let to_string_service = tokio::spawn(async move {
let (mut send, mut recv) = server.accept().await?;
while let Some(item) = recv.next().await {
Expand Down
15 changes: 3 additions & 12 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,6 @@ pub struct RpcServer<S, C = BoxedListener<S>> {
_p: PhantomData<S>,
}

impl<S, C: Clone> Clone for RpcServer<S, C> {
fn clone(&self) -> Self {
Self {
source: self.source.clone(),
_p: PhantomData,
}
}
}

impl<S: Service, C: Listener<S>> RpcServer<S, C> {
/// Create a new rpc server for a specific service for a [Service] given a compatible
/// [Listener].
Expand Down Expand Up @@ -201,7 +192,7 @@ impl<S: Service, C: Listener<S>> Accepting<S, C> {
impl<S: Service, C: Listener<S>> RpcServer<S, C> {
/// Accepts a new channel from a client. The result is an [Accepting] object that
/// can be used to read the first request.
pub async fn accept(&self) -> result::Result<Accepting<S, C>, RpcServerError<C>> {
pub async fn accept(&mut self) -> result::Result<Accepting<S, C>, RpcServerError<C>> {
let (send, recv) = self.source.accept().await.map_err(RpcServerError::Accept)?;
Ok(Accepting {
send,
Expand All @@ -220,7 +211,7 @@ impl<S: Service, C: Listener<S>> RpcServer<S, C> {
/// Each request will be handled in a separate task.
///
/// It is the caller's responsibility to poll the returned future to drive the server.
pub async fn accept_loop<Fun, Fut, E>(self, handler: Fun)
pub async fn accept_loop<Fun, Fut, E>(mut self, handler: Fun)
where
S: Service,
C: Listener<S>,
Expand Down Expand Up @@ -462,7 +453,7 @@ where
F: FnMut(RpcChannel<S, C>, S::Req, T) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), RpcServerError<C>>> + Send + 'static,
{
let server: RpcServer<S, C> = RpcServer::<S, C>::new(conn);
let mut server: RpcServer<S, C> = RpcServer::<S, C>::new(conn);
loop {
let (req, chan) = server.accept().await?.read_first().await?;
let target = target.clone();
Expand Down
33 changes: 6 additions & 27 deletions src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,8 @@ impl<In: RpcMessage, Out: RpcMessage> StreamTypes for BoxedStreamTypes<In, Out>

/// A boxable listener
pub trait BoxableListener<In: RpcMessage, Out: RpcMessage>: Debug + Send + Sync + 'static {
/// Clone the listener and box it
fn clone_box(&self) -> Box<dyn BoxableListener<In, Out>>;

/// Accept a channel from a remote client
fn accept_bi_boxed(&self) -> AcceptFuture<In, Out>;
fn accept_bi_boxed(&mut self) -> AcceptFuture<In, Out>;

/// Get the local address
fn local_addr(&self) -> &[super::LocalAddr];
Expand All @@ -311,12 +308,6 @@ impl<In: RpcMessage, Out: RpcMessage> BoxedListener<In, Out> {
}
}

impl<In: RpcMessage, Out: RpcMessage> Clone for BoxedListener<In, Out> {
fn clone(&self) -> Self {
Self(self.0.clone_box())
}
}

impl<In: RpcMessage, Out: RpcMessage> StreamTypes for BoxedListener<In, Out> {
type In = In;
type Out = Out;
Expand All @@ -333,7 +324,7 @@ impl<In: RpcMessage, Out: RpcMessage> ConnectionErrors for BoxedListener<In, Out

impl<In: RpcMessage, Out: RpcMessage> super::Listener for BoxedListener<In, Out> {
fn accept(
&self,
&mut self,
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::AcceptError>> + Send
{
self.0.accept_bi_boxed()
Expand Down Expand Up @@ -378,11 +369,7 @@ impl<In: RpcMessage, Out: RpcMessage> BoxableConnector<In, Out>
impl<In: RpcMessage, Out: RpcMessage> BoxableListener<In, Out>
for super::quinn::QuinnListener<In, Out>
{
fn clone_box(&self) -> Box<dyn BoxableListener<In, Out>> {
Box::new(self.clone())
}

fn accept_bi_boxed(&self) -> AcceptFuture<In, Out> {
fn accept_bi_boxed(&mut self) -> AcceptFuture<In, Out> {
let f = async move {
let (send, recv) = super::Listener::accept(self).await?;
let send = send.sink_map_err(anyhow::Error::from);
Expand Down Expand Up @@ -422,11 +409,7 @@ impl<In: RpcMessage, Out: RpcMessage> BoxableConnector<In, Out>
impl<In: RpcMessage, Out: RpcMessage> BoxableListener<In, Out>
for super::iroh::IrohListener<In, Out>
{
fn clone_box(&self) -> Box<dyn BoxableListener<In, Out>> {
Box::new(self.clone())
}

fn accept_bi_boxed(&self) -> AcceptFuture<In, Out> {
fn accept_bi_boxed(&mut self) -> AcceptFuture<In, Out> {
let f = async move {
let (send, recv) = super::Listener::accept(self).await?;
let send = send.sink_map_err(anyhow::Error::from);
Expand Down Expand Up @@ -458,11 +441,7 @@ impl<In: RpcMessage, Out: RpcMessage> BoxableConnector<In, Out>
impl<In: RpcMessage, Out: RpcMessage> BoxableListener<In, Out>
for super::flume::FlumeListener<In, Out>
{
fn clone_box(&self) -> Box<dyn BoxableListener<In, Out>> {
Box::new(self.clone())
}

fn accept_bi_boxed(&self) -> AcceptFuture<In, Out> {
fn accept_bi_boxed(&mut self) -> AcceptFuture<In, Out> {
AcceptFuture::direct(super::Listener::accept(self))
}

Expand Down Expand Up @@ -520,7 +499,7 @@ mod tests {
use crate::transport::{Connector, Listener};

let (server, client) = crate::transport::flume::channel(1);
let server = super::BoxedListener::new(server);
let mut server = super::BoxedListener::new(server);
let client = super::BoxedConnector::new(client);
// spawn echo server
tokio::spawn(async move {
Expand Down
6 changes: 3 additions & 3 deletions src/transport/combined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,17 +241,17 @@ impl<A: Listener, B: Listener<In = A::In, Out = A::Out>> StreamTypes for Combine
}

impl<A: Listener, B: Listener<In = A::In, Out = A::Out>> Listener for CombinedListener<A, B> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::AcceptError> {
async fn accept(&mut self) -> Result<(Self::SendSink, Self::RecvStream), Self::AcceptError> {
let a_fut = async {
if let Some(a) = &self.a {
if let Some(a) = &mut self.a {
let (send, recv) = a.accept().await.map_err(AcceptError::A)?;
Ok((SendSink::A(send), RecvStream::A(recv)))
} else {
std::future::pending().await
}
};
let b_fut = async {
if let Some(b) = &self.b {
if let Some(b) = &mut self.b {
let (send, recv) = b.accept().await.map_err(AcceptError::B)?;
Ok((SendSink::B(send), RecvStream::B(recv)))
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/transport/flume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl<In: RpcMessage, Out: RpcMessage> StreamTypes for FlumeListener<In, Out> {

impl<In: RpcMessage, Out: RpcMessage> Listener for FlumeListener<In, Out> {
#[allow(refining_impl_trait)]
fn accept(&self) -> AcceptFuture<In, Out> {
fn accept(&mut self) -> AcceptFuture<In, Out> {
AcceptFuture {
wrapped: self.stream.clone().into_recv_async(),
_p: PhantomData,
Expand Down
2 changes: 1 addition & 1 deletion src/transport/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ impl<In: RpcMessage, Out: RpcMessage> Listener for HyperListener<In, Out> {
&self.local_addr
}

async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), AcceptError> {
async fn accept(&mut self) -> Result<(Self::SendSink, Self::RecvStream), AcceptError> {
let (recv, send) = self
.channel
.recv_async()
Expand Down
2 changes: 1 addition & 1 deletion src/transport/iroh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl<In: RpcMessage, Out: RpcMessage> StreamTypes for IrohListener<In, Out> {
}

impl<In: RpcMessage, Out: RpcMessage> Listener for IrohListener<In, Out> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), AcceptError> {
async fn accept(&mut self) -> Result<(Self::SendSink, Self::RecvStream), AcceptError> {
let (send, recv) = self
.inner
.receiver
Expand Down
2 changes: 1 addition & 1 deletion src/transport/mapped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ mod tests {
// create a listener / connector pair. Type will be inferred
let (s, c) = crate::transport::flume::channel(32);
// wrap the server in a RpcServer, this is where the service type is specified
let server = RpcServer::<FullService, _>::new(s.clone());
let mut server = RpcServer::<FullService, _>::new(s.clone());
// when using a boxed transport, we can omit the transport type and use the default
let _server_boxed: RpcServer<FullService> = RpcServer::<FullService>::new(s.boxed());
// create a client in a RpcClient, this is where the service type is specified
Expand Down
2 changes: 1 addition & 1 deletion src/transport/misc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<In: RpcMessage, Out: RpcMessage> StreamTypes for DummyListener<In, Out> {
}

impl<In: RpcMessage, Out: RpcMessage> Listener for DummyListener<In, Out> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::AcceptError> {
async fn accept(&mut self) -> Result<(Self::SendSink, Self::RecvStream), Self::AcceptError> {
futures_lite::future::pending().await
}

Expand Down
7 changes: 4 additions & 3 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ pub mod mapped;
pub mod misc;
#[cfg(feature = "quinn-transport")]
pub mod quinn;
pub mod tokio;

#[cfg(any(feature = "quinn-transport", feature = "iroh-transport"))]
mod util;

/// Errors that can happen when creating and using a [`Connector`] or [`Listener`].
pub trait ConnectionErrors: Debug + Clone + Send + Sync + 'static {
pub trait ConnectionErrors: Debug + Send + Sync + 'static {
/// Error when sending a message via a channel
type SendError: RpcError;
/// Error when receiving a message via a channel
Expand Down Expand Up @@ -78,7 +79,7 @@ pub trait StreamTypes: ConnectionErrors {
/// A connection to a specific remote machine
///
/// A connection can be used to open bidirectional typed channels using [`Connector::open`].
pub trait Connector: StreamTypes {
pub trait Connector: StreamTypes + Clone {
/// Open a channel to the remote che
fn open(
&self,
Expand Down Expand Up @@ -110,7 +111,7 @@ pub trait Listener: StreamTypes {
/// Accept a new typed bidirectional channel on any of the connections we
/// have currently opened.
fn accept(
&self,
&mut self,
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::AcceptError>> + Send;

/// The local addresses this endpoint is bound to.
Expand Down
2 changes: 1 addition & 1 deletion src/transport/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<In: RpcMessage, Out: RpcMessage> StreamTypes for QuinnListener<In, Out> {
}

impl<In: RpcMessage, Out: RpcMessage> Listener for QuinnListener<In, Out> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), AcceptError> {
async fn accept(&mut self) -> Result<(Self::SendSink, Self::RecvStream), AcceptError> {
let (send, recv) = self
.inner
.receiver
Expand Down
Loading
Loading