Skip to content

Commit

Permalink
Merge pull request #48 from enmand/feat/asyncdb-async-std
Browse files Browse the repository at this point in the history
feat: add async-std as an optional runtime for AsyncDB
  • Loading branch information
dermesser authored Sep 1, 2024
2 parents 0c7da61 + 7fb3e23 commit 812d49c
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 94 deletions.
12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ rand = "0.8.5"
snap = "1.0"

errno = { optional = true, version = "0.2" }
fs2 = {optional = true, version = "0.4.3"}
fs2 = { optional = true, version = "0.4.3" }

tokio = { optional = true, features = ["rt", "sync"], version = ">= 1.21" }
tokio = { optional = true, features = ["rt", "sync"], version = "1.39.3" }
async-std = { optional = true, version = "1.12.0" }

[features]
default = ["fs"]
async = ["tokio"]
async = ["asyncdb-tokio"]
asyncdb-tokio = ["tokio"]
asyncdb-async-std = ["async-std"]
fs = ["errno", "fs2"]

[dev-dependencies]
Expand All @@ -46,7 +49,8 @@ members = [
"examples/leveldb-tool",
"examples/word-analyze",
"examples/stresstest",
"examples/asyncdb",
"examples/asyncdb-tokio",
"examples/asyncdb-async-std",
"examples/mcpe",
"examples/kvserver",
]
10 changes: 10 additions & 0 deletions examples/asyncdb-async-std/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "asyncdb-async-std"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
rusty-leveldb = { path = "../../", features = ["asyncdb-async-std"] }
38 changes: 38 additions & 0 deletions examples/asyncdb-async-std/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use rusty_leveldb::{AsyncDB, Options, Status, StatusCode};

#[async_std::main]
async fn main() {
let adb = AsyncDB::new("testdb", Options::default()).unwrap();

adb.put("Hello".as_bytes().to_owned(), "World".as_bytes().to_owned())
.await
.expect("put()");

let r = adb.get("Hello".as_bytes().to_owned()).await;
assert_eq!(r, Ok(Some("World".as_bytes().to_owned())));

let snapshot = adb.get_snapshot().await.expect("get_snapshot()");

adb.delete("Hello".as_bytes().to_owned())
.await
.expect("delete()");

// A snapshot allows us to travel back in time before the deletion.
let r2 = adb.get_at(snapshot, "Hello".as_bytes().to_owned()).await;
assert_eq!(r2, Ok(Some("World".as_bytes().to_owned())));

// Once dropped, a snapshot cannot be used anymore.
adb.drop_snapshot(snapshot).await.expect("drop_snapshot()");

let r3 = adb.get_at(snapshot, "Hello".as_bytes().to_owned()).await;
assert_eq!(
r3,
Err(Status {
code: StatusCode::AsyncError,
err: "Unknown snapshot reference: this is a bug".to_string()
})
);

adb.flush().await.expect("flush()");
adb.close().await.expect("close()");
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "asyncdb"
name = "asyncdb-tokio"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.21", features = ["rt", "macros" ] }
rusty-leveldb = { path = "../../", features = ["async"] }
tokio = { version = "1.21", features = ["rt", "macros"] }
rusty-leveldb = { path = "../../", features = ["asyncdb-tokio"] }
File renamed without changes.
110 changes: 27 additions & 83 deletions src/asyncdb.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
use std::collections::hash_map::HashMap;
use std::path::Path;
use std::sync::Arc;

use crate::{Options, Result, Status, StatusCode, WriteBatch, DB};
use crate::{
send_response, send_response_result, AsyncDB, Message, Result, Status, StatusCode, WriteBatch,
DB,
};

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::{spawn_blocking, JoinHandle};

const CHANNEL_BUFFER_SIZE: usize = 32;
pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32;

#[derive(Clone, Copy)]
pub struct SnapshotRef(usize);

/// A request sent to the database thread.
enum Request {
pub(crate) enum Request {
Close,
Put { key: Vec<u8>, val: Vec<u8> },
Delete { key: Vec<u8> },
Expand All @@ -28,42 +25,14 @@ enum Request {
}

/// A response received from the database thread.
enum Response {
pub(crate) enum Response {
OK,
Error(Status),
Value(Option<Vec<u8>>),
Snapshot(SnapshotRef),
}

/// Contains both a request and a back-channel for the reply.
struct Message {
req: Request,
resp_channel: oneshot::Sender<Response>,
}

/// `AsyncDB` makes it easy to use LevelDB in a tokio runtime.
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
///
/// TODO: Make it work in other runtimes as well. This is a matter of adapting the blocking thread
/// mechanism as well as the channel types.
#[derive(Clone)]
pub struct AsyncDB {
jh: Arc<JoinHandle<()>>,
send: mpsc::Sender<Message>,
}

impl AsyncDB {
/// Create a new or open an existing database.
pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> {
let db = DB::open(name, opts)?;
let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let jh = spawn_blocking(move || AsyncDB::run_server(db, recv));
Ok(AsyncDB {
jh: Arc::new(jh),
send,
})
}

pub async fn close(&self) -> Result<()> {
let r = self.process_request(Request::Close).await?;
match r {
Expand Down Expand Up @@ -182,104 +151,79 @@ impl AsyncDB {
}
}

async fn process_request(&self, req: Request) -> Result<Response> {
let (tx, rx) = oneshot::channel();
let m = Message {
req,
resp_channel: tx,
};
if let Err(e) = self.send.send(m).await {
return Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
});
}
let resp = rx.await;
match resp {
Err(e) => Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
}),
Ok(r) => Ok(r),
}
}

fn run_server(mut db: DB, mut recv: mpsc::Receiver<Message>) {
pub(crate) fn run_server(mut db: DB, mut recv: impl ReceiverExt<Message>) {
let mut snapshots = HashMap::new();
let mut snapshot_counter: usize = 0;

while let Some(message) = recv.blocking_recv() {
match message.req {
Request::Close => {
message.resp_channel.send(Response::OK).ok();
send_response(message.resp_channel, Response::OK);
recv.close();
return;
}
Request::Put { key, val } => {
let ok = db.put(&key, &val);
send_response(message.resp_channel, ok);
send_response_result(message.resp_channel, ok);
}
Request::Delete { key } => {
let ok = db.delete(&key);
send_response(message.resp_channel, ok);
send_response_result(message.resp_channel, ok);
}
Request::Write { batch, sync } => {
let ok = db.write(batch, sync);
send_response(message.resp_channel, ok);
send_response_result(message.resp_channel, ok);
}
Request::Flush => {
let ok = db.flush();
send_response(message.resp_channel, ok);
send_response_result(message.resp_channel, ok);
}
Request::GetAt { snapshot, key } => {
let snapshot_id = snapshot.0;
if let Some(snapshot) = snapshots.get(&snapshot_id) {
let ok = db.get_at(snapshot, &key);
match ok {
Err(e) => {
message.resp_channel.send(Response::Error(e)).ok();
send_response(message.resp_channel, Response::Error(e));
}
Ok(v) => {
message.resp_channel.send(Response::Value(v)).ok();
send_response(message.resp_channel, Response::Value(v));
}
};
} else {
message
.resp_channel
.send(Response::Error(Status {
send_response(
message.resp_channel,
Response::Error(Status {
code: StatusCode::AsyncError,
err: "Unknown snapshot reference: this is a bug".to_string(),
}))
.ok();
}),
);
}
}
Request::Get { key } => {
let r = db.get(&key);
message.resp_channel.send(Response::Value(r)).ok();
send_response(message.resp_channel, Response::Value(r));
}
Request::GetSnapshot => {
snapshots.insert(snapshot_counter, db.get_snapshot());
let sref = SnapshotRef(snapshot_counter);
snapshot_counter += 1;
message.resp_channel.send(Response::Snapshot(sref)).ok();
send_response(message.resp_channel, Response::Snapshot(sref));
}
Request::DropSnapshot { snapshot } => {
snapshots.remove(&snapshot.0);
send_response(message.resp_channel, Ok(()));
send_response_result(message.resp_channel, Ok(()));
}
Request::CompactRange { from, to } => {
let ok = db.compact_range(&from, &to);
send_response(message.resp_channel, ok);
send_response_result(message.resp_channel, ok);
}
}
}
}
}

fn send_response(ch: oneshot::Sender<Response>, result: Result<()>) {
if let Err(e) = result {
ch.send(Response::Error(e)).ok();
} else {
ch.send(Response::OK).ok();
}
pub(crate) trait ReceiverExt<T> {
fn blocking_recv(&mut self) -> Option<T>;
fn close(&mut self);
}
79 changes: 79 additions & 0 deletions src/asyncdb_async_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::path::Path;
use std::sync::Arc;

use async_std::channel;
use async_std::task::{spawn_blocking, JoinHandle};

use crate::asyncdb::{ReceiverExt, Request, Response, CHANNEL_BUFFER_SIZE};
use crate::{Options, Result, Status, StatusCode, DB};

pub(crate) struct Message {
pub(crate) req: Request,
pub(crate) resp_channel: channel::Sender<Response>,
}
/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime.
/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented.
#[derive(Clone)]
pub struct AsyncDB {
jh: Arc<JoinHandle<()>>,
send: channel::Sender<Message>,
}

impl AsyncDB {
/// Create a new or open an existing database.
pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> {
let db = DB::open(name, opts)?;

let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE);
let jh = spawn_blocking(move || AsyncDB::run_server(db, recv));
Ok(AsyncDB {
jh: Arc::new(jh),
send,
})
}

pub(crate) async fn process_request(&self, req: Request) -> Result<Response> {
let (tx, rx) = channel::bounded(1);

let m = Message {
req,
resp_channel: tx,
};
if let Err(e) = self.send.send(m).await {
return Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
});
}
let resp = rx.recv().await;
match resp {
Err(e) => Err(Status {
code: StatusCode::AsyncError,
err: e.to_string(),
}),
Ok(r) => Ok(r),
}
}
}

pub(crate) fn send_response_result(ch: channel::Sender<Response>, result: Result<()>) {
if let Err(e) = result {
ch.try_send(Response::Error(e)).ok();
} else {
ch.try_send(Response::OK).ok();
}
}

pub(crate) fn send_response(ch: channel::Sender<Response>, res: Response) {
ch.send_blocking(res).ok();
}

impl<T> ReceiverExt<T> for channel::Receiver<T> {
fn blocking_recv(&mut self) -> Option<T> {
self.recv_blocking().ok()
}

fn close(&mut self) {
channel::Receiver::close(self);
}
}
Loading

0 comments on commit 812d49c

Please sign in to comment.