Skip to content

Commit

Permalink
Added Transmog compatability.
Browse files Browse the repository at this point in the history
This commit adds support for specifying the serialization format at the
time of accepting a connection, which means that ALPN protocol
negotiation can be used to control which serialization format is used on
an incoming connection.

The next feature is the ability to switch serialization formats on a
per-stream basis, after the r#type negotation.
  • Loading branch information
ecton committed Jan 5, 2022
1 parent 04336dd commit 60f1873
Show file tree
Hide file tree
Showing 14 changed files with 519 additions and 134 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ trust-dns = ["trust-dns-resolver"]

[dependencies]
async-trait = "0.1"
bincode = "1"
transmog = "0.1.0-dev.2"
bytes = "1"
ct-logs = "0.9"
flume = "0.10"
Expand Down Expand Up @@ -54,6 +54,8 @@ fabruic = { path = "", features = ["rcgen", "test"] }
quinn-proto = { version = "0.8", default-features = false }
tokio = { version = "1", features = ["macros"] }
trust-dns-proto = "0.21.0-alpha.4"
transmog-bincode = { version = "0.1.0-dev.2" }
transmog-pot = { version = "0.1.0-dev.2" }

[profile.release]
codegen-units = 1
Expand Down
5 changes: 3 additions & 2 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{Error, Result};
use fabruic::{Endpoint, KeyPair};
use futures_util::{future, StreamExt, TryFutureExt};
use transmog_bincode::Bincode;

const SERVER_NAME: &str = "test";
/// Some random port.
Expand Down Expand Up @@ -38,7 +39,7 @@ async fn main() -> Result<()> {
index,
connecting.remote_address()
);
let connection = connecting.accept::<()>().await?;
let connection = connecting.accept::<(), _>(Bincode::default()).await?;
println!(
"[client:{}] Successfully connected to {}",
index,
Expand Down Expand Up @@ -107,7 +108,7 @@ async fn main() -> Result<()> {
// every new incoming connections is handled in it's own task
connections.push(
tokio::spawn(async move {
let mut connection = connecting.accept::<()>().await?;
let mut connection = connecting.accept::<(), _>(Bincode::default()).await?;
println!("[server] New Connection: {}", connection.remote_address());

// start listening to new incoming streams
Expand Down
25 changes: 17 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
// TODO: error type is becoming too big, split it up

use std::{
fmt::{self, Debug, Formatter},
fmt::{self, Debug, Display, Formatter},
io,
};

pub use bincode::ErrorKind;
use quinn::ConnectionClose;
pub use quinn::{ConnectError, ConnectionError, ReadError, WriteError};
use thiserror::Error;
Expand Down Expand Up @@ -207,7 +206,9 @@ impl From<ConnectionError> for Connecting {
reason,
}) if reason.as_ref() == b"peer doesn't support any known protocol"
&& error_code.to_string() == "the cryptographic handshake failed: error 120" =>
Self::ProtocolMismatch,
{
Self::ProtocolMismatch
}
other => Self::Connection(other),
}
}
Expand Down Expand Up @@ -241,23 +242,25 @@ pub enum Incoming {

/// Error receiving a message from a [`Receiver`](crate::Receiver).
#[derive(Debug, Error)]
#[allow(variant_size_differences)]
pub enum Receiver {
/// Failed to read from a [`Receiver`](crate::Receiver).
#[error("Error reading from `Receiver`: {0}")]
Read(#[from] ReadError),
/// Failed to [`Deserialize`](serde::Deserialize) a message from a
/// [`Receiver`](crate::Receiver).
#[error("Error deserializing a message from `Receiver`: {0}")]
Deserialize(#[from] ErrorKind),
Deserialize(Box<dyn SerializationError>),
}

/// Error sending a message to a [`Sender`](crate::Sender).
#[derive(Debug, Error)]
#[allow(variant_size_differences)]
pub enum Sender {
/// Failed to [`Serialize`](serde::Serialize) a message for a
/// [`Sender`](crate::Sender).
#[error("Error serializing a message to `Sender`: {0}")]
Serialize(ErrorKind),
Serialize(Box<dyn SerializationError>),
/// Failed to write to a [`Sender`](crate::Sender).
#[error("Error writing to `Sender`: {0}")]
Write(#[from] WriteError),
Expand All @@ -266,8 +269,14 @@ pub enum Sender {
Closed(#[from] AlreadyClosed),
}

impl From<Box<ErrorKind>> for Sender {
fn from(error: Box<ErrorKind>) -> Self {
Self::Serialize(*error)
impl Sender {
/// Returns a new instance after boxing `err`.
pub(crate) fn from_serialization<E: SerializationError>(err: E) -> Self {
Self::Serialize(Box::new(err))
}
}

/// An error raised from serialization.
pub trait SerializationError: Display + Debug + Send + Sync + 'static {}

impl<T> SerializationError for T where T: Display + Debug + Send + Sync + 'static {}
18 changes: 12 additions & 6 deletions src/quic/connection/connecting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
use std::net::SocketAddr;

use quinn::{crypto::rustls::HandshakeData, NewConnection};
use serde::{de::DeserializeOwned, Serialize};
use transmog::OwnedDeserializer;

use crate::{error, Connection};
use crate::{
error::{self, SerializationError},
Connection,
};

/// Represent's an intermediate state to build a [`Connection`].
#[must_use = "`Connecting` does nothing unless accepted with `Connecting::accept`"]
Expand Down Expand Up @@ -47,17 +50,20 @@ impl Connecting {
///
/// # Errors
/// [`error::Connecting`] if the [`Connection`] failed to be established.
pub async fn accept<T: DeserializeOwned + Serialize + Send + 'static>(
self,
) -> Result<Connection<T>, error::Connecting> {
pub async fn accept<T, F>(self, format: F) -> Result<Connection<T, F>, error::Connecting>
where
T: Send + 'static,
F: OwnedDeserializer<T> + Clone,
F::Error: SerializationError,
{
self.0
.await
.map(
|NewConnection {
connection,
bi_streams,
..
}| Connection::new(connection, bi_streams),
}| Connection::new(connection, bi_streams, format),
)
.map_err(error::Connecting::from)
}
Expand Down
85 changes: 70 additions & 15 deletions src/quic/connection/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,30 @@ use std::fmt::{self, Debug, Formatter};

use futures_util::StreamExt;
use quinn::{RecvStream, SendStream};
use serde::{de::DeserializeOwned, Serialize};
use transmog::{Format, OwnedDeserializer};

use super::ReceiverStream;
use crate::{error, Receiver, Sender};
use crate::{
error::{self, SerializationError},
Receiver, Sender,
};

/// An intermediate state to define which type to accept in this stream. See
/// [`accept_stream`](Self::accept).
#[must_use = "`Incoming` does nothing unless accepted with `Incoming::accept`"]
pub struct Incoming<T: DeserializeOwned> {
pub struct Incoming<T, F: OwnedDeserializer<T>> {
/// [`SendStream`] to build [`Sender`].
sender: SendStream,
/// [`RecvStream`] to build [`Receiver`].
receiver: ReceiverStream<T>,
receiver: ReceiverStream<T, F>,
/// Requested type.
r#type: Option<Result<T, error::Incoming>>,
}

impl<T: DeserializeOwned> Debug for Incoming<T> {
impl<T, F> Debug for Incoming<T, F>
where
F: OwnedDeserializer<T>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming")
.field("sender", &self.sender)
Expand All @@ -31,12 +37,16 @@ impl<T: DeserializeOwned> Debug for Incoming<T> {
}
}

impl<T: DeserializeOwned> Incoming<T> {
impl<T, F> Incoming<T, F>
where
F: OwnedDeserializer<T> + Clone,
F::Error: SerializationError,
{
/// Builds a new [`Incoming`] from raw [`quinn`] types.
pub(super) fn new(sender: SendStream, receiver: RecvStream) -> Self {
pub(super) fn new(sender: SendStream, receiver: RecvStream, format: F) -> Self {
Self {
sender,
receiver: ReceiverStream::new(receiver),
receiver: ReceiverStream::new(receiver, format),
r#type: None,
}
}
Expand Down Expand Up @@ -80,12 +90,57 @@ impl<T: DeserializeOwned> Incoming<T> {
/// - [`error::Incoming::Receiver`] if receiving the type information to the
/// peer failed, see [`error::Receiver`] for more details
/// - [`error::Incoming::Closed`] if the stream was closed
pub async fn accept<
S: DeserializeOwned + Serialize + Send + 'static,
R: DeserializeOwned + Serialize + Send + 'static,
>(
pub async fn accept<S: Send + 'static, R: Send + 'static>(
self,
) -> Result<(Sender<S, F>, Receiver<R>), error::Incoming>
where
F: OwnedDeserializer<R> + Format<'static, S> + 'static,
<F as Format<'static, S>>::Error: SerializationError,
<F as Format<'static, R>>::Error: SerializationError,
{
let format = self.receiver.format.clone();
self.accept_with_format(format).await
}

/// Accept the incoming stream with the given types.
///
/// Use `S` and `R` to define which type this stream is sending and
/// receiving.
///
/// # Errors
/// - [`error::Incoming::Receiver`] if receiving the type information to the
/// peer failed, see [`error::Receiver`] for more details
/// - [`error::Incoming::Closed`] if the stream was closed
pub async fn accept_raw<S: Send + 'static, R: Send + 'static>(
self,
) -> Result<(Sender<S, F>, Receiver<R>), error::Incoming>
where
F: OwnedDeserializer<R> + Format<'static, S> + 'static,
<F as Format<'static, S>>::Error: SerializationError,
<F as Format<'static, R>>::Error: SerializationError,
{
let format = self.receiver.format.clone();
self.accept_with_format(format).await
}

/// Accept the incoming stream with the given types.
///
/// Use `S` and `R` to define which type this stream is sending and
/// receiving.
///
/// # Errors
/// - [`error::Incoming::Receiver`] if receiving the type information to the
/// peer failed, see [`error::Receiver`] for more details
/// - [`error::Incoming::Closed`] if the stream was closed
pub async fn accept_with_format<S: Send + 'static, R: Send + 'static, NewFormat>(
mut self,
) -> Result<(Sender<S>, Receiver<R>), error::Incoming> {
format: NewFormat,
) -> Result<(Sender<S, NewFormat>, Receiver<R>), error::Incoming>
where
NewFormat: OwnedDeserializer<R> + Format<'static, S> + Clone + 'static,
<NewFormat as Format<'static, S>>::Error: SerializationError,
<NewFormat as Format<'static, R>>::Error: SerializationError,
{
match self.r#type {
Some(Ok(_)) => (),
Some(Err(error)) => return Err(error),
Expand All @@ -100,8 +155,8 @@ impl<T: DeserializeOwned> Incoming<T> {
}
}

let sender = Sender::new(self.sender);
let receiver = Receiver::new(self.receiver.transmute());
let sender = Sender::new(self.sender, format.clone());
let receiver = Receiver::new(self.receiver.transmute(format));

Ok((sender, receiver))
}
Expand Down
Loading

0 comments on commit 60f1873

Please sign in to comment.