diff --git a/crates/bonsaidb-client/src/client.rs b/crates/bonsaidb-client/src/client.rs index 3e9adf1e2f..4b61b434ff 100644 --- a/crates/bonsaidb-client/src/client.rs +++ b/crates/bonsaidb-client/src/client.rs @@ -572,7 +572,7 @@ impl AsyncClient { match result { Ok(response) => response?, - Err(_) => Err(Error::RequestTimeout), + Err(_) => Err(Error::request_timeout()), } } @@ -1041,7 +1041,7 @@ async fn disconnect_pending_requests( drop( pending .responder - .send(Err(pending_error.take().unwrap_or(Error::Disconnected))), + .send(Err(pending_error.take().unwrap_or(Error::disconnected()))), ); } } diff --git a/crates/bonsaidb-client/src/client/quic_worker.rs b/crates/bonsaidb-client/src/client/quic_worker.rs index b3ac210b9e..7c3dcf0726 100644 --- a/crates/bonsaidb-client/src/client/quic_worker.rs +++ b/crates/bonsaidb-client/src/client/quic_worker.rs @@ -77,7 +77,7 @@ async fn connect_and_process( { Ok(Ok(result)) => result, Ok(Err(err)) => return Err((Some(initial_request), Some(err))), - Err(_) => return Err((Some(initial_request), Some(Error::ConnectTimeout))), + Err(_) => return Err((Some(initial_request), Some(Error::connect_timeout()))), }; let outstanding_requests = OutstandingRequestMapHandle::default(); @@ -108,7 +108,7 @@ async fn connect_and_process( request_receiver, payload_sender ), - async { request_processor.await.map_err(|_| Error::Disconnected)? } + async { request_processor.await.map_err(|_| Error::disconnected())? } ) { let mut pending_error = Some(err); // Our socket was disconnected, clear the outstanding requests before returning. @@ -136,7 +136,7 @@ async fn process_requests( drop(payload_sender.finish()); // Return an error to make sure try_join returns. - Err(Error::Disconnected) + Err(Error::disconnected()) } pub async fn process( @@ -149,7 +149,7 @@ pub async fn process( super::process_response_payload(payload, &outstanding_requests, &custom_apis).await; } - Err(Error::Disconnected) + Err(Error::disconnected()) } async fn connect( diff --git a/crates/bonsaidb-client/src/client/tungstenite_worker.rs b/crates/bonsaidb-client/src/client/tungstenite_worker.rs index 2f4e0ca4a9..7437822b0d 100644 --- a/crates/bonsaidb-client/src/client/tungstenite_worker.rs +++ b/crates/bonsaidb-client/src/client/tungstenite_worker.rs @@ -61,7 +61,7 @@ pub(super) async fn reconnecting_client_loop( continue; } Err(_) => { - drop(request.responder.send(Err(Error::ConnectTimeout))); + drop(request.responder.send(Err(Error::connect_timeout()))); continue; } }; @@ -115,7 +115,7 @@ async fn request_sender( ); } - Err(Error::Disconnected) + Err(Error::disconnected()) } #[allow(clippy::collapsible_else_if)] // not possible due to cfg statement diff --git a/crates/bonsaidb-client/src/error.rs b/crates/bonsaidb-client/src/error.rs index 7e3f426242..bd19442376 100644 --- a/crates/bonsaidb-client/src/error.rs +++ b/crates/bonsaidb-client/src/error.rs @@ -1,4 +1,5 @@ use bonsaidb_core::arc_bytes::serde::Bytes; +use bonsaidb_core::networking; use bonsaidb_core::schema::Name; /// Errors related to working with the BonsaiDb client. @@ -17,10 +18,6 @@ pub enum Error { #[error("invalid url: '{0}'")] InvalidUrl(String), - /// The connection was interrupted. - #[error("unexpected disconnection")] - Disconnected, - /// The connection was interrupted. #[error("unexpected disconnection")] Core(#[from] bonsaidb_core::Error), @@ -38,33 +35,46 @@ pub enum Error { /// The server is incompatible with this version of the client. #[error("server incompatible with client protocol version")] ProtocolVersionMismatch, +} - /// A timeout occurred while connecting to the server. - #[error("connection to server timed out")] - ConnectTimeout, - /// A timeout occurred while waiting for a response from the server. - #[error("request timed out")] - RequestTimeout, +impl Error { + pub(crate) fn disconnected() -> Self { + Self::Core(bonsaidb_core::Error::Networking( + networking::Error::Disconnected, + )) + } + + pub(crate) fn request_timeout() -> Self { + Self::Core(bonsaidb_core::Error::Networking( + networking::Error::RequestTimeout, + )) + } + + pub(crate) fn connect_timeout() -> Self { + Self::Core(bonsaidb_core::Error::Networking( + networking::Error::ConnectTimeout, + )) + } } impl From> for Error { fn from(_: flume::SendError) -> Self { - Self::Disconnected + Self::disconnected() } } impl From for Error { fn from(err: flume::RecvTimeoutError) -> Self { match err { - flume::RecvTimeoutError::Timeout => Self::RequestTimeout, - flume::RecvTimeoutError::Disconnected => Self::Disconnected, + flume::RecvTimeoutError::Timeout => Self::request_timeout(), + flume::RecvTimeoutError::Disconnected => Self::disconnected(), } } } impl From for Error { fn from(_: flume::RecvError) -> Self { - Self::Disconnected + Self::disconnected() } } diff --git a/crates/bonsaidb-core/src/networking.rs b/crates/bonsaidb-core/src/networking.rs index 824b176af7..b7311469a2 100644 --- a/crates/bonsaidb-core/src/networking.rs +++ b/crates/bonsaidb-core/src/networking.rs @@ -684,6 +684,14 @@ pub enum Error { #[error("unexpected response: {0}")] UnexpectedResponse(String), + /// A timeout occurred while connecting to the server. + #[error("connection timeout")] + ConnectTimeout, + + /// A timeout occurred waiting on a request to complete. + #[error("request timeout")] + RequestTimeout, + /// The connection was interrupted. #[error("unexpected disconnection")] Disconnected, diff --git a/crates/bonsaidb/tests/timeouts.rs b/crates/bonsaidb/tests/timeouts.rs new file mode 100644 index 0000000000..16995b2f64 --- /dev/null +++ b/crates/bonsaidb/tests/timeouts.rs @@ -0,0 +1,239 @@ +//! Tests request and connection timeouts + +use std::net::UdpSocket; +use std::time::{Duration, Instant}; + +use bonsaidb::client::url::Url; +use bonsaidb::client::AsyncClient; +use bonsaidb_client::fabruic::Certificate; +use bonsaidb_client::{ApiError, BlockingClient}; +use bonsaidb_core::api::Api; +use bonsaidb_core::async_trait::async_trait; +use bonsaidb_core::connection::{AsyncStorageConnection, StorageConnection}; +use bonsaidb_core::networking; +use bonsaidb_core::test_util::{Basic, TestDirectory}; +use bonsaidb_local::config::Builder; +use bonsaidb_server::api::{Handler, HandlerResult, HandlerSession}; +use bonsaidb_server::{DefaultPermissions, Server, ServerConfiguration}; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +#[tokio::test] +#[cfg(feature = "websockets")] +async fn ws_connect_timeout() -> anyhow::Result<()> { + use std::net::TcpListener; + + let start = Instant::now(); + let tcp = TcpListener::bind("0.0.0.0:0")?; + let port = tcp.local_addr()?.port(); + let client = AsyncClient::build(Url::parse(&format!("ws://127.0.0.1:{port}"))?) + .with_connect_timeout(Duration::from_secs(1)) + .build()?; + + match tokio::time::timeout(Duration::from_secs(60), client.list_databases()).await { + Ok(Err(bonsaidb_core::Error::Networking(networking::Error::ConnectTimeout))) => { + assert!(start.elapsed() < Duration::from_secs(5)); + Ok(()) + } + other => unreachable!("expected connect timeout, got {other:?}"), + } +} + +#[tokio::test] +async fn quic_connect_timeout() -> anyhow::Result<()> { + let start = Instant::now(); + let udp = UdpSocket::bind("0.0.0.0:0")?; + let port = udp.local_addr()?.port(); + let client = AsyncClient::build(Url::parse(&format!("bonsaidb://127.0.0.1:{port}"))?) + .with_connect_timeout(Duration::from_secs(1)) + .build()?; + + match tokio::time::timeout(Duration::from_secs(60), client.list_databases()).await { + Ok(Err(bonsaidb_core::Error::Networking(networking::Error::ConnectTimeout))) => { + assert!(start.elapsed() < Duration::from_secs(5)); + Ok(()) + } + other => unreachable!("expected connect timeout, got {other:?}"), + } +} + +#[test] +#[cfg(feature = "websockets")] +fn blocking_ws_connect_timeout() -> anyhow::Result<()> { + use std::net::TcpListener; + + let start = Instant::now(); + let tcp = TcpListener::bind("0.0.0.0:0")?; + let port = tcp.local_addr()?.port(); + let client = BlockingClient::build(Url::parse(&format!("ws://127.0.0.1:{port}"))?) + .with_connect_timeout(Duration::from_secs(1)) + .build()?; + + match client.list_databases() { + Err(bonsaidb_core::Error::Networking(networking::Error::ConnectTimeout)) => { + assert!(start.elapsed() < Duration::from_secs(5)); + Ok(()) + } + other => unreachable!("expected connect timeout, got {other:?}"), + } +} + +#[test] +fn blocking_quic_connect_timeout() -> anyhow::Result<()> { + let start = Instant::now(); + let udp = UdpSocket::bind("0.0.0.0:0")?; + let port = udp.local_addr()?.port(); + let client = BlockingClient::build(Url::parse(&format!("bonsaidb://127.0.0.1:{port}"))?) + .with_connect_timeout(Duration::from_secs(1)) + .build()?; + + match client.list_databases() { + Err(bonsaidb_core::Error::Networking(networking::Error::ConnectTimeout)) => { + assert!(start.elapsed() < Duration::from_secs(5)); + Ok(()) + } + other => unreachable!("expected connect timeout, got {other:?}"), + } +} + +#[derive(Api, Debug, Serialize, Deserialize, Clone)] +#[api(name = "long-call")] +struct LongCall; + +#[async_trait] +impl Handler for LongCall { + async fn handle(_session: HandlerSession<'_>, _request: LongCall) -> HandlerResult { + tokio::time::sleep(Duration::from_secs(10)).await; + Ok(()) + } +} + +fn shared_server() -> &'static Certificate { + static SHARED_SERVER: Lazy = Lazy::new(|| { + drop(env_logger::try_init()); + let dir = TestDirectory::new("timeouts.bonsaidb"); + + let (server_sender, server_receiver) = tokio::sync::oneshot::channel(); + + std::thread::spawn(move || { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let server = Server::open( + ServerConfiguration::new(&dir) + .default_permissions(DefaultPermissions::AllowAll) + .with_schema::() + .unwrap() + .with_api::() + .unwrap(), + ) + .await + .unwrap(); + server.install_self_signed_certificate(false).await.unwrap(); + server_sender + .send( + server + .certificate_chain() + .await + .unwrap() + .into_end_entity_certificate(), + ) + .unwrap(); + tokio::task::spawn({ + let server = server.clone(); + async move { server.listen_for_websockets_on("0.0.0.0:7023", false).await } + }); + + server.listen_on(7024).await + }) + }); + + server_receiver.blocking_recv().unwrap() + }); + + &SHARED_SERVER +} + +#[tokio::test] +async fn ws_request_timeout() { + shared_server(); + // Give the server a moment to actually start up. + tokio::time::sleep(Duration::from_millis(100)).await; + + let start = Instant::now(); + let client = AsyncClient::build(Url::parse("ws://127.0.0.1:7023").unwrap()) + .with_request_timeout(Duration::from_secs(1)) + .build() + .unwrap(); + match client.send_api_request(&LongCall).await { + Err(ApiError::Client(bonsaidb_client::Error::Core(bonsaidb_core::Error::Networking( + networking::Error::RequestTimeout, + )))) => { + assert!(start.elapsed() < Duration::from_secs(5)); + } + other => unreachable!("expected request timeout, got {other:?}"), + } +} + +#[test] +fn blocking_ws_request_timeout() { + shared_server(); + // Give the server a moment to actually start up. + std::thread::sleep(Duration::from_millis(100)); + + let start = Instant::now(); + let client = BlockingClient::build(Url::parse("ws://127.0.0.1:7023").unwrap()) + .with_request_timeout(Duration::from_secs(1)) + .build() + .unwrap(); + match client.send_api_request(&LongCall) { + Err(ApiError::Client(bonsaidb_client::Error::Core(bonsaidb_core::Error::Networking( + networking::Error::RequestTimeout, + )))) => { + assert!(start.elapsed() < Duration::from_secs(5)); + } + other => unreachable!("expected request timeout, got {other:?}"), + } +} + +#[tokio::test] +async fn quic_request_timeout() { + let cert_chain = shared_server(); + // Give the server a moment to actually start up. + tokio::time::sleep(Duration::from_millis(100)).await; + + let start = Instant::now(); + let client = AsyncClient::build(Url::parse("bonsaidb://127.0.0.1:7024").unwrap()) + .with_request_timeout(Duration::from_secs(1)) + .with_certificate(cert_chain.clone()) + .build() + .unwrap(); + match client.send_api_request(&LongCall).await { + Err(ApiError::Client(bonsaidb_client::Error::Core(bonsaidb_core::Error::Networking( + networking::Error::RequestTimeout, + )))) => { + assert!(start.elapsed() < Duration::from_secs(5)); + } + other => unreachable!("expected request timeout, got {other:?}"), + } +} + +#[test] +fn blocking_quic_request_timeout() { + let cert_chain = shared_server(); + // Give the server a moment to actually start up. + std::thread::sleep(Duration::from_millis(100)); + + let start = Instant::now(); + let client = BlockingClient::build(Url::parse("bonsaidb://127.0.0.1:7024").unwrap()) + .with_request_timeout(Duration::from_secs(1)) + .with_certificate(cert_chain.clone()) + .build() + .unwrap(); + match client.send_api_request(&LongCall) { + Err(ApiError::Client(bonsaidb_client::Error::Core(bonsaidb_core::Error::Networking( + networking::Error::RequestTimeout, + )))) => { + assert!(start.elapsed() < Duration::from_secs(5)); + } + other => unreachable!("expected request timeout, got {other:?}"), + } +}