Skip to content

Commit

Permalink
impl dbaccess for redb
Browse files Browse the repository at this point in the history
  • Loading branch information
biryukovmaxim committed Sep 12, 2024
1 parent f547bb6 commit 38b3d43
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
1 change: 0 additions & 1 deletion database/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::prelude::DbKey;
use kaspa_hashes::Hash;
use rocksdb::Transaction;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down
36 changes: 24 additions & 12 deletions database/src/redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::errors::StoreError;
use crate::key::DbKey;
use crate::prelude::DbWriter;
use itertools::Either;
use redb::{ReadableTable, TableDefinition};
use redb::TableDefinition;
use std::error::Error;
use std::sync::atomic::{AtomicU64, Ordering};

Expand Down Expand Up @@ -32,21 +32,21 @@ impl DbAccess for Redb {
) -> impl Iterator<Item = Result<(impl AsRef<[u8]>, impl AsRef<[u8]>), Box<dyn Error>>> + '_ {
let prefix = prefix.into();
seek_from.as_ref().inspect(|seek_from| debug_assert!(seek_from.as_ref().starts_with(prefix.as_ref())));

let upper_bound = [&prefix[..prefix.len() - 1], &[prefix[prefix.len() - 1].saturating_add(1)]].concat();
let table = self.db.begin_read().unwrap().open_table(TABLE).unwrap(); // todo change interface to support errors
Iterator::map(
{
if let Some(seek_from) = seek_from {
Either::Left(table.range(seek_from.as_ref()..).unwrap()) // todo change interface to support errors
} else {
Either::Right(table.range(prefix.as_slice()..).unwrap()) // todo change interface to support errors
}
}
.take_while(move |r| r.as_ref().is_ok_and(|(k, _)| k.value().starts_with(&prefix))),
if let Some(seek_from) = seek_from {
Either::Left(table.range(seek_from.as_ref()..upper_bound.as_ref()).unwrap())
// todo change interface to support errors
} else {
Either::Right(table.range(prefix.as_slice()..upper_bound.as_ref()).unwrap())
// todo change interface to support errors
},
|r| r.map(|(k, v)| (k.value().to_vec(), v.value())).map_err(Into::into),
)
}

// todo writer
fn write(&self, _writer: &mut impl DbWriter, db_key: DbKey, data: Vec<u8>) -> Result<(), StoreError> {
let process = || -> Result<_, redb::Error> {
let write_tx = self.db.begin_write()?;
Expand All @@ -60,6 +60,7 @@ impl DbAccess for Redb {
Ok(res?)
}

// todo writer
fn delete(&self, _writer: &mut impl DbWriter, db_key: DbKey) -> Result<(), StoreError> {
let process = || -> Result<_, redb::Error> {
let write_tx = self.db.begin_write()?;
Expand All @@ -73,7 +74,18 @@ impl DbAccess for Redb {
Ok(res?)
}

fn delete_range_by_prefix(&self, writer: &mut impl DbWriter, prefix: &[u8]) -> Result<(), StoreError> {
todo!()
// todo writer
fn delete_range_by_prefix(&self, _writer: &mut impl DbWriter, prefix: &[u8]) -> Result<(), StoreError> {
let upper_bound = [&prefix[..prefix.len() - 1], &[prefix[prefix.len() - 1].saturating_add(1)]].concat();
let process = || -> Result<_, redb::Error> {
let write_tx = self.db.begin_write()?;
let mut table = write_tx.open_table(TABLE)?;
table.retain_in(prefix..upper_bound.as_ref(), |_, _| false)?;
Ok(())
};
self.write_queue_count.fetch_add(1, Ordering::Relaxed);
let res = process();
self.write_queue_count.fetch_sub(1, Ordering::Relaxed);
Ok(res?)
}
}

0 comments on commit 38b3d43

Please sign in to comment.