diff --git a/Cargo.toml b/Cargo.toml index 4137f62..3de75e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,17 +26,25 @@ fs2 = { optional = true, version = "0.4.3" } tokio = { optional = true, features = ["rt", "sync"], version = "1.39.3" } async-std = { optional = true, version = "1.12.0" } +wasm-bindgen-futures = { optional = true, version = "0.4.24" } +getrandom = { optional = true, version = "0.2.15", features = ["js"] } [features] -default = ["fs"] +default = ["fs", "asyncdb-wasm-bindgen-futures"] async = ["asyncdb-tokio"] asyncdb-tokio = ["tokio"] asyncdb-async-std = ["async-std"] +asyncdb-wasm-bindgen-futures = [ + "wasm-bindgen-futures", + "async-std/async-channel", + "getrandom", +] fs = ["errno", "fs2"] [dev-dependencies] time-test = "0.2" bencher = "0.1" +wasm-bindgen-test = "0.3.0" [[bench]] name = "maps_bench" diff --git a/src/asyncdb.rs b/src/asyncdb.rs index 1f3a32c..aa65f2e 100644 --- a/src/asyncdb.rs +++ b/src/asyncdb.rs @@ -1,14 +1,14 @@ use std::collections::hash_map::HashMap; use crate::{ - send_response, send_response_result, AsyncDB, Message, Result, Status, StatusCode, WriteBatch, - DB, + send_response, send_response_result, snapshot::Snapshot, AsyncDB, Message, Result, Status, + StatusCode, WriteBatch, DB, }; pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32; #[derive(Clone, Copy)] -pub struct SnapshotRef(usize); +pub struct SnapshotRef(pub(crate) usize); /// A request sent to the database thread. pub(crate) enum Request { @@ -151,6 +151,78 @@ impl AsyncDB { } } + pub(crate) fn match_message( + db: &mut DB, + mut recv: impl ReceiverExt, + snapshots: &mut HashMap, + snapshot_counter: &mut usize, + message: Message, + ) { + match message.req { + Request::Close => { + send_response(message.resp_channel, Response::OK); + recv.close(); + return; + } + Request::Put { key, val } => { + let ok = db.put(&key, &val); + send_response_result(message.resp_channel, ok); + } + Request::Delete { key } => { + let ok = db.delete(&key); + send_response_result(message.resp_channel, ok); + } + Request::Write { batch, sync } => { + let ok = db.write(batch, sync); + send_response_result(message.resp_channel, ok); + } + Request::Flush => { + let ok = db.flush(); + send_response_result(message.resp_channel, ok); + } + Request::GetAt { snapshot, key } => { + let snapshot_id = snapshot.0; + if let Some(snapshot) = snapshots.get(&snapshot_id) { + let ok = db.get_at(snapshot, &key); + match ok { + Err(e) => { + send_response(message.resp_channel, Response::Error(e)); + } + Ok(v) => { + send_response(message.resp_channel, Response::Value(v)); + } + }; + } else { + send_response( + message.resp_channel, + Response::Error(Status { + code: StatusCode::AsyncError, + err: "Unknown snapshot reference: this is a bug".to_string(), + }), + ); + } + } + Request::Get { key } => { + let r = db.get(&key); + send_response(message.resp_channel, Response::Value(r)); + } + Request::GetSnapshot => { + snapshots.insert(*snapshot_counter, db.get_snapshot()); + let sref = SnapshotRef(*snapshot_counter); + *snapshot_counter += 1; + send_response(message.resp_channel, Response::Snapshot(sref)); + } + Request::DropSnapshot { snapshot } => { + snapshots.remove(&snapshot.0); + send_response_result(message.resp_channel, Ok(())); + } + Request::CompactRange { from, to } => { + let ok = db.compact_range(&from, &to); + send_response_result(message.resp_channel, ok); + } + } + } + pub(crate) fn run_server(mut db: DB, mut recv: impl ReceiverExt) { let mut snapshots = HashMap::new(); let mut snapshot_counter: usize = 0; @@ -225,5 +297,8 @@ impl AsyncDB { pub(crate) trait ReceiverExt { fn blocking_recv(&mut self) -> Option; + async fn recv(&mut self) -> Option { + self.blocking_recv() + } fn close(&mut self); } diff --git a/src/asyncdb_async_std.rs b/src/asyncdb_async_std.rs index 09e23f9..74f2523 100644 --- a/src/asyncdb_async_std.rs +++ b/src/asyncdb_async_std.rs @@ -11,6 +11,7 @@ pub(crate) struct Message { pub(crate) req: Request, pub(crate) resp_channel: channel::Sender, } + /// `AsyncDB` makes it easy to use LevelDB in a async-std runtime. /// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. #[derive(Clone)] diff --git a/src/asyncdb_wasm_bindgen_futures.rs b/src/asyncdb_wasm_bindgen_futures.rs new file mode 100644 index 0000000..977c9b6 --- /dev/null +++ b/src/asyncdb_wasm_bindgen_futures.rs @@ -0,0 +1,151 @@ +use std::collections::HashMap; +use std::path::Path; + +use async_std::channel::{self, TryRecvError}; +use wasm_bindgen_futures::spawn_local; + +use crate::asyncdb::{ReceiverExt, Request, Response, CHANNEL_BUFFER_SIZE}; +use crate::snapshot::Snapshot; +use crate::{Options, Result, Status, StatusCode, DB}; + +pub(crate) struct Message { + pub(crate) req: Request, + pub(crate) resp_channel: channel::Sender, +} + +/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime. +/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. +#[derive(Clone)] +pub struct AsyncDB { + shutdown: channel::Sender<()>, + send: channel::Sender, +} + +impl AsyncDB { + /// Create a new or open an existing database. + pub fn new>(name: P, opts: Options) -> Result { + let db = DB::open(name, opts)?; + + let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE); + let (shutdown, shutdown_recv) = channel::bounded(1); + + spawn_local(async move { + AsyncDB::run_server_async(db, recv, shutdown_recv, HashMap::new(), 0).await; + }); + + Ok(AsyncDB { shutdown, send }) + } + + pub(crate) async fn process_request(&self, req: Request) -> Result { + let (tx, rx) = channel::bounded(1); + + let m = Message { + req, + resp_channel: tx, + }; + if let Err(e) = self.send.send(m).await { + return Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }); + } + let resp = rx.recv().await; + match resp { + Err(e) => Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }), + Ok(r) => Ok(r), + } + } + + pub(crate) async fn run_server_async( + mut db: DB, + mut recv: impl ReceiverExt + Clone + 'static, + mut shutdown: impl ReceiverExt<()> + Clone + 'static, + mut snapshots: HashMap, + mut snapshot_counter: usize, + ) { + if let Some(message) = recv.recv().await { + Self::match_message( + &mut db, + recv.clone(), + &mut snapshots, + &mut snapshot_counter, + message, + ); + } + + spawn_local(async move { + // check shutdown + if let Some(()) = shutdown.recv().await { + return; + } else { + AsyncDB::run_server_async(db, recv, shutdown, snapshots, snapshot_counter).await + }; + }); + } + + pub(crate) async fn stop_server_async(&self) { + self.shutdown.close(); + } +} + +pub(crate) fn send_response_result(ch: channel::Sender, result: Result<()>) { + if let Err(e) = result { + ch.try_send(Response::Error(e)).ok(); + } else { + ch.try_send(Response::OK).ok(); + } +} + +pub(crate) fn send_response(ch: channel::Sender, res: Response) { + ch.send_blocking(res).ok(); +} + +impl ReceiverExt for channel::Receiver { + fn blocking_recv(&mut self) -> Option { + self.recv_blocking().ok() + } + + fn close(&mut self) { + channel::Receiver::close(self); + } + + async fn recv(&mut self) -> Option { + channel::Receiver::recv(&self).await.ok() + } +} + +impl ReceiverExt<()> for channel::Receiver<()> { + fn blocking_recv(&mut self) -> Option<()> { + self.recv_blocking().ok() + } + + fn close(&mut self) { + channel::Receiver::close(self); + } + + async fn recv(&mut self) -> Option<()> { + match channel::Receiver::try_recv(&self) { + Ok(_) => Some(()), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Closed) => Some(()), + } + } +} + +#[cfg(test)] +pub mod tests { + use crate::{in_memory, AsyncDB}; + use wasm_bindgen_test::wasm_bindgen_test; + + #[wasm_bindgen_test] + async fn test_asyncdb() { + let db = AsyncDB::new("test.db", in_memory()).unwrap(); + db.put(b"key".to_vec(), b"value".to_vec()).await.unwrap(); + let val = db.get(b"key".to_vec()).await.unwrap(); + assert_eq!(val, Some(b"value".to_vec())); + db.stop_server_async().await; + } +} diff --git a/src/lib.rs b/src/lib.rs index 33fd29e..fc4c5dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,7 +40,11 @@ extern crate time_test; #[macro_use] mod infolog; -#[cfg(any(feature = "asyncdb-tokio", feature = "asyncdb-async-std"))] +#[cfg(any( + feature = "asyncdb-tokio", + feature = "asyncdb-async-std", + feature = "asyncdb-wasm-bindgen-futures" +))] mod asyncdb; #[cfg(feature = "asyncdb-tokio")] @@ -53,6 +57,11 @@ mod asyncdb_async_std; #[cfg(feature = "asyncdb-async-std")] use asyncdb_async_std::{send_response, send_response_result, Message}; +#[cfg(feature = "asyncdb-wasm-bindgen-futures")] +mod asyncdb_wasm_bindgen_futures; +#[cfg(feature = "asyncdb-wasm-bindgen-futures")] +use self::asyncdb_wasm_bindgen_futures::{send_response, send_response_result, Message}; + mod block; mod block_builder; mod blockhandle; @@ -96,6 +105,8 @@ pub mod env; pub use asyncdb_async_std::AsyncDB; #[cfg(feature = "asyncdb-tokio")] pub use asyncdb_tokio::AsyncDB; +#[cfg(feature = "asyncdb-wasm-bindgen-futures")] +pub use asyncdb_wasm_bindgen_futures::AsyncDB; pub use cmp::{Cmp, DefaultCmp}; pub use compressor::{Compressor, CompressorId}; pub use db_impl::DB;