From 47a2c95a124e948925e74505d20ef9fe2f0fe0a9 Mon Sep 17 00:00:00 2001 From: Dan Enman Date: Tue, 27 Aug 2024 17:50:12 -0300 Subject: [PATCH 1/5] feat: add async-std as an optional runtime for AsyncDB --- Cargo.toml | 10 +- examples/asyncdb-async-std/Cargo.toml | 10 + .../src/main.rs | 0 .../{asyncdb => asyncdb-tokio}/Cargo.toml | 4 +- examples/asyncdb-tokio/src/main.rs | 40 ++++ src/asyncdb.rs | 217 ++++++++++++------ src/lib.rs | 4 +- 7 files changed, 213 insertions(+), 72 deletions(-) create mode 100644 examples/asyncdb-async-std/Cargo.toml rename examples/{asyncdb => asyncdb-async-std}/src/main.rs (100%) rename examples/{asyncdb => asyncdb-tokio}/Cargo.toml (58%) create mode 100644 examples/asyncdb-tokio/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 4c31396..f9b241a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,13 +22,17 @@ rand = "0.8.5" snap = "1.0" errno = { optional = true, version = "0.2" } -fs2 = {optional = true, version = "0.4.3"} +fs2 = { optional = true, version = "0.4.3" } -tokio = { optional = true, features = ["rt", "sync"], version = ">= 1.21" } +tokio = { optional = true, features = ["rt", "sync"], version = "1.39.3" } +async-std = { optional = true, version = "1.12.0" } [features] default = ["fs"] -async = ["tokio"] +async = ["asyncdb-tokio"] +asyncdb-tokio = ["dep:tokio", "asyncdb"] +asyncdb-async-std = ["dep:async-std", "asyncdb"] +asyncdb = [] fs = ["errno", "fs2"] [dev-dependencies] diff --git a/examples/asyncdb-async-std/Cargo.toml b/examples/asyncdb-async-std/Cargo.toml new file mode 100644 index 0000000..f6a7832 --- /dev/null +++ b/examples/asyncdb-async-std/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "asyncdb" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.21", features = ["rt", "macros"] } +rusty-leveldb = { path = "../../", features = ["asyncdb-async-std"] } diff --git a/examples/asyncdb/src/main.rs b/examples/asyncdb-async-std/src/main.rs similarity index 100% rename from examples/asyncdb/src/main.rs rename to examples/asyncdb-async-std/src/main.rs diff --git a/examples/asyncdb/Cargo.toml b/examples/asyncdb-tokio/Cargo.toml similarity index 58% rename from examples/asyncdb/Cargo.toml rename to examples/asyncdb-tokio/Cargo.toml index 89bee62..60cded1 100644 --- a/examples/asyncdb/Cargo.toml +++ b/examples/asyncdb-tokio/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.21", features = ["rt", "macros" ] } -rusty-leveldb = { path = "../../", features = ["async"] } +tokio = { version = "1.21", features = ["rt", "macros"] } +rusty-leveldb = { path = "../../", features = ["asyncdb-tokio"] } diff --git a/examples/asyncdb-tokio/src/main.rs b/examples/asyncdb-tokio/src/main.rs new file mode 100644 index 0000000..2bce147 --- /dev/null +++ b/examples/asyncdb-tokio/src/main.rs @@ -0,0 +1,40 @@ +use tokio::main; + +use rusty_leveldb::{AsyncDB, Options, Status, StatusCode}; + +#[main(flavor = "current_thread")] +async fn main() { + let adb = AsyncDB::new("testdb", Options::default()).unwrap(); + + adb.put("Hello".as_bytes().to_owned(), "World".as_bytes().to_owned()) + .await + .expect("put()"); + + let r = adb.get("Hello".as_bytes().to_owned()).await; + assert_eq!(r, Ok(Some("World".as_bytes().to_owned()))); + + let snapshot = adb.get_snapshot().await.expect("get_snapshot()"); + + adb.delete("Hello".as_bytes().to_owned()) + .await + .expect("delete()"); + + // A snapshot allows us to travel back in time before the deletion. + let r2 = adb.get_at(snapshot, "Hello".as_bytes().to_owned()).await; + assert_eq!(r2, Ok(Some("World".as_bytes().to_owned()))); + + // Once dropped, a snapshot cannot be used anymore. + adb.drop_snapshot(snapshot).await.expect("drop_snapshot()"); + + let r3 = adb.get_at(snapshot, "Hello".as_bytes().to_owned()).await; + assert_eq!( + r3, + Err(Status { + code: StatusCode::AsyncError, + err: "Unknown snapshot reference: this is a bug".to_string() + }) + ); + + adb.flush().await.expect("flush()"); + adb.close().await.expect("close()"); +} diff --git a/src/asyncdb.rs b/src/asyncdb.rs index ad02a85..de4d1b9 100644 --- a/src/asyncdb.rs +++ b/src/asyncdb.rs @@ -4,10 +4,18 @@ use std::sync::Arc; use crate::{Options, Result, Status, StatusCode, WriteBatch, DB}; +#[cfg(feature = "asyncdb-tokio")] use tokio::sync::mpsc; +#[cfg(feature = "asyncdb-tokio")] use tokio::sync::oneshot; +#[cfg(feature = "asyncdb-tokio")] use tokio::task::{spawn_blocking, JoinHandle}; +#[cfg(feature = "asyncdb-async-std")] +use async_std::channel; +#[cfg(feature = "asyncdb-async-std")] +use async_std::task::{spawn_blocking, JoinHandle}; + const CHANNEL_BUFFER_SIZE: usize = 32; #[derive(Clone, Copy)] @@ -38,7 +46,10 @@ enum Response { /// Contains both a request and a back-channel for the reply. struct Message { req: Request, + #[cfg(feature = "asyncdb-tokio")] resp_channel: oneshot::Sender, + #[cfg(feature = "asyncdb-async-std")] + resp_channel: channel::Sender, } /// `AsyncDB` makes it easy to use LevelDB in a tokio runtime. @@ -49,14 +60,22 @@ struct Message { #[derive(Clone)] pub struct AsyncDB { jh: Arc>, + #[cfg(feature = "asyncdb-tokio")] send: mpsc::Sender, + #[cfg(feature = "asyncdb-async-std")] + send: channel::Sender, } impl AsyncDB { /// Create a new or open an existing database. pub fn new>(name: P, opts: Options) -> Result { let db = DB::open(name, opts)?; + #[cfg(feature = "asyncdb-tokio")] let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE); + + #[cfg(feature = "asyncdb-async-std")] + let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE); + let jh = spawn_blocking(move || AsyncDB::run_server(db, recv)); Ok(AsyncDB { jh: Arc::new(jh), @@ -183,7 +202,12 @@ impl AsyncDB { } async fn process_request(&self, req: Request) -> Result { + #[cfg(feature = "asyncdb-tokio")] let (tx, rx) = oneshot::channel(); + + #[cfg(feature = "asyncdb-async-std")] + let (tx, rx) = channel::bounded(1); + let m = Message { req, resp_channel: tx, @@ -194,7 +218,11 @@ impl AsyncDB { err: e.to_string(), }); } + #[cfg(feature = "asyncdb-tokio")] let resp = rx.await; + + #[cfg(feature = "asyncdb-async-std")] + let resp = rx.recv().await; match resp { Err(e) => Err(Status { code: StatusCode::AsyncError, @@ -204,82 +232,141 @@ impl AsyncDB { } } - fn run_server(mut db: DB, mut recv: mpsc::Receiver) { - let mut snapshots = HashMap::new(); - let mut snapshot_counter: usize = 0; + fn _run_server(mut db: DB, mut recv: impl ReceiverExt) { + { + let mut snapshots = HashMap::new(); + let mut snapshot_counter: usize = 0; - while let Some(message) = recv.blocking_recv() { - match message.req { - Request::Close => { - message.resp_channel.send(Response::OK).ok(); - recv.close(); - return; - } - Request::Put { key, val } => { - let ok = db.put(&key, &val); - send_response(message.resp_channel, ok); - } - Request::Delete { key } => { - let ok = db.delete(&key); - send_response(message.resp_channel, ok); - } - Request::Write { batch, sync } => { - let ok = db.write(batch, sync); - send_response(message.resp_channel, ok); - } - Request::Flush => { - let ok = db.flush(); - send_response(message.resp_channel, ok); - } - Request::GetAt { snapshot, key } => { - let snapshot_id = snapshot.0; - if let Some(snapshot) = snapshots.get(&snapshot_id) { - let ok = db.get_at(snapshot, &key); - match ok { - Err(e) => { - message.resp_channel.send(Response::Error(e)).ok(); - } - Ok(v) => { - message.resp_channel.send(Response::Value(v)).ok(); - } - }; - } else { - message - .resp_channel - .send(Response::Error(Status { - code: StatusCode::AsyncError, - err: "Unknown snapshot reference: this is a bug".to_string(), - })) - .ok(); + while let Some(message) = recv.blocking_recv() { + match message.req { + Request::Close => { + send_response(message.resp_channel, Response::OK); + recv.close(); + return; + } + Request::Put { key, val } => { + let ok = db.put(&key, &val); + send_response_result(message.resp_channel, ok); + } + Request::Delete { key } => { + let ok = db.delete(&key); + send_response_result(message.resp_channel, ok); + } + Request::Write { batch, sync } => { + let ok = db.write(batch, sync); + send_response_result(message.resp_channel, ok); + } + Request::Flush => { + let ok = db.flush(); + send_response_result(message.resp_channel, ok); + } + Request::GetAt { snapshot, key } => { + let snapshot_id = snapshot.0; + if let Some(snapshot) = snapshots.get(&snapshot_id) { + let ok = db.get_at(snapshot, &key); + match ok { + Err(e) => { + send_response(message.resp_channel, Response::Error(e)); + } + Ok(v) => { + send_response(message.resp_channel, Response::Value(v)); + } + }; + } else { + send_response( + message.resp_channel, + Response::Error(Status { + code: StatusCode::AsyncError, + err: "Unknown snapshot reference: this is a bug".to_string(), + }), + ); + } + } + Request::Get { key } => { + let r = db.get(&key); + send_response(message.resp_channel, Response::Value(r)); + } + Request::GetSnapshot => { + snapshots.insert(snapshot_counter, db.get_snapshot()); + let sref = SnapshotRef(snapshot_counter); + snapshot_counter += 1; + send_response(message.resp_channel, Response::Snapshot(sref)); + } + Request::DropSnapshot { snapshot } => { + snapshots.remove(&snapshot.0); + send_response_result(message.resp_channel, Ok(())); + } + Request::CompactRange { from, to } => { + let ok = db.compact_range(&from, &to); + send_response_result(message.resp_channel, ok); } - } - Request::Get { key } => { - let r = db.get(&key); - message.resp_channel.send(Response::Value(r)).ok(); - } - Request::GetSnapshot => { - snapshots.insert(snapshot_counter, db.get_snapshot()); - let sref = SnapshotRef(snapshot_counter); - snapshot_counter += 1; - message.resp_channel.send(Response::Snapshot(sref)).ok(); - } - Request::DropSnapshot { snapshot } => { - snapshots.remove(&snapshot.0); - send_response(message.resp_channel, Ok(())); - } - Request::CompactRange { from, to } => { - let ok = db.compact_range(&from, &to); - send_response(message.resp_channel, ok); } } } } + + #[cfg(feature = "asyncdb-tokio")] + fn run_server(db: DB, recv: mpsc::Receiver) { + Self::_run_server(db, recv); + } + + #[cfg(feature = "asyncdb-async-std")] + fn run_server(db: DB, recv: channel::Receiver) { + Self::_run_server(db, recv); + } } -fn send_response(ch: oneshot::Sender, result: Result<()>) { +#[cfg(feature = "asyncdb-tokio")] +fn send_response_result(ch: oneshot::Sender, result: Result<()>) { if let Err(e) = result { ch.send(Response::Error(e)).ok(); } else { ch.send(Response::OK).ok(); } } + +#[cfg(feature = "asyncdb-async-std")] +fn send_response_result(ch: channel::Sender, result: Result<()>) { + if let Err(e) = result { + ch.try_send(Response::Error(e)).ok(); + } else { + ch.try_send(Response::OK).ok(); + } +} + +#[cfg(feature = "asyncdb-tokio")] +fn send_response(ch: oneshot::Sender, res: Response) { + ch.send(res).ok(); +} + +#[cfg(feature = "asyncdb-async-std")] +fn send_response(ch: channel::Sender, res: Response) { + ch.send_blocking(res).ok(); +} + +trait ReceiverExt { + fn blocking_recv(&mut self) -> Option; + fn close(&mut self); +} + +#[cfg(feature = "asyncdb-tokio")] +impl ReceiverExt for mpsc::Receiver { + fn blocking_recv(&mut self) -> Option { + self.blocking_recv() + } + + fn close(&mut self) { + self.close(); + } +} + +#[cfg(feature = "asyncdb-async-std")] +impl ReceiverExt for channel::Receiver { + fn blocking_recv(&mut self) -> Option { + self.recv_blocking().ok() + } + + fn close(&mut self) { + channel::Receiver::close(self); + } +} diff --git a/src/lib.rs b/src/lib.rs index 77a8de1..e467b79 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,7 +40,7 @@ extern crate time_test; #[macro_use] mod infolog; -#[cfg(feature = "async")] +#[cfg(feature = "asyncdb")] mod asyncdb; mod block; @@ -82,7 +82,7 @@ mod db_iter; pub mod compressor; pub mod env; -#[cfg(feature = "async")] +#[cfg(feature = "asyncdb")] pub use asyncdb::AsyncDB; pub use cmp::{Cmp, DefaultCmp}; From 15b1e5804183df910ca301cc570247d4029da3a9 Mon Sep 17 00:00:00 2001 From: Dan Enman Date: Tue, 27 Aug 2024 18:01:29 -0300 Subject: [PATCH 2/5] fix: asyncdb examples for runtimes in Cargo workspace --- Cargo.toml | 3 ++- examples/asyncdb-async-std/Cargo.toml | 2 +- examples/asyncdb-tokio/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f9b241a..4a09c29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,8 @@ members = [ "examples/leveldb-tool", "examples/word-analyze", "examples/stresstest", - "examples/asyncdb", + "examples/asyncdb-tokio", + "examples/asyncdb-async-std", "examples/mcpe", "examples/kvserver", ] diff --git a/examples/asyncdb-async-std/Cargo.toml b/examples/asyncdb-async-std/Cargo.toml index f6a7832..9003075 100644 --- a/examples/asyncdb-async-std/Cargo.toml +++ b/examples/asyncdb-async-std/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "asyncdb" +name = "asyncdb-tokio" version = "0.1.0" edition = "2021" diff --git a/examples/asyncdb-tokio/Cargo.toml b/examples/asyncdb-tokio/Cargo.toml index 60cded1..5b1ef47 100644 --- a/examples/asyncdb-tokio/Cargo.toml +++ b/examples/asyncdb-tokio/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "asyncdb" +name = "asyncdb-async-std" version = "0.1.0" edition = "2021" From 588bd54f5b4ef4b41a7bddc318f7b8221a068f3b Mon Sep 17 00:00:00 2001 From: Dan Enman Date: Tue, 27 Aug 2024 18:01:52 -0300 Subject: [PATCH 3/5] fix: super close receiver in extension trait --- src/asyncdb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/asyncdb.rs b/src/asyncdb.rs index de4d1b9..3c04241 100644 --- a/src/asyncdb.rs +++ b/src/asyncdb.rs @@ -356,7 +356,7 @@ impl ReceiverExt for mpsc::Receiver { } fn close(&mut self) { - self.close(); + mpsc::Receiver::close(self); } } From 67480882e3afd9d2ac500e8b8a60485ed7d8792e Mon Sep 17 00:00:00 2001 From: Dan Enman Date: Tue, 27 Aug 2024 21:02:23 -0300 Subject: [PATCH 4/5] chore: use async_std::main for async-std example --- examples/asyncdb-async-std/Cargo.toml | 4 ++-- examples/asyncdb-async-std/src/main.rs | 4 +--- examples/asyncdb-tokio/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/asyncdb-async-std/Cargo.toml b/examples/asyncdb-async-std/Cargo.toml index 9003075..a48e0fe 100644 --- a/examples/asyncdb-async-std/Cargo.toml +++ b/examples/asyncdb-async-std/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "asyncdb-tokio" +name = "asyncdb-async-std" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.21", features = ["rt", "macros"] } +async-std = { version = "1.12.0", features = ["attributes"] } rusty-leveldb = { path = "../../", features = ["asyncdb-async-std"] } diff --git a/examples/asyncdb-async-std/src/main.rs b/examples/asyncdb-async-std/src/main.rs index 2bce147..8cf3b2d 100644 --- a/examples/asyncdb-async-std/src/main.rs +++ b/examples/asyncdb-async-std/src/main.rs @@ -1,8 +1,6 @@ -use tokio::main; - use rusty_leveldb::{AsyncDB, Options, Status, StatusCode}; -#[main(flavor = "current_thread")] +#[async_std::main] async fn main() { let adb = AsyncDB::new("testdb", Options::default()).unwrap(); diff --git a/examples/asyncdb-tokio/Cargo.toml b/examples/asyncdb-tokio/Cargo.toml index 5b1ef47..b04eb67 100644 --- a/examples/asyncdb-tokio/Cargo.toml +++ b/examples/asyncdb-tokio/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "asyncdb-async-std" +name = "asyncdb-tokio" version = "0.1.0" edition = "2021" From 7fb3e23f442f44a84785d2cafaf277d91dbba2f1 Mon Sep 17 00:00:00 2001 From: Dan Enman Date: Tue, 27 Aug 2024 21:55:26 -0300 Subject: [PATCH 5/5] chore: refactor async runtimes for AsyncDB --- Cargo.toml | 5 +- src/asyncdb.rs | 287 ++++++++++----------------------------- src/asyncdb_async_std.rs | 79 +++++++++++ src/asyncdb_tokio.rs | 83 +++++++++++ src/lib.rs | 19 ++- 5 files changed, 251 insertions(+), 222 deletions(-) create mode 100644 src/asyncdb_async_std.rs create mode 100644 src/asyncdb_tokio.rs diff --git a/Cargo.toml b/Cargo.toml index 4a09c29..4137f62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,9 +30,8 @@ async-std = { optional = true, version = "1.12.0" } [features] default = ["fs"] async = ["asyncdb-tokio"] -asyncdb-tokio = ["dep:tokio", "asyncdb"] -asyncdb-async-std = ["dep:async-std", "asyncdb"] -asyncdb = [] +asyncdb-tokio = ["tokio"] +asyncdb-async-std = ["async-std"] fs = ["errno", "fs2"] [dev-dependencies] diff --git a/src/asyncdb.rs b/src/asyncdb.rs index 3c04241..1f3a32c 100644 --- a/src/asyncdb.rs +++ b/src/asyncdb.rs @@ -1,28 +1,17 @@ use std::collections::hash_map::HashMap; -use std::path::Path; -use std::sync::Arc; -use crate::{Options, Result, Status, StatusCode, WriteBatch, DB}; +use crate::{ + send_response, send_response_result, AsyncDB, Message, Result, Status, StatusCode, WriteBatch, + DB, +}; -#[cfg(feature = "asyncdb-tokio")] -use tokio::sync::mpsc; -#[cfg(feature = "asyncdb-tokio")] -use tokio::sync::oneshot; -#[cfg(feature = "asyncdb-tokio")] -use tokio::task::{spawn_blocking, JoinHandle}; - -#[cfg(feature = "asyncdb-async-std")] -use async_std::channel; -#[cfg(feature = "asyncdb-async-std")] -use async_std::task::{spawn_blocking, JoinHandle}; - -const CHANNEL_BUFFER_SIZE: usize = 32; +pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32; #[derive(Clone, Copy)] pub struct SnapshotRef(usize); /// A request sent to the database thread. -enum Request { +pub(crate) enum Request { Close, Put { key: Vec, val: Vec }, Delete { key: Vec }, @@ -36,53 +25,14 @@ enum Request { } /// A response received from the database thread. -enum Response { +pub(crate) enum Response { OK, Error(Status), Value(Option>), Snapshot(SnapshotRef), } -/// Contains both a request and a back-channel for the reply. -struct Message { - req: Request, - #[cfg(feature = "asyncdb-tokio")] - resp_channel: oneshot::Sender, - #[cfg(feature = "asyncdb-async-std")] - resp_channel: channel::Sender, -} - -/// `AsyncDB` makes it easy to use LevelDB in a tokio runtime. -/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. -/// -/// TODO: Make it work in other runtimes as well. This is a matter of adapting the blocking thread -/// mechanism as well as the channel types. -#[derive(Clone)] -pub struct AsyncDB { - jh: Arc>, - #[cfg(feature = "asyncdb-tokio")] - send: mpsc::Sender, - #[cfg(feature = "asyncdb-async-std")] - send: channel::Sender, -} - impl AsyncDB { - /// Create a new or open an existing database. - pub fn new>(name: P, opts: Options) -> Result { - let db = DB::open(name, opts)?; - #[cfg(feature = "asyncdb-tokio")] - let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE); - - #[cfg(feature = "asyncdb-async-std")] - let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE); - - let jh = spawn_blocking(move || AsyncDB::run_server(db, recv)); - Ok(AsyncDB { - jh: Arc::new(jh), - send, - }) - } - pub async fn close(&self) -> Result<()> { let r = self.process_request(Request::Close).await?; match r { @@ -201,172 +151,79 @@ impl AsyncDB { } } - async fn process_request(&self, req: Request) -> Result { - #[cfg(feature = "asyncdb-tokio")] - let (tx, rx) = oneshot::channel(); - - #[cfg(feature = "asyncdb-async-std")] - let (tx, rx) = channel::bounded(1); - - let m = Message { - req, - resp_channel: tx, - }; - if let Err(e) = self.send.send(m).await { - return Err(Status { - code: StatusCode::AsyncError, - err: e.to_string(), - }); - } - #[cfg(feature = "asyncdb-tokio")] - let resp = rx.await; + pub(crate) fn run_server(mut db: DB, mut recv: impl ReceiverExt) { + let mut snapshots = HashMap::new(); + let mut snapshot_counter: usize = 0; - #[cfg(feature = "asyncdb-async-std")] - let resp = rx.recv().await; - match resp { - Err(e) => Err(Status { - code: StatusCode::AsyncError, - err: e.to_string(), - }), - Ok(r) => Ok(r), - } - } - - fn _run_server(mut db: DB, mut recv: impl ReceiverExt) { - { - let mut snapshots = HashMap::new(); - let mut snapshot_counter: usize = 0; - - while let Some(message) = recv.blocking_recv() { - match message.req { - Request::Close => { - send_response(message.resp_channel, Response::OK); - recv.close(); - return; - } - Request::Put { key, val } => { - let ok = db.put(&key, &val); - send_response_result(message.resp_channel, ok); - } - Request::Delete { key } => { - let ok = db.delete(&key); - send_response_result(message.resp_channel, ok); - } - Request::Write { batch, sync } => { - let ok = db.write(batch, sync); - send_response_result(message.resp_channel, ok); - } - Request::Flush => { - let ok = db.flush(); - send_response_result(message.resp_channel, ok); - } - Request::GetAt { snapshot, key } => { - let snapshot_id = snapshot.0; - if let Some(snapshot) = snapshots.get(&snapshot_id) { - let ok = db.get_at(snapshot, &key); - match ok { - Err(e) => { - send_response(message.resp_channel, Response::Error(e)); - } - Ok(v) => { - send_response(message.resp_channel, Response::Value(v)); - } - }; - } else { - send_response( - message.resp_channel, - Response::Error(Status { - code: StatusCode::AsyncError, - err: "Unknown snapshot reference: this is a bug".to_string(), - }), - ); - } - } - Request::Get { key } => { - let r = db.get(&key); - send_response(message.resp_channel, Response::Value(r)); - } - Request::GetSnapshot => { - snapshots.insert(snapshot_counter, db.get_snapshot()); - let sref = SnapshotRef(snapshot_counter); - snapshot_counter += 1; - send_response(message.resp_channel, Response::Snapshot(sref)); - } - Request::DropSnapshot { snapshot } => { - snapshots.remove(&snapshot.0); - send_response_result(message.resp_channel, Ok(())); - } - Request::CompactRange { from, to } => { - let ok = db.compact_range(&from, &to); - send_response_result(message.resp_channel, ok); + while let Some(message) = recv.blocking_recv() { + match message.req { + Request::Close => { + send_response(message.resp_channel, Response::OK); + recv.close(); + return; + } + Request::Put { key, val } => { + let ok = db.put(&key, &val); + send_response_result(message.resp_channel, ok); + } + Request::Delete { key } => { + let ok = db.delete(&key); + send_response_result(message.resp_channel, ok); + } + Request::Write { batch, sync } => { + let ok = db.write(batch, sync); + send_response_result(message.resp_channel, ok); + } + Request::Flush => { + let ok = db.flush(); + send_response_result(message.resp_channel, ok); + } + Request::GetAt { snapshot, key } => { + let snapshot_id = snapshot.0; + if let Some(snapshot) = snapshots.get(&snapshot_id) { + let ok = db.get_at(snapshot, &key); + match ok { + Err(e) => { + send_response(message.resp_channel, Response::Error(e)); + } + Ok(v) => { + send_response(message.resp_channel, Response::Value(v)); + } + }; + } else { + send_response( + message.resp_channel, + Response::Error(Status { + code: StatusCode::AsyncError, + err: "Unknown snapshot reference: this is a bug".to_string(), + }), + ); } } + Request::Get { key } => { + let r = db.get(&key); + send_response(message.resp_channel, Response::Value(r)); + } + Request::GetSnapshot => { + snapshots.insert(snapshot_counter, db.get_snapshot()); + let sref = SnapshotRef(snapshot_counter); + snapshot_counter += 1; + send_response(message.resp_channel, Response::Snapshot(sref)); + } + Request::DropSnapshot { snapshot } => { + snapshots.remove(&snapshot.0); + send_response_result(message.resp_channel, Ok(())); + } + Request::CompactRange { from, to } => { + let ok = db.compact_range(&from, &to); + send_response_result(message.resp_channel, ok); + } } } } - - #[cfg(feature = "asyncdb-tokio")] - fn run_server(db: DB, recv: mpsc::Receiver) { - Self::_run_server(db, recv); - } - - #[cfg(feature = "asyncdb-async-std")] - fn run_server(db: DB, recv: channel::Receiver) { - Self::_run_server(db, recv); - } -} - -#[cfg(feature = "asyncdb-tokio")] -fn send_response_result(ch: oneshot::Sender, result: Result<()>) { - if let Err(e) = result { - ch.send(Response::Error(e)).ok(); - } else { - ch.send(Response::OK).ok(); - } } -#[cfg(feature = "asyncdb-async-std")] -fn send_response_result(ch: channel::Sender, result: Result<()>) { - if let Err(e) = result { - ch.try_send(Response::Error(e)).ok(); - } else { - ch.try_send(Response::OK).ok(); - } -} - -#[cfg(feature = "asyncdb-tokio")] -fn send_response(ch: oneshot::Sender, res: Response) { - ch.send(res).ok(); -} - -#[cfg(feature = "asyncdb-async-std")] -fn send_response(ch: channel::Sender, res: Response) { - ch.send_blocking(res).ok(); -} - -trait ReceiverExt { +pub(crate) trait ReceiverExt { fn blocking_recv(&mut self) -> Option; fn close(&mut self); } - -#[cfg(feature = "asyncdb-tokio")] -impl ReceiverExt for mpsc::Receiver { - fn blocking_recv(&mut self) -> Option { - self.blocking_recv() - } - - fn close(&mut self) { - mpsc::Receiver::close(self); - } -} - -#[cfg(feature = "asyncdb-async-std")] -impl ReceiverExt for channel::Receiver { - fn blocking_recv(&mut self) -> Option { - self.recv_blocking().ok() - } - - fn close(&mut self) { - channel::Receiver::close(self); - } -} diff --git a/src/asyncdb_async_std.rs b/src/asyncdb_async_std.rs new file mode 100644 index 0000000..09e23f9 --- /dev/null +++ b/src/asyncdb_async_std.rs @@ -0,0 +1,79 @@ +use std::path::Path; +use std::sync::Arc; + +use async_std::channel; +use async_std::task::{spawn_blocking, JoinHandle}; + +use crate::asyncdb::{ReceiverExt, Request, Response, CHANNEL_BUFFER_SIZE}; +use crate::{Options, Result, Status, StatusCode, DB}; + +pub(crate) struct Message { + pub(crate) req: Request, + pub(crate) resp_channel: channel::Sender, +} +/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime. +/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. +#[derive(Clone)] +pub struct AsyncDB { + jh: Arc>, + send: channel::Sender, +} + +impl AsyncDB { + /// Create a new or open an existing database. + pub fn new>(name: P, opts: Options) -> Result { + let db = DB::open(name, opts)?; + + let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE); + let jh = spawn_blocking(move || AsyncDB::run_server(db, recv)); + Ok(AsyncDB { + jh: Arc::new(jh), + send, + }) + } + + pub(crate) async fn process_request(&self, req: Request) -> Result { + let (tx, rx) = channel::bounded(1); + + let m = Message { + req, + resp_channel: tx, + }; + if let Err(e) = self.send.send(m).await { + return Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }); + } + let resp = rx.recv().await; + match resp { + Err(e) => Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }), + Ok(r) => Ok(r), + } + } +} + +pub(crate) fn send_response_result(ch: channel::Sender, result: Result<()>) { + if let Err(e) = result { + ch.try_send(Response::Error(e)).ok(); + } else { + ch.try_send(Response::OK).ok(); + } +} + +pub(crate) fn send_response(ch: channel::Sender, res: Response) { + ch.send_blocking(res).ok(); +} + +impl ReceiverExt for channel::Receiver { + fn blocking_recv(&mut self) -> Option { + self.recv_blocking().ok() + } + + fn close(&mut self) { + channel::Receiver::close(self); + } +} diff --git a/src/asyncdb_tokio.rs b/src/asyncdb_tokio.rs new file mode 100644 index 0000000..2f2441d --- /dev/null +++ b/src/asyncdb_tokio.rs @@ -0,0 +1,83 @@ +use std::path::Path; +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::task::{spawn_blocking, JoinHandle}; + +use crate::asyncdb::ReceiverExt; +use crate::asyncdb::CHANNEL_BUFFER_SIZE; +use crate::asyncdb::{Request, Response}; +use crate::{Options, Result, Status, StatusCode, DB}; + +pub(crate) struct Message { + pub(crate) req: Request, + pub(crate) resp_channel: oneshot::Sender, +} + +/// `AsyncDB` makes it easy to use LevelDB in a tokio runtime. +/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. +#[derive(Clone)] +pub struct AsyncDB { + jh: Arc>, + send: mpsc::Sender, +} + +impl AsyncDB { + /// Create a new or open an existing database. + pub fn new>(name: P, opts: Options) -> Result { + let db = DB::open(name, opts)?; + let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE); + + let jh = spawn_blocking(move || AsyncDB::run_server(db, recv)); + Ok(AsyncDB { + jh: Arc::new(jh), + send, + }) + } + pub(crate) async fn process_request(&self, req: Request) -> Result { + let (tx, rx) = oneshot::channel(); + + let m = Message { + req, + resp_channel: tx, + }; + if let Err(e) = self.send.send(m).await { + return Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }); + } + let resp = rx.await; + + match resp { + Err(e) => Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }), + Ok(r) => Ok(r), + } + } +} + +pub(crate) fn send_response_result(ch: oneshot::Sender, result: Result<()>) { + if let Err(e) = result { + ch.send(Response::Error(e)).ok(); + } else { + ch.send(Response::OK).ok(); + } +} + +pub(crate) fn send_response(ch: oneshot::Sender, res: Response) { + ch.send(res).ok(); +} + +impl ReceiverExt for mpsc::Receiver { + fn blocking_recv(&mut self) -> Option { + self.blocking_recv() + } + + fn close(&mut self) { + mpsc::Receiver::close(self); + } +} diff --git a/src/lib.rs b/src/lib.rs index e467b79..33fd29e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,9 +40,19 @@ extern crate time_test; #[macro_use] mod infolog; -#[cfg(feature = "asyncdb")] +#[cfg(any(feature = "asyncdb-tokio", feature = "asyncdb-async-std"))] mod asyncdb; +#[cfg(feature = "asyncdb-tokio")] +mod asyncdb_tokio; +#[cfg(feature = "asyncdb-tokio")] +use asyncdb_tokio::{send_response, send_response_result, Message}; + +#[cfg(feature = "asyncdb-async-std")] +mod asyncdb_async_std; +#[cfg(feature = "asyncdb-async-std")] +use asyncdb_async_std::{send_response, send_response_result, Message}; + mod block; mod block_builder; mod blockhandle; @@ -82,9 +92,10 @@ mod db_iter; pub mod compressor; pub mod env; -#[cfg(feature = "asyncdb")] -pub use asyncdb::AsyncDB; - +#[cfg(feature = "asyncdb-async-std")] +pub use asyncdb_async_std::AsyncDB; +#[cfg(feature = "asyncdb-tokio")] +pub use asyncdb_tokio::AsyncDB; pub use cmp::{Cmp, DefaultCmp}; pub use compressor::{Compressor, CompressorId}; pub use db_impl::DB;