Skip to content

Commit

Permalink
Implemented client-side timeouts
Browse files Browse the repository at this point in the history
Refs #296, #297

This commit adds timeout settings for connections and requests. These
timeouts are separate, meaning that if the initial request takes X
seconds to connect, the request timeout will be timed in addition to
those X seconds.

This should be unit-tested, and the wasm client changes haven't been
tested at all beyond ensuring compilation.
  • Loading branch information
ecton committed May 16, 2023
1 parent 872a741 commit 11d6b80
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 43 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`delete_in_transaction()` which are new helpers that help writing
transactional code easier by creating and pushing the `Operations` in one
step.
- `Client`'s builder now has two additional settings: `request_timeout` and
`connect_timeout`. If not specified, both timeouts are 60 seconds. Thank you
to @phantie for requesting these settings in #296.

### Changed

Expand Down
3 changes: 2 additions & 1 deletion crates/bonsaidb-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ web-sys = { version = "0.3", features = [
] }
wasm-bindgen-futures = "0.4"
wasm-bindgen = "0.2"
wasm-timer = "0.2.5"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
fabruic = { version = "0.0.1-dev.6" }
tokio = { version = "1.16.1", features = ["sync", "macros"] }
tokio = { version = "1.16.1", features = ["sync", "macros", "time"] }
tokio-tungstenite = { version = "0.18", optional = true, features = [
"rustls-tls-native-roots",
] }
Expand Down
24 changes: 24 additions & 0 deletions crates/bonsaidb-client/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use bonsaidb_core::api;
use bonsaidb_core::api::ApiName;
Expand All @@ -26,6 +27,8 @@ pub struct Builder<AsyncMode> {
url: Url,
protocol_version: &'static str,
custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
#[cfg(not(target_arch = "wasm32"))]
certificate: Option<fabruic::Certificate>,
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -40,6 +43,8 @@ impl<AsyncMode> Builder<AsyncMode> {
url,
protocol_version: CURRENT_PROTOCOL_VERSION,
custom_apis: HashMap::new(),
request_timeout: None,
connect_timeout: None,
#[cfg(not(target_arch = "wasm32"))]
certificate: None,
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -90,11 +95,30 @@ impl<AsyncMode> Builder<AsyncMode> {
self
}

/// Sets the request timeout for the client.
///
/// If not specified, requests will time out after 60 seconds.
pub fn with_request_timeout(mut self, timeout: impl Into<Duration>) -> Self {
self.request_timeout = Some(timeout.into());
self
}

/// Sets the connection timeout for the client.
///
/// If not specified, the client will time out after 60 seconds if a
/// connection cannot be established.
pub fn with_connect_timeout(mut self, timeout: impl Into<Duration>) -> Self {
self.connect_timeout = Some(timeout.into());
self
}

fn finish_internal(self) -> Result<AsyncClient, Error> {
AsyncClient::new_from_parts(
self.url,
self.protocol_version,
self.custom_apis,
self.connect_timeout,
self.request_timeout,
#[cfg(not(target_arch = "wasm32"))]
self.certificate,
#[cfg(not(target_arch = "wasm32"))]
Expand Down
107 changes: 87 additions & 20 deletions crates/bonsaidb-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::ops::Deref;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
Expand Down Expand Up @@ -228,6 +229,7 @@ pub type WebSocketError = wasm_websocket_worker::WebSocketError;
pub struct AsyncClient {
pub(crate) data: Arc<Data>,
session: ClientSession,
request_timeout: Duration,
}

impl Drop for AsyncClient {
Expand Down Expand Up @@ -286,6 +288,8 @@ impl AsyncClient {
url,
CURRENT_PROTOCOL_VERSION,
HashMap::default(),
None,
None,
#[cfg(not(target_arch = "wasm32"))]
None,
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -312,6 +316,8 @@ impl AsyncClient {
url: Url,
protocol_version: &'static str,
mut custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
#[cfg(not(target_arch = "wasm32"))] certificate: Option<fabruic::Certificate>,
#[cfg(not(target_arch = "wasm32"))] tokio: Option<Handle>,
) -> Result<Self, Error> {
Expand Down Expand Up @@ -339,49 +345,54 @@ impl AsyncClient {
},
))),
);
match url.scheme() {
// Default timeouts to 1 minute.
let connection = ConnectionInfo {
url,
subscribers,
connect_timeout: connect_timeout.unwrap_or(Duration::from_secs(60)),
request_timeout: request_timeout.unwrap_or(Duration::from_secs(60)),
};
match connection.url.scheme() {
#[cfg(not(target_arch = "wasm32"))]
"bonsaidb" => Ok(Self::new_bonsai_client(
url,
connection,
protocol_version,
certificate,
custom_apis,
tokio,
subscribers,
)),
#[cfg(feature = "websockets")]
"wss" | "ws" => Ok(Self::new_websocket_client(
url,
connection,
protocol_version,
custom_apis,
#[cfg(not(target_arch = "wasm32"))]
tokio,
subscribers,
)),
other => Err(Error::InvalidUrl(format!("unsupported scheme {other}"))),
}
}

#[cfg(not(target_arch = "wasm32"))]
fn new_bonsai_client(
url: Url,
server: ConnectionInfo,
protocol_version: &'static str,
certificate: Option<fabruic::Certificate>,
custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
tokio: Option<Handle>,
subscribers: SubscriberMap,
) -> Self {
let (request_sender, request_receiver) = flume::unbounded();
let connection_counter = Arc::new(AtomicU32::default());
let request_timeout = server.request_timeout;
let subscribers = server.subscribers.clone();

let worker = sync::spawn_client(
quic_worker::reconnecting_client_loop(
url,
server,
protocol_version,
certificate,
request_receiver,
Arc::new(custom_apis),
subscribers.clone(),
connection_counter.clone(),
),
tokio,
Expand All @@ -407,27 +418,28 @@ impl AsyncClient {
background_task_running,
}),
session: ClientSession::default(),
request_timeout,
}
}

#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
fn new_websocket_client(
url: Url,
server: ConnectionInfo,
protocol_version: &'static str,
custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
tokio: Option<Handle>,
subscribers: SubscriberMap,
) -> Self {
let (request_sender, request_receiver) = flume::unbounded();
let connection_counter = Arc::new(AtomicU32::default());
let request_timeout = server.request_timeout;
let subscribers = server.subscribers.clone();

let worker = sync::spawn_client(
tungstenite_worker::reconnecting_client_loop(
url,
server,
protocol_version,
request_receiver,
Arc::new(custom_apis),
subscribers.clone(),
connection_counter.clone(),
),
tokio,
Expand All @@ -454,27 +466,28 @@ impl AsyncClient {
background_task_running,
}),
session: ClientSession::default(),
request_timeout,
}
}

#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
fn new_websocket_client(
url: Url,
server: ConnectionInfo,
protocol_version: &'static str,
custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
subscribers: SubscriberMap,
) -> Self {
let (request_sender, request_receiver) = flume::unbounded();
let connection_counter = Arc::new(AtomicU32::default());

wasm_websocket_worker::spawn_client(
Arc::new(url),
Arc::new(server.url),
protocol_version,
request_receiver,
Arc::new(custom_apis),
subscribers.clone(),
server.subscribers.clone(),
connection_counter.clone(),
None,
server.connect_timeout,
);

#[cfg(feature = "test-util")]
Expand All @@ -493,11 +506,12 @@ impl AsyncClient {
request_id: AtomicU32::default(),
connection_counter,
effective_permissions: Mutex::default(),
subscribers,
subscribers: server.subscribers,
#[cfg(feature = "test-util")]
background_task_running,
}),
session: ClientSession::default(),
request_timeout: server.request_timeout,
}
}

Expand All @@ -524,14 +538,49 @@ impl AsyncClient {
async fn send_request_async(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
let result_receiver = self.send_request_without_confirmation(name, bytes)?;

result_receiver.recv_async().await?
#[cfg(target_arch = "wasm32")]
let result = {
use std::pin::pin;
// wasm_timer has a weird quirk in how TryFuture is implemented to
// try to combine errors. We don't want this behavior, so we're just
// going to wrap the result in another result via this adapter.
struct FlumeWrapper<F>(F);
impl<F> Future for FlumeWrapper<F>
where
F: Future + Unpin,
{
type Output = Result<F::Output, std::io::Error>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match pin!(&mut self.0).poll(cx) {
std::task::Poll::Ready(result) => std::task::Poll::Ready(Ok(result)),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
wasm_timer::ext::TryFutureExt::timeout(
FlumeWrapper(result_receiver.recv_async()),
self.request_timeout,
)
.await
};
#[cfg(not(target_arch = "wasm32"))]
let result = tokio::time::timeout(self.request_timeout, result_receiver.recv_async()).await;

match result {
Ok(response) => response?,
Err(_) => Err(Error::RequestTimeout),
}
}

#[cfg(not(target_arch = "wasm32"))]
fn send_request(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
let result_receiver = self.send_request_without_confirmation(name, bytes)?;

result_receiver.recv()?
result_receiver.recv_timeout(self.request_timeout)?
}

/// Sends an api `request`.
Expand Down Expand Up @@ -631,6 +680,14 @@ impl AsyncClient {
self.session.session.id.is_none()
|| self.data.connection_counter.load(Ordering::SeqCst) == self.session.connection_id
}

/// Sets this instance's request timeout.
///
/// Each client has its own timeout. When cloning a client, this timeout
/// setting will be copied to the clone.
pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
self.request_timeout = timeout.into();
}
}

impl HasSession for AsyncClient {
Expand Down Expand Up @@ -735,6 +792,7 @@ impl AsyncStorageConnection for AsyncClient {
session: Arc::new(session),
connection_id: self.data.connection_counter.load(Ordering::SeqCst),
},
request_timeout: self.request_timeout,
})
}

Expand All @@ -751,6 +809,7 @@ impl AsyncStorageConnection for AsyncClient {
session: Arc::new(session),
connection_id: self.data.connection_counter.load(Ordering::SeqCst),
},
request_timeout: self.request_timeout,
})
}

Expand Down Expand Up @@ -986,3 +1045,11 @@ async fn disconnect_pending_requests(
);
}
}

struct ConnectionInfo {
pub url: Url,
pub subscribers: SubscriberMap,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub connect_timeout: Duration,
pub request_timeout: Duration,
}
Loading

0 comments on commit 11d6b80

Please sign in to comment.