diff --git a/Cargo.lock b/Cargo.lock index a5b7ada6..848b6962 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -875,6 +875,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "displaydoc" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "dlv-list" version = "0.3.0" @@ -2829,14 +2840,14 @@ dependencies = [ [[package]] name = "tide-disco" -version = "0.4.6" +version = "0.4.7" dependencies = [ + "anyhow", "ark-serialize", "ark-std", "async-std", "async-trait", "async-tungstenite 0.24.0", - "bincode", "clap", "config", "derivative", @@ -2879,6 +2890,7 @@ dependencies = [ "tracing-subscriber", "tracing-test", "url", + "versioned-binary-serialization", ] [[package]] @@ -3312,6 +3324,18 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "versioned-binary-serialization" +version = "0.1.0" +source = "git+https://github.com/EspressoSystems/versioned-binary-serialization.git?tag=0.1.0#6b5bf0c0b74f8384c940880d5a988d5ec757de3e" +dependencies = [ + "anyhow", + "bincode", + "displaydoc", + "serde", + "serde_with", +] + [[package]] name = "waker-fn" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 9f056552..ac6d9853 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tide-disco" -version = "0.4.6" +version = "0.4.7" edition = "2021" authors = ["Espresso Systems "] description = "Discoverability for Tide" @@ -12,9 +12,9 @@ name = "hello-world" test = true [dependencies] +anyhow = "1.0" async-std = { version = "1.8.0", features = ["attributes"] } async-trait = "0.1.74" -bincode = "1.3.3" clap = { version = "4.5", features = ["derive"] } config = "0.13.4" derivative = "2.2" @@ -54,6 +54,7 @@ tracing-futures = "0.2" tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } url = "2.5.0" +versioned-binary-serialization = { git = "https://github.com/EspressoSystems/versioned-binary-serialization.git", tag = "0.1.0" } [target.'cfg(not(windows))'.dependencies] signal-hook-async-std = "0.2.2" diff --git a/examples/hello-world/main.rs b/examples/hello-world/main.rs index ebeaa073..097fd1b1 100644 --- a/examples/hello-world/main.rs +++ b/examples/hello-world/main.rs @@ -39,11 +39,12 @@ impl From for HelloError { } async fn serve(port: u16) -> io::Result<()> { - let mut app = App::<_, HelloError>::with_state(RwLock::new("Hello".to_string())); + let mut app = App::<_, HelloError, 0, 1>::with_state(RwLock::new("Hello".to_string())); app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap()); let mut api = - Api::, HelloError>::from_file("examples/hello-world/api.toml").unwrap(); + Api::, HelloError, 0, 1>::from_file("examples/hello-world/api.toml") + .unwrap(); api.with_version(env!("CARGO_PKG_VERSION").parse().unwrap()); // Can invoke by browsing diff --git a/src/api.rs b/src/api.rs index 0d1bb7b1..ead4587a 100644 --- a/src/api.rs +++ b/src/api.rs @@ -252,10 +252,10 @@ mod meta_defaults { /// TOML file and registered as a module of an [App](crate::App). #[derive(Derivative)] #[derivative(Debug(bound = ""))] -pub struct Api { +pub struct Api { meta: Arc, name: String, - routes: HashMap>, + routes: HashMap>, routes_by_path: HashMap>, #[derivative(Debug = "ignore")] health_check: Option>, @@ -265,28 +265,34 @@ pub struct Api { long_description: String, } -impl<'a, State, Error> IntoIterator for &'a Api { - type Item = &'a Route; - type IntoIter = Values<'a, String, Route>; +impl<'a, State, Error, const MAJOR: u16, const MINOR: u16> IntoIterator + for &'a Api +{ + type Item = &'a Route; + type IntoIter = Values<'a, String, Route>; fn into_iter(self) -> Self::IntoIter { self.routes.values() } } -impl IntoIterator for Api { - type Item = Route; - type IntoIter = IntoValues>; +impl IntoIterator + for Api +{ + type Item = Route; + type IntoIter = IntoValues>; fn into_iter(self) -> Self::IntoIter { self.routes.into_values() } } -impl Index<&str> for Api { - type Output = Route; +impl Index<&str> + for Api +{ + type Output = Route; - fn index(&self, index: &str) -> &Route { + fn index(&self, index: &str) -> &Route { &self.routes[index] } } @@ -296,20 +302,22 @@ impl Index<&str> for Api { /// This type iterates over all of the routes that have a given path. /// [routes_by_path](Api::routes_by_path), in turn, returns an iterator over paths whose items /// contain a [RoutesWithPath] iterator. -pub struct RoutesWithPath<'a, State, Error> { +pub struct RoutesWithPath<'a, State, Error, const MAJOR: u16, const MINOR: u16> { routes: std::slice::Iter<'a, String>, - api: &'a Api, + api: &'a Api, } -impl<'a, State, Error> Iterator for RoutesWithPath<'a, State, Error> { - type Item = &'a Route; +impl<'a, State, Error, const MAJOR: u16, const MINOR: u16> Iterator + for RoutesWithPath<'a, State, Error, MAJOR, MINOR> +{ + type Item = &'a Route; fn next(&mut self) -> Option { Some(&self.api.routes[self.routes.next()?]) } } -impl Api { +impl Api { /// Parse an API from a TOML specification. pub fn new(api: impl Into) -> Result { let mut api = api.into(); @@ -413,7 +421,9 @@ impl Api { } /// Iterate over groups of routes with the same path. - pub fn routes_by_path(&self) -> impl Iterator)> { + pub fn routes_by_path( + &self, + ) -> impl Iterator)> { self.routes_by_path.iter().map(|(path, routes)| { ( path.as_str(), @@ -441,7 +451,9 @@ impl Api { /// is contained in the API crate, it should result in a reasonable version: /// /// ``` - /// # fn ex(api: &mut tide_disco::Api<(), ()>) { + /// # const MAJOR: u16 = 0; + /// # const MINOR: u16 = 1; + /// # fn ex(api: &mut tide_disco::Api<(), (), MAJOR, MINOR>) { /// api.with_version(env!("CARGO_PKG_VERSION").parse().unwrap()); /// # } /// ``` @@ -479,8 +491,10 @@ impl Api { /// # use tide_disco::Api; /// /// type State = u64; + /// const MAJOR: u16 = 0; + /// const MINOR: u16 = 1; /// - /// # fn ex(api: &mut Api) { + /// # fn ex(api: &mut Api) { /// api.at("getstate", |req, state| async { Ok(*state) }.boxed()); /// # } /// ``` @@ -503,8 +517,10 @@ impl Api { /// # use tide_disco::Api; /// /// type State = Mutex; + /// const MAJOR: u16 = 0; + /// const MINOR: u16 = 1; /// - /// # fn ex(api: &mut Api) { + /// # fn ex(api: &mut Api) { /// api.at("increment", |req, state| async { /// let mut guard = state.lock().await; /// *guard += 1; @@ -634,8 +650,10 @@ impl Api { /// # use tide_disco::Api; /// /// type State = RwLock; + /// const MAJOR: u16 = 0; + /// const MINOR: u16 = 1; /// - /// # fn ex(api: &mut Api) { + /// # fn ex(api: &mut Api) { /// api.get("getstate", |req, state| async { Ok(*state) }.boxed()); /// # } /// ``` @@ -731,8 +749,10 @@ impl Api { /// # use tide_disco::Api; /// /// type State = RwLock; + /// const MAJOR: u16 = 0; + /// const MINOR: u16 = 1; /// - /// # fn ex(api: &mut Api) { + /// # fn ex(api: &mut Api) { /// api.post("increment", |req, state| async { /// *state += 1; /// Ok(*state) @@ -798,8 +818,10 @@ impl Api { /// # use tide_disco::Api; /// /// type State = RwLock; + /// const MAJOR: u16 = 0; + /// const MINOR: u16 = 1; /// - /// # fn ex(api: &mut Api) { + /// # fn ex(api: &mut Api) { /// api.post("replace", |req, state| async move { /// *state = req.integer_param("new_state")?; /// Ok(()) @@ -864,8 +886,10 @@ impl Api { /// # use tide_disco::Api; /// /// type State = RwLock>; + /// const MAJOR: u16 = 0; + /// const MINOR: u16 = 1; /// - /// # fn ex(api: &mut Api) { + /// # fn ex(api: &mut Api) { /// api.delete("state", |req, state| async { /// *state = None; /// Ok(()) @@ -932,8 +956,8 @@ impl Api { /// use futures::{FutureExt, SinkExt, StreamExt}; /// use tide_disco::{error::ServerError, socket::Connection, Api}; /// - /// # fn ex(api: &mut Api<(), ServerError>) { - /// api.socket("sum", |_req, mut conn: Connection, _state| async move { + /// # fn ex(api: &mut Api<(), ServerError, 0, 1>) { + /// api.socket("sum", |_req, mut conn: Connection, _state| async move { /// let mut sum = 0; /// while let Some(amount) = conn.next().await { /// sum += amount?; @@ -970,7 +994,7 @@ impl Api { + Sync + Fn( RequestParams, - socket::Connection, + socket::Connection, &State, ) -> BoxFuture<'_, Result<(), Error>>, ToClient: 'static + Serialize + ?Sized, @@ -998,7 +1022,10 @@ impl Api { State: 'static + Send + Sync, Error: 'static + Send + Display, { - self.register_socket_handler(name, socket::stream_handler(handler)) + self.register_socket_handler( + name, + socket::stream_handler::<_, _, _, _, MAJOR, MINOR>(handler), + ) } fn register_socket_handler( @@ -1065,9 +1092,11 @@ impl Api { /// counter: Counter, /// metrics: Registry, /// } + /// const MAJOR: u16 = 0; + /// const MINOR: u16 = 1; /// - /// # fn ex(_api: Api, ServerError>) -> Result<(), ApiError> { - /// let mut api: Api, ServerError>; + /// # fn ex(_api: Api, ServerError, MAJOR, MINOR>) -> Result<(), ApiError> { + /// let mut api: Api, ServerError, MAJOR, MINOR>; /// # api = _api; /// api.metrics("metrics", |_req, state| async move { /// state.counter.inc(); @@ -1132,7 +1161,7 @@ impl Api { State: 'static + Send + Sync, H: 'static + HealthCheck, { - self.health_check = Some(route::health_check_handler(handler)); + self.health_check = Some(route::health_check_handler::<_, _, MAJOR, MINOR>(handler)); self } @@ -1143,7 +1172,7 @@ impl Api { } else { // If there is no healthcheck handler registered, just return [HealthStatus::Available] // by default; after all, if this handler is getting hit at all, the service must be up. - route::health_check_response( + route::health_check_response::<_, MAJOR, MINOR>( &req.accept().unwrap_or_else(|_| { // The healthcheck endpoint is not allowed to fail, so just use the default content // type if we can't parse the Accept header. @@ -1172,7 +1201,7 @@ impl Api { pub fn map_err( self, f: impl 'static + Clone + Send + Sync + Fn(Error) -> Error2, - ) -> Api + ) -> Api where Error: 'static + Send + Sync, Error2: 'static, @@ -1239,7 +1268,8 @@ struct ReadHandler { } #[async_trait] -impl Handler for ReadHandler +impl Handler + for ReadHandler where F: 'static + Send @@ -1254,7 +1284,7 @@ where state: &State, ) -> Result> { let accept = req.accept()?; - response_from_result( + response_from_result::<_, _, MAJOR, MINOR>( &accept, state.read(|state| (self.handler)(req, state)).await, ) @@ -1268,7 +1298,8 @@ struct WriteHandler { } #[async_trait] -impl Handler for WriteHandler +impl Handler + for WriteHandler where F: 'static + Send @@ -1283,7 +1314,7 @@ where state: &State, ) -> Result> { let accept = req.accept()?; - response_from_result( + response_from_result::<_, _, MAJOR, MINOR>( &accept, state.write(|state| (self.handler)(req, state)).await, ) @@ -1315,6 +1346,7 @@ mod test { use prometheus::{Counter, Registry}; use std::borrow::Cow; use toml::toml; + use versioned_binary_serialization::{BinarySerializer, Serializer}; #[cfg(windows)] use async_tungstenite::tungstenite::Error as WsError; @@ -1344,7 +1376,7 @@ mod test { #[async_std::test] async fn test_socket_endpoint() { - let mut app = App::<_, ServerError>::with_state(RwLock::new(())); + let mut app = App::<_, ServerError, 0, 1>::with_state(RwLock::new(())); let api_toml = toml! { [meta] FORMAT_VERSION = "0.1.0" @@ -1365,7 +1397,7 @@ mod test { let mut api = app.module::("mod", api_toml).unwrap(); api.socket( "echo", - |_req, mut conn: Connection, _state| { + |_req, mut conn: Connection, _state| { async move { while let Some(msg) = conn.next().await { conn.send(&msg?).await?; @@ -1376,23 +1408,29 @@ mod test { }, ) .unwrap() - .socket("once", |_req, mut conn: Connection<_, (), _>, _state| { - async move { - conn.send("msg").boxed().await?; - Ok(()) - } - .boxed() - }) + .socket( + "once", + |_req, mut conn: Connection<_, (), _, 0, 1>, _state| { + async move { + conn.send("msg").boxed().await?; + Ok(()) + } + .boxed() + }, + ) .unwrap() - .socket("error", |_req, _conn: Connection<(), (), _>, _state| { - async move { - Err(ServerError::catch_all( - StatusCode::InternalServerError, - "an error message".to_string(), - )) - } - .boxed() - }) + .socket( + "error", + |_req, _conn: Connection<(), (), _, 0, 1>, _state| { + async move { + Err(ServerError::catch_all( + StatusCode::InternalServerError, + "an error message".to_string(), + )) + } + .boxed() + }, + ) .unwrap(); } let port = pick_unused_port().unwrap(); @@ -1420,9 +1458,11 @@ mod test { ); // Send a binary message. - conn.send(Message::Binary(bincode::serialize("goodbye").unwrap())) - .await - .unwrap(); + conn.send(Message::Binary( + Serializer::<0, 1>::serialize("goodbye").unwrap(), + )) + .await + .unwrap(); assert_eq!( conn.next().await.unwrap().unwrap(), Message::Text(serde_json::to_string("goodbye").unwrap()) @@ -1441,16 +1481,18 @@ mod test { .unwrap(); assert_eq!( conn.next().await.unwrap().unwrap(), - Message::Binary(bincode::serialize("hello").unwrap()) + Message::Binary(Serializer::<0, 1>::serialize("hello").unwrap()) ); // Send a binary message. - conn.send(Message::Binary(bincode::serialize("goodbye").unwrap())) - .await - .unwrap(); + conn.send(Message::Binary( + Serializer::<0, 1>::serialize("goodbye").unwrap(), + )) + .await + .unwrap(); assert_eq!( conn.next().await.unwrap().unwrap(), - Message::Binary(bincode::serialize("goodbye").unwrap()) + Message::Binary(Serializer::<0, 1>::serialize("goodbye").unwrap()) ); // Test a stream that exits normally. @@ -1483,7 +1525,7 @@ mod test { #[async_std::test] async fn test_stream_endpoint() { - let mut app = App::<_, ServerError>::with_state(RwLock::new(())); + let mut app = App::<_, ServerError, 0, 1>::with_state(RwLock::new(())); let api_toml = toml! { [meta] FORMAT_VERSION = "0.1.0" @@ -1566,7 +1608,7 @@ mod test { #[async_std::test] async fn test_custom_healthcheck() { - let mut app = App::<_, ServerError>::with_state(HealthStatus::Available); + let mut app = App::<_, ServerError, 0, 1>::with_state(HealthStatus::Available); let api_toml = toml! { [meta] FORMAT_VERSION = "0.1.0" @@ -1610,7 +1652,7 @@ mod test { metrics.register(Box::new(counter.clone())).unwrap(); let state = State { metrics, counter }; - let mut app = App::<_, ServerError>::with_state(RwLock::new(state)); + let mut app = App::<_, ServerError, 0, 1>::with_state(RwLock::new(state)); let api_toml = toml! { [meta] FORMAT_VERSION = "0.1.0" diff --git a/src/app.rs b/src/app.rs index 40d98e30..1b97bbc4 100644 --- a/src/app.rs +++ b/src/app.rs @@ -44,9 +44,9 @@ pub use tide::listener::{Listener, ToListener}; /// constructing an [Api] for each module and calling [App::register_module]. Once all of the /// desired modules are registered, the app can be converted into an asynchronous server task using /// [App::serve]. -pub struct App { +pub struct App { // Map from base URL to module API. - apis: HashMap>, + apis: HashMap>, state: Arc, app_version: Option, } @@ -58,7 +58,9 @@ pub enum AppError { ModuleAlreadyExists, } -impl App { +impl + App +{ /// Create a new [App] with a given state. pub fn with_state(state: State) -> Self { Self { @@ -73,7 +75,7 @@ impl App { &'a mut self, base_url: &'a str, api: impl Into, - ) -> Result, AppError> + ) -> Result, AppError> where Error: From, ModuleError: 'static + Send + Sync, @@ -93,7 +95,7 @@ impl App { pub fn register_module( &mut self, base_url: &str, - api: Api, + api: Api, ) -> Result<&mut Self, AppError> where Error: From, @@ -133,7 +135,9 @@ impl App { /// is contained in the application crate, it should result in a reasonable version: /// /// ``` - /// # fn ex(app: &mut tide_disco::App<(), ()>) { + /// # const MAJOR: u16 = 0; + /// # const MINOR: u16 = 1; + /// # fn ex(app: &mut tide_disco::App<(), (), MAJOR, MINOR>) { /// app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap()); /// # } /// ``` @@ -209,7 +213,13 @@ lazy_static! { }; } -impl App { +impl< + State: Send + Sync + 'static, + Error: 'static + crate::Error, + const MAJOR: u16, + const MINOR: u16, + > App +{ /// Serve the [App] asynchronously. pub async fn serve>>(self, listener: L) -> io::Result<()> { let state = Arc::new(self); @@ -225,7 +235,7 @@ impl App); + server.with(add_error_body::<_, Error, MAJOR, MINOR>); server.with( CorsMiddleware::new() .allow_methods("GET, POST".parse::().unwrap()) @@ -298,9 +308,9 @@ impl App(err).into_tide_error() - }) + respond_with::<_, _, MAJOR, MINOR>(&accept, api.version()).map_err( + |err| Error::from_route_error::(err).into_tide_error(), + ) } }); } @@ -315,13 +325,13 @@ impl App(&accept, res)) }); server .at("version") .get(|req: tide::Request>| async move { let accept = RequestParams::accept_from_headers(&req)?; - respond_with(&accept, req.state().version()) + respond_with::<_, _, MAJOR, MINOR>(&accept, req.state().version()) .map_err(|err| Error::from_route_error::(err).into_tide_error()) }); @@ -374,7 +384,7 @@ impl App>, - route: &Route, + route: &Route, method: http::Method, ) { let name = route.name(); @@ -400,7 +410,7 @@ impl App>, - route: &Route, + route: &Route, ) { let name = route.name(); if route.has_handler() { @@ -425,7 +435,7 @@ impl App>, - route: &Route, + route: &Route, ) { let name = route.name(); if route.has_handler() { @@ -472,7 +482,7 @@ impl App>, - route: &Route, + route: &Route, ) { let name = route.name(); endpoint.all(move |req: tide::Request>| { @@ -503,15 +513,16 @@ impl MetricsMiddleware { } } -impl tide::Middleware>> for MetricsMiddleware +impl + tide::Middleware>> for MetricsMiddleware where State: Send + Sync + 'static, Error: 'static + crate::Error, { fn handle<'a, 'b, 't>( &'a self, - req: tide::Request>>, - next: tide::Next<'b, Arc>>, + req: tide::Request>>, + next: tide::Next<'b, Arc>>, ) -> BoxFuture<'t, tide::Result> where 'a: 't, @@ -528,7 +539,7 @@ where } // Look at the `Accept` header. If the requested content type is plaintext, we consider // it a metrics request. Other endpoints have typed responses yielding either JSON or - // bincode. + // binary. let accept = RequestParams::accept_from_headers(&req)?; let reponse_ty = best_response_type(&accept, &[mime::PLAIN, mime::JSON, mime::BYTE_STREAM])?; @@ -553,8 +564,8 @@ where } } -async fn request_params( - req: tide::Request>>, +async fn request_params( + req: tide::Request>>, params: &[RequestParam], ) -> Result { RequestParams::new(req, params) @@ -605,7 +616,12 @@ pub struct AppVersion { /// body of the response. /// /// If the response does not contain an error, it is passed through unchanged. -fn add_error_body( +fn add_error_body< + T: Clone + Send + Sync + 'static, + E: crate::Error, + const MAJOR: u16, + const MINOR: u16, +>( req: tide::Request, next: tide::Next, ) -> BoxFuture { @@ -618,7 +634,7 @@ fn add_error_body( // Try to add the error to the response body using a format accepted by the client. If // we cannot do that (for example, if the client requested a format that is incompatible // with a serialized error) just add the error as a string using plaintext. - let (body, content_type) = route::response_body::<_, E>(&accept, &error) + let (body, content_type) = route::response_body::<_, E, MAJOR, MINOR>(&accept, &error) .unwrap_or_else(|_| (error.to_string().into(), mime::PLAIN)); res.set_body(body); res.set_content_type(content_type); @@ -629,32 +645,34 @@ fn add_error_body( }) } -pub struct Module<'a, State, Error, ModuleError> +pub struct Module<'a, State, Error, ModuleError, const MAJOR: u16, const MINOR: u16> where State: 'static + Send + Sync, Error: 'static + From, ModuleError: 'static + Send + Sync, { - app: &'a mut App, + app: &'a mut App, base_url: &'a str, // This is only an [Option] so we can [take] out of it during [drop]. - api: Option>, + api: Option>, } -impl<'a, State, Error, ModuleError> Deref for Module<'a, State, Error, ModuleError> +impl<'a, State, Error, ModuleError, const MAJOR: u16, const MINOR: u16> Deref + for Module<'a, State, Error, ModuleError, MAJOR, MINOR> where State: 'static + Send + Sync, Error: 'static + From, ModuleError: 'static + Send + Sync, { - type Target = Api; + type Target = Api; fn deref(&self) -> &Self::Target { self.api.as_ref().unwrap() } } -impl<'a, State, Error, ModuleError> DerefMut for Module<'a, State, Error, ModuleError> +impl<'a, State, Error, ModuleError, const MAJOR: u16, const MINOR: u16> DerefMut + for Module<'a, State, Error, ModuleError, MAJOR, MINOR> where State: 'static + Send + Sync, Error: 'static + From, @@ -665,7 +683,8 @@ where } } -impl<'a, State, Error, ModuleError> Drop for Module<'a, State, Error, ModuleError> +impl<'a, State, Error, ModuleError, const MAJOR: u16, const MINOR: u16> Drop + for Module<'a, State, Error, ModuleError, MAJOR, MINOR> where State: 'static + Send + Sync, Error: 'static + From, @@ -691,6 +710,7 @@ mod test { use portpicker::pick_unused_port; use std::borrow::Cow; use toml::toml; + use versioned_binary_serialization::{BinarySerializer, Serializer}; #[derive(Clone, Copy, Debug)] struct FakeMetrics; @@ -708,7 +728,7 @@ mod test { async fn test_method_dispatch() { use crate::http::Method::*; - let mut app = App::<_, ServerError>::with_state(RwLock::new(FakeMetrics)); + let mut app = App::<_, ServerError, 0, 1>::with_state(RwLock::new(FakeMetrics)); let api_toml = toml! { [meta] FORMAT_VERSION = "0.1.0" @@ -757,7 +777,7 @@ mod test { .unwrap() .socket( "socket_test", - |_req, mut conn: Connection<_, (), _>, _state| { + |_req, mut conn: Connection<_, (), _, 0, 1>, _state| { async move { conn.send("SOCKET").await.unwrap(); Ok(()) @@ -819,7 +839,7 @@ mod test { let msg = conn.next().await.unwrap().unwrap(); let body: String = match msg { Message::Text(m) => serde_json::from_str(&m).unwrap(), - Message::Binary(m) => bincode::deserialize(&m).unwrap(), + Message::Binary(m) => Serializer::<0, 1>::deserialize(&m).unwrap(), m => panic!("expected Text or Binary message, but got {}", m), }; assert_eq!(body, "SOCKET"); @@ -828,7 +848,7 @@ mod test { /// Test route dispatching for routes with patterns containing different parmaeters #[async_std::test] async fn test_param_dispatch() { - let mut app = App::<_, ServerError>::with_state(RwLock::new(())); + let mut app = App::<_, ServerError, 0, 1>::with_state(RwLock::new(())); let api_toml = toml! { [meta] FORMAT_VERSION = "0.1.0" diff --git a/src/lib.rs b/src/lib.rs index ecc03634..01fb5ed6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,11 +50,13 @@ //! //! type State = (); //! type Error = ServerError; +//! const MAJOR: u16 = 0; +//! const MINOR: u16 = 1; //! //! let spec: toml::Value = toml::from_str( //! std::str::from_utf8(&std::fs::read("/path/to/api.toml").unwrap()).unwrap(), //! ).unwrap(); -//! let mut api = Api::::new(spec)?; +//! let mut api = Api::::new(spec)?; //! # Ok(()) //! # } //! ``` @@ -72,9 +74,11 @@ //! //! ```no_run //! # use tide_disco::Api; +//! # const MAJOR: u16 = 0; +//! # const MINOR: u16 = 1; //! # fn main() -> Result<(), tide_disco::api::ApiError> { //! # let spec: toml::Value = toml::from_str(std::str::from_utf8(&std::fs::read("/path/to/api.toml").unwrap()).unwrap()).unwrap(); -//! # let mut api = Api::<(), tide_disco::error::ServerError>::new(spec)?; +//! # let mut api = Api::<(), tide_disco::error::ServerError, MAJOR, MINOR>::new(spec)?; //! use futures::FutureExt; //! //! api.get("hello", |req, state| async move { Ok("Hello, world!") }.boxed())?; @@ -90,12 +94,17 @@ //! ```no_run //! # type State = (); //! # type Error = tide_disco::error::ServerError; +//! # const MAJOR: u16 = 0; +//! # const MINOR: u16 = 1; //! # #[async_std::main] async fn main() { //! # let spec: toml::Value = toml::from_str(std::str::from_utf8(&std::fs::read("/path/to/api.toml").unwrap()).unwrap()).unwrap(); -//! # let api = tide_disco::Api::::new(spec).unwrap(); +//! # let api = tide_disco::Api::::new(spec).unwrap(); //! use tide_disco::App; //! -//! let mut app = App::::with_state(()); +//! const MAJOR: u16 = 0; +//! const MINOR: u16 = 1; +//! +//! let mut app = App::::with_state(()); //! app.register_module("api", api); //! app.serve("http://localhost:8080").await; //! # } @@ -158,7 +167,7 @@ //! implements [Fn], not just static function pointers. Here is what we would _like_ to write: //! //! ```ignore -//! impl Api { +//! impl Api { //! pub fn at(&mut self, route: &str, handler: F) //! where //! F: for<'a> Fn<(RequestParams, &'a State)>, @@ -183,7 +192,7 @@ //! `F`. Here is the actual (partial) signature of [at](Api::at): //! //! ```ignore -//! impl Api { +//! impl Api { //! pub fn at(&mut self, route: &str, handler: F) //! where //! F: for<'a> Fn(RequestParams, &'a State) -> BoxFuture<'a, Result>, @@ -203,7 +212,10 @@ //! type State = RwLock; //! type Error = (); //! -//! fn define_routes(api: &mut Api) { +//! const MAJOR: u16 = 0; +//! const MINOR: u16 = 1; +//! +//! fn define_routes(api: &mut Api) { //! api.at("someroute", |_req, state: &State| async { //! Ok(*state.read().await) //! }.boxed()); @@ -221,12 +233,14 @@ //! //! type State = RwLock; //! type Error = (); +//! const MAJOR: u16 = 0; +//! const MINOR: u16 = 1; //! //! async fn handler(_req: RequestParams, state: &State) -> Result { //! Ok(*state.read().await) //! } //! -//! fn register(api: &mut Api) { +//! fn register(api: &mut Api) { //! api.at("someroute", |req, state: &State| handler(req, state).boxed()); //! } //! ``` diff --git a/src/metrics.rs b/src/metrics.rs index 2e15b3cd..5adcd8ca 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -54,7 +54,8 @@ impl Metrics for prometheus::Registry { pub(crate) struct Handler(F); #[async_trait] -impl route::Handler for Handler +impl + route::Handler for Handler where F: 'static + Send + Sync + Fn(RequestParams, &State::State) -> BoxFuture, Error>>, T: 'static + Clone + Metrics, diff --git a/src/request.rs b/src/request.rs index d212a1d8..150f26e4 100644 --- a/src/request.rs +++ b/src/request.rs @@ -13,6 +13,7 @@ use std::fmt::Display; use strum_macros::EnumString; use tagged_base64::TaggedBase64; use tide::http::{self, content::Accept, mime::Mime, Headers}; +use versioned_binary_serialization::{BinarySerializer, Serializer}; #[derive(Clone, Debug, Snafu, Deserialize, Serialize)] pub enum RequestError { @@ -35,8 +36,8 @@ pub enum RequestError { #[snafu(display("Unable to deserialize from JSON"))] Json, - #[snafu(display("Unable to deserialize from bincode"))] - Bincode, + #[snafu(display("Unable to deserialize from binary"))] + Binary, #[snafu(display("Unable to deserialise from tagged base 64: {}", reason))] TaggedBase64 { reason: String }, @@ -403,7 +404,7 @@ impl RequestParams { /// Deserialize the body of a request. /// /// The Content-Type header is used to determine the serialization format. - pub fn body_auto(&self) -> Result + pub fn body_auto(&self) -> Result where T: serde::de::DeserializeOwned, { @@ -412,7 +413,8 @@ impl RequestParams { "application/json" => self.body_json(), "application/octet-stream" => { let bytes = self.body_bytes(); - bincode::deserialize(&bytes).map_err(|_err| RequestError::Bincode {}) + Serializer::::deserialize(&bytes) + .map_err(|_err| RequestError::Binary {}) } _content_type => Err(RequestError::UnsupportedContentType {}), } diff --git a/src/route.rs b/src/route.rs index b1895642..19140c17 100644 --- a/src/route.rs +++ b/src/route.rs @@ -36,6 +36,7 @@ use tide::{ Body, }; use tide_websockets::WebSocketConnection; +use versioned_binary_serialization::{BinarySerializer, Serializer}; /// An error returned by a route handler. /// @@ -46,7 +47,7 @@ pub enum RouteError { AppSpecific(E), Request(RequestError), UnsupportedContentType, - Bincode(bincode::Error), + Binary(anyhow::Error), Json(serde_json::Error), Tide(tide::Error), ExportMetrics(String), @@ -59,7 +60,7 @@ impl Display for RouteError { Self::AppSpecific(err) => write!(f, "{}", err), Self::Request(err) => write!(f, "{}", err), Self::UnsupportedContentType => write!(f, "requested content type is not supported"), - Self::Bincode(err) => write!(f, "error creating byte stream: {}", err), + Self::Binary(err) => write!(f, "error creating byte stream: {}", err), Self::Json(err) => write!(f, "error creating JSON response: {}", err), Self::Tide(err) => write!(f, "{}", err), Self::ExportMetrics(msg) => write!(f, "error exporting metrics: {msg}"), @@ -85,7 +86,7 @@ impl RouteError { RouteError::AppSpecific(e) => RouteError::AppSpecific(f(e)), RouteError::Request(e) => RouteError::Request(e), RouteError::UnsupportedContentType => RouteError::UnsupportedContentType, - RouteError::Bincode(err) => RouteError::Bincode(err), + RouteError::Binary(err) => RouteError::Binary(err), RouteError::Json(err) => RouteError::Json(err), RouteError::Tide(err) => RouteError::Tide(err), RouteError::ExportMetrics(msg) => RouteError::ExportMetrics(msg), @@ -112,7 +113,9 @@ impl From for RouteError { /// return type of a handler function. The types which are preserved, `State` and `Error`, should be /// the same for all handlers in an API module. #[async_trait] -pub(crate) trait Handler: 'static + Send + Sync { +pub(crate) trait Handler: + 'static + Send + Sync +{ async fn handle( &self, req: RequestParams, @@ -138,7 +141,8 @@ pub(crate) trait Handler: 'static + Send + Sync { pub(crate) struct FnHandler(F); #[async_trait] -impl Handler for FnHandler +impl Handler + for FnHandler where F: 'static + Send + Sync + Fn(RequestParams, &State) -> BoxFuture<'_, Result>, T: Serialize, @@ -150,21 +154,26 @@ where state: &State, ) -> Result> { let accept = req.accept()?; - response_from_result(&accept, (self.0)(req, state).await) + response_from_result::<_, _, MAJOR, MINOR>(&accept, (self.0)(req, state).await) } } -pub(crate) fn response_from_result( +pub(crate) fn response_from_result( accept: &Accept, res: Result, ) -> Result> { res.map_err(RouteError::AppSpecific) - .and_then(|res| respond_with(accept, &res)) + .and_then(|res| respond_with::<_, _, MAJOR, MINOR>(accept, &res)) } #[async_trait] -impl, State: 'static + Send + Sync, Error> Handler - for Box +impl< + H: ?Sized + Handler, + State: 'static + Send + Sync, + Error, + const MAJOR: u16, + const MINOR: u16, + > Handler for Box { async fn handle( &self, @@ -175,24 +184,26 @@ impl, State: 'static + Send + Sync, Error> Han } } -enum RouteImplementation { +enum RouteImplementation { Http { method: http::Method, - handler: Option>>, + handler: Option>>, }, Socket { handler: Option>, }, Metrics { - handler: Option>>, + handler: Option>>, }, } -impl RouteImplementation { +impl + RouteImplementation +{ fn map_err( self, f: impl 'static + Send + Sync + Fn(Error) -> Error2, - ) -> RouteImplementation + ) -> RouteImplementation where State: 'static + Send + Sync, Error: 'static + Send + Sync, @@ -202,10 +213,12 @@ impl RouteImplementation { Self::Http { method, handler } => RouteImplementation::Http { method, handler: handler.map(|h| { - let h: Box> = - Box::new(MapErr::>, _, Error>::new( - h, f, - )); + let h: Box> = + Box::new(MapErr::< + Box>, + _, + Error, + >::new(h, f)); h }), }, @@ -214,10 +227,12 @@ impl RouteImplementation { }, Self::Metrics { handler } => RouteImplementation::Metrics { handler: handler.map(|h| { - let h: Box> = - Box::new(MapErr::>, _, Error>::new( - h, f, - )); + let h: Box> = + Box::new(MapErr::< + Box>, + _, + Error, + >::new(h, f)); h }), }, @@ -233,14 +248,14 @@ impl RouteImplementation { /// simply returns information about the route. #[derive(Derivative)] #[derivative(Debug(bound = ""))] -pub struct Route { +pub struct Route { name: String, patterns: Vec, params: Vec, doc: String, meta: Arc, #[derivative(Debug = "ignore")] - handler: RouteImplementation, + handler: RouteImplementation, } #[derive(Clone, Debug, Snafu)] @@ -258,7 +273,7 @@ pub enum RouteParseError { RouteMustBeTable, } -impl Route { +impl Route { /// Parse a [Route] from a TOML specification. /// /// The specification must be a table containing at least the following keys: @@ -381,7 +396,7 @@ impl Route { pub fn map_err( self, f: impl 'static + Send + Sync + Fn(Error) -> Error2, - ) -> Route + ) -> Route where State: 'static + Send + Sync, Error: 'static + Send + Sync, @@ -424,10 +439,10 @@ impl Route { } } -impl Route { +impl Route { pub(crate) fn set_handler( &mut self, - h: impl Handler, + h: impl Handler, ) -> Result<(), RouteError> { match &mut self.handler { RouteImplementation::Http { handler, .. } => { @@ -515,7 +530,8 @@ impl Route { } #[async_trait] -impl Handler for Route +impl Handler + for Route where Error: 'static, State: 'static + Send + Sync, @@ -555,9 +571,10 @@ impl MapErr { } #[async_trait] -impl Handler for MapErr +impl + Handler for MapErr where - H: Handler, + H: Handler, F: 'static + Send + Sync + Fn(Error1) -> Error2, State: 'static + Send + Sync, Error1: 'static + Send + Sync, @@ -584,10 +601,13 @@ where pub(crate) type HealthCheckHandler = Box BoxFuture<'_, tide::Response>>; -pub(crate) fn health_check_response(accept: &Accept, health: H) -> tide::Response { +pub(crate) fn health_check_response( + accept: &Accept, + health: H, +) -> tide::Response { let status = health.status(); - let (body, content_type) = - response_body::(accept, health).unwrap_or_else(|err| { + let (body, content_type) = response_body::(accept, health) + .unwrap_or_else(|err| { let msg = format!( "health status was {}, but there was an error generating the response: {}", status, err @@ -604,7 +624,7 @@ pub(crate) fn health_check_response(accept: &Accept, health: H) /// /// Given a handler, this function can be used to derive a new, type-erased [HealthCheckHandler] /// that takes only [RequestParams] and returns a generic [tide::Response]. -pub(crate) fn health_check_handler( +pub(crate) fn health_check_handler( handler: impl 'static + Send + Sync + Fn(&State) -> BoxFuture, ) -> HealthCheckHandler where @@ -622,19 +642,19 @@ where let future = handler(state); async move { let health = future.await; - health_check_response(&accept, health) + health_check_response::<_, MAJOR, MINOR>(&accept, health) } .boxed() }) } -pub(crate) fn response_body( +pub(crate) fn response_body( accept: &Accept, body: T, ) -> Result<(Body, Mime), RouteError> { let ty = best_response_type(accept, &[mime::JSON, mime::BYTE_STREAM])?; if ty == mime::BYTE_STREAM { - let bytes = bincode::serialize(&body).map_err(RouteError::Bincode)?; + let bytes = Serializer::::serialize(&body).map_err(RouteError::Binary)?; Ok((bytes.into(), mime::BYTE_STREAM)) } else if ty == mime::JSON { let json = serde_json::to_string(&body).map_err(RouteError::Json)?; @@ -644,11 +664,11 @@ pub(crate) fn response_body( } } -pub(crate) fn respond_with( +pub(crate) fn respond_with( accept: &Accept, body: T, ) -> Result> { - let (body, content_type) = response_body(accept, body)?; + let (body, content_type) = response_body::<_, _, MAJOR, MINOR>(accept, body)?; Ok(tide::Response::builder(StatusCode::Ok) .body(body) .content_type(content_type) diff --git a/src/socket.rs b/src/socket.rs index d4acbb0f..9c6e1e02 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -29,6 +29,7 @@ use tide_websockets::{ tungstenite::protocol::frame::{coding::CloseCode, CloseFrame}, Message, WebSocketConnection, }; +use versioned_binary_serialization::{BinarySerializer, Serializer}; /// An error returned by a socket handler. /// @@ -39,7 +40,7 @@ use tide_websockets::{ pub enum SocketError { AppSpecific(E), Request(RequestError), - Bincode(bincode::Error), + Binary(anyhow::Error), Json(serde_json::Error), WebSockets(tide_websockets::Error), UnsupportedMessageType, @@ -65,7 +66,7 @@ impl SocketError { match self { Self::AppSpecific(e) => SocketError::AppSpecific(f(e)), Self::Request(e) => SocketError::Request(e), - Self::Bincode(e) => SocketError::Bincode(e), + Self::Binary(e) => SocketError::Binary(e), Self::Json(e) => SocketError::Json(e), Self::WebSockets(e) => SocketError::WebSockets(e), Self::UnsupportedMessageType => SocketError::UnsupportedMessageType, @@ -82,7 +83,7 @@ impl Display for SocketError { match self { Self::AppSpecific(e) => write!(f, "{}", e), Self::Request(e) => write!(f, "{}", e), - Self::Bincode(e) => write!(f, "error creating byte stream: {}", e), + Self::Binary(e) => write!(f, "error creating byte stream: {}", e), Self::Json(e) => write!(f, "error creating JSON message: {}", e), Self::WebSockets(e) => write!(f, "WebSockets protocol error: {}", e), Self::UnsupportedMessageType => { @@ -104,9 +105,9 @@ impl From for SocketError { } } -impl From for SocketError { - fn from(err: bincode::Error) -> Self { - Self::Bincode(err) +impl From for SocketError { + fn from(err: anyhow::Error) -> Self { + Self::Binary(err) } } @@ -124,7 +125,7 @@ impl From for SocketError { #[derive(Clone, Copy, Debug)] enum MessageType { - Bincode, + Binary, Json, } @@ -132,7 +133,7 @@ enum MessageType { /// /// [Connection] implements [Stream], which can be used to receive `FromClient` messages from the /// client, and [Sink] which can be used to send `ToClient` messages to the client. -pub struct Connection { +pub struct Connection { conn: WebSocketConnection, // [Sink] wrapper around `conn` sink: Pin>>>, @@ -140,8 +141,8 @@ pub struct Connection { _phantom: PhantomData ()>, } -impl Stream - for Connection +impl Stream + for Connection { type Item = Result>; @@ -152,7 +153,9 @@ impl Stream Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))), Poll::Ready(Some(Ok(msg))) => Poll::Ready(Some(match msg { - Message::Binary(bytes) => bincode::deserialize(&bytes).map_err(SocketError::from), + Message::Binary(bytes) => { + Serializer::::deserialize(&bytes).map_err(SocketError::from) + } Message::Text(s) => serde_json::from_str(&s).map_err(SocketError::from), _ => Err(SocketError::UnsupportedMessageType), })), @@ -161,8 +164,8 @@ impl Stream } } -impl Sink<&ToClient> - for Connection +impl + Sink<&ToClient> for Connection { type Error = SocketError; @@ -172,7 +175,7 @@ impl Sink<&ToClient> fn start_send(mut self: Pin<&mut Self>, item: &ToClient) -> Result<(), Self::Error> { let msg = match self.accept { - MessageType::Bincode => Message::Binary(bincode::serialize(item)?), + MessageType::Binary => Message::Binary(Serializer::::serialize(item)?), MessageType::Json => Message::Text(serde_json::to_string(item)?), }; self.sink @@ -190,7 +193,9 @@ impl Sink<&ToClient> } } -impl Drop for Connection { +impl Drop + for Connection +{ fn drop(&mut self) { // This is the idiomatic way to implement [drop] for a type that uses pinning. Since [drop] // is implicitly called with `&mut self` even on types that were pinned, we place any @@ -206,21 +211,23 @@ impl Drop for Connection( - _this: Pin<&mut Connection>, + fn inner_drop( + _this: Pin<&mut Connection>, ) { // Any logic goes here. } } } -impl Connection { +impl + Connection +{ fn new(accept: &Accept, conn: WebSocketConnection) -> Result> { let ty = best_response_type(accept, &[mime::JSON, mime::BYTE_STREAM])?; let ty = if ty == mime::JSON { MessageType::Json } else if ty == mime::BYTE_STREAM { - MessageType::Bincode + MessageType::Binary } else { unreachable!() }; @@ -268,14 +275,16 @@ pub(crate) type Handler = Box< + Fn(RequestParams, WebSocketConnection, &State) -> BoxFuture>>, >; -pub(crate) fn handler(f: F) -> Handler +pub(crate) fn handler( + f: F, +) -> Handler where F: 'static + Send + Sync + Fn( RequestParams, - Connection, + Connection, &State, ) -> BoxFuture>, State: 'static + Send + Sync, @@ -290,13 +299,13 @@ where }) } -struct StreamHandler(F); +struct StreamHandler(F); -impl StreamHandler { +impl StreamHandler { fn handle<'a, State, Error, Msg>( &self, req: RequestParams, - mut conn: Connection, + mut conn: Connection, state: &'a State, ) -> BoxFuture<'a, Result<(), SocketError>> where @@ -316,25 +325,29 @@ impl StreamHandler { } } -pub(crate) fn stream_handler(f: F) -> Handler +pub(crate) fn stream_handler( + f: F, +) -> Handler where F: 'static + Send + Sync + Fn(RequestParams, &State) -> BoxStream>, State: 'static + Send + Sync, Msg: 'static + Serialize + Send + Sync, Error: 'static + Send + Display, { - let handler = StreamHandler(f); + let handler: StreamHandler = StreamHandler(f); raw_handler(move |req, conn, state| handler.handle(req, conn, state)) } -fn raw_handler(f: F) -> Handler +fn raw_handler( + f: F, +) -> Handler where F: 'static + Send + Sync + Fn( RequestParams, - Connection, + Connection, &State, ) -> BoxFuture>>, State: 'static + Send + Sync, diff --git a/src/status.rs b/src/status.rs index e4a9f5dc..27aeb52a 100644 --- a/src/status.rs +++ b/src/status.rs @@ -532,6 +532,7 @@ impl StatusCode { #[cfg(test)] mod test { use super::*; + use versioned_binary_serialization::{BinarySerializer, Serializer}; #[test] fn test_status_code() { @@ -547,10 +548,13 @@ mod test { ); assert_eq!(code, u16::from(status)); - // Test bincode round trip. + // Test binary round trip. assert_eq!( status, - bincode::deserialize::(&bincode::serialize(&status).unwrap()).unwrap() + Serializer::<0, 1>::deserialize::( + &Serializer::<0, 1>::serialize(&status).unwrap() + ) + .unwrap() ); // Test JSON round trip, readability, and backwards compatibility.