From 16c61e4df9e26cc6e4c4b4f346688c928c0e67c5 Mon Sep 17 00:00:00 2001
From: Lou Ting <louting.t@alibaba-inc.com>
Date: Tue, 11 Jul 2023 16:57:21 +0800
Subject: [PATCH 1/4] add library target
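
Add src/lib.rs, which Cargo picks up as a library target alongside the
existing server binary. The library reuses the existing config, query
and store modules and exposes a small lifecycle API: sonic_init() loads
the configuration, sonic_tick() runs the periodic janitor/flush pass,
sonic_exit() flushes all stores before shutdown, and execute_query()
dispatches a built Query.

A minimal embedding sketch (the `sonic_server` crate name and the
config path are assumptions; adjust to the actual package):

    use sonic_server::query::builder::QueryBuilder;
    use sonic_server::{execute_query, sonic_exit, sonic_init};

    fn main() {
        // Load configuration before issuing any query
        sonic_init("./config.cfg");

        // Index one object, then dispatch the built query
        let query =
            QueryBuilder::push("home", "book", "3-body", "hello 3-body world!", None).unwrap();
        let ret = execute_query(query).unwrap();
        println!("push return: {:?}", ret);

        // Flush all pending store changes before shutdown
        sonic_exit();
    }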

---
 src/lib.rs | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 129 insertions(+)
 create mode 100644 src/lib.rs

diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 00000000..d722828c
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,129 @@
+// Sonic
+//
+// Fast, lightweight and schema-less search backend
+// Copyright: 2019, Valerian Saliou <valerian@valeriansaliou.name>
+// License: Mozilla Public License v2.0 (MPL v2.0)
+
+#![cfg_attr(feature = "benchmark", feature(test))]
+#![deny(unstable_features, unused_imports, unused_qualifications, clippy::all)]
+
+#[macro_use]
+extern crate log;
+#[macro_use]
+extern crate lazy_static;
+#[macro_use]
+extern crate serde_derive;
+
+pub mod config;
+mod executor;
+mod lexer;
+pub mod query;
+mod stopwords;
+pub mod store;
+
+use std::ops::Deref;
+
+use config::options::Config;
+use config::reader::ConfigReader;
+use query::actions::Query;
+use store::fst::StoreFSTPool;
+use store::kv::StoreKVPool;
+use store::operation::StoreOperationDispatch;
+
+struct AppArgs {
+    config: String,
+}
+
+#[cfg(unix)]
+#[cfg(feature = "allocator-jemalloc")]
+#[global_allocator]
+static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
+lazy_static! {
+    static ref APP_ARGS: AppArgs = AppArgs{config: "".to_string()};
+    static ref APP_CONF: Config = ConfigReader::make();
+}
+
+/// Called on startup, before any query is executed
+pub fn sonic_init(config_path: &str) {
+    // Ensure all statics are valid (a `deref` is enough to lazily initialize them)
+    let app_args: &AppArgs =  APP_ARGS.deref();
+    let p = app_args as *const AppArgs as *mut AppArgs;
+    unsafe {
+        (*p).config = config_path.to_string();
+    }
+
+    let _ = APP_CONF.deref();
+}
+
+/// Called on exit, flushing all pending store changes to disk
+pub fn sonic_exit() {
+    // Perform a KV flush (ensures all in-memory changes are synced on-disk before shutdown)
+    StoreKVPool::flush(true);
+
+    // Perform an FST consolidation (ensures all in-memory items are synced on-disk before \
+    //   shutdown; otherwise we would lose all non-consolidated FST changes)
+    StoreFSTPool::consolidate(true);
+}
+
+/// Called every 10 seconds, to run janitor and flush/consolidate tasks
+pub fn sonic_tick() {
+    // #1: Janitors
+    StoreKVPool::janitor();
+    StoreFSTPool::janitor();
+
+    // #2: Others
+    StoreKVPool::flush(false);
+    StoreFSTPool::consolidate(false);
+}
+
+/// Execute a query against the store
+/// ```ignore
+/// let query = QueryBuilder::push("home", "book", "3-body", "hello 3-body world!", None).unwrap();
+/// let ret = execute_query(query).unwrap();
+/// ```
+pub fn execute_query(query: Query) -> Result<Option<String>, ()> {
+    StoreOperationDispatch::dispatch(query)
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::query::builder::QueryBuilder;
+
+    use super::*;
+
+    #[test]
+    fn test_lib() {
+        // init
+        sonic_init("./config.cfg");
+
+        // push
+        let query = QueryBuilder::push("home", "book", "3-body", "hello 3-body world!", None).unwrap();
+        let ret = execute_query(query).unwrap();
+        println!("push return: {:?}", ret);
+        let query = QueryBuilder::push("home", "book", "sonic-inside", "hello sonic!", None).unwrap();
+        let ret = execute_query(query).unwrap();
+        println!("push return: {:?}", ret);
+        sonic_tick();
+
+        // pop
+        let query = QueryBuilder::pop("home", "book", "sonic-inside", "hello sonic!").unwrap();
+        let ret = execute_query(query).unwrap();
+        println!("pop return: {:?}", ret);
+
+        // query
+        let query = QueryBuilder::search("query_id", "home", "book", "hello", 10, 0, None).unwrap();
+        let ret = execute_query(query).unwrap();
+        println!("search return: {:?}", ret);
+        sonic_tick();
+
+        // list
+        let query = QueryBuilder::list("query_id", "home", "book", 10, 0).unwrap();
+        let ret = execute_query(query).unwrap();
+        println!("list return: {:?}", ret);
+        sonic_tick();
+
+        // exit
+        sonic_exit();
+    }
+}
\ No newline at end of file

From 056106a11d3a87fb907fd9da77490e117278764e Mon Sep 17 00:00:00 2001
From: Lou Ting <louting.t@alibaba-inc.com>
Date: Tue, 11 Jul 2023 17:06:59 +0800
Subject: [PATCH 2/4] fmt
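
Run rustfmt over src/lib.rs, add the missing newline at end of file,
and drop the pop case from the library test.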

---
 src/lib.rs | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index d722828c..fb6a70c3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -40,14 +40,16 @@ struct AppArgs {
 static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
 lazy_static! {
-    static ref APP_ARGS: AppArgs = AppArgs{config: "".to_string()};
+    static ref APP_ARGS: AppArgs = AppArgs {
+        config: "".to_string()
+    };
     static ref APP_CONF: Config = ConfigReader::make();
 }
 
 /// Called on startup, before any query is executed
 pub fn sonic_init(config_path: &str) {
     // Ensure all statics are valid (a `deref` is enough to lazily initialize them)
-    let app_args: &AppArgs =  APP_ARGS.deref();
+    let app_args: &AppArgs = APP_ARGS.deref();
     let p = app_args as *const AppArgs as *mut AppArgs;
     unsafe {
         (*p).config = config_path.to_string();
@@ -98,19 +100,16 @@ mod tests {
         sonic_init("./config.cfg");
 
         // push
-        let query = QueryBuilder::push("home", "book", "3-body", "hello 3-body world!", None).unwrap();
+        let query =
+            QueryBuilder::push("home", "book", "3-body", "hello 3-body world!", None).unwrap();
         let ret = execute_query(query).unwrap();
         println!("push return: {:?}", ret);
-        let query = QueryBuilder::push("home", "book", "sonic-inside", "hello sonic!", None).unwrap();
+        let query =
+            QueryBuilder::push("home", "book", "sonic-inside", "hello sonic!", None).unwrap();
         let ret = execute_query(query).unwrap();
         println!("push return: {:?}", ret);
         sonic_tick();
 
-        // pop
-        let query = QueryBuilder::pop("home", "book", "sonic-inside", "hello sonic!").unwrap();
-        let ret = execute_query(query).unwrap();
-        println!("pop return: {:?}", ret);
-
         // query
         let query = QueryBuilder::search("query_id", "home", "book", "hello", 10, 0, None).unwrap();
         let ret = execute_query(query).unwrap();
@@ -126,4 +125,4 @@ mod tests {
         // exit
         sonic_exit();
     }
-}
\ No newline at end of file
+}

From bd4fab0020f448f21236b70adf9c3e3beaeb288f Mon Sep 17 00:00:00 2001
From: Lou Ting <louting.t@alibaba-inc.com>
Date: Mon, 18 Dec 2023 20:46:54 +0800
Subject: [PATCH 3/4] storekv redb driver
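
Make the key-value storage backend selectable at build time: rocksdb
becomes an optional dependency behind a new `rocksdb` feature, and a
new redb-based driver in src/store/kv_redb.rs is wired in through the
`redb` feature (enabled by default). The redb driver exposes the same
surface as the rocksdb one (StoreKVPool, StoreKVBuilder, StoreKV with
get/put/delete/delete_range/flush, and the action builders), so
src/store/mod.rs only switches the `kv` module path depending on the
enabled feature. Driver errors are wrapped in a thiserror-based
RedbStoreKVError enum, tests log through test-log, and
tikv-jemallocator is bumped to 0.5.

The backend is chosen when building; for example (invocations are
illustrative):

    # redb backend (new default)
    cargo build

    # rocksdb backend instead
    cargo build --no-default-features \
        --features allocator-jemalloc,tokenizer-chinese,rocksdb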

---
 Cargo.lock           |   48 +-
 Cargo.toml           |   14 +-
 src/lib.rs           |    3 +-
 src/store/keyer.rs   |    1 +
 src/store/kv_redb.rs | 1146 ++++++++++++++++++++++++++++++++++++++++++
 src/store/mod.rs     |    7 +-
 6 files changed, 1203 insertions(+), 16 deletions(-)
 create mode 100644 src/store/kv_redb.rs

diff --git a/Cargo.lock b/Cargo.lock
index 10df365c..fdf91305 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -470,12 +470,6 @@ dependencies = [
  "percent-encoding",
 ]
 
-[[package]]
-name = "fs_extra"
-version = "1.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
-
 [[package]]
 name = "fst"
 version = "0.3.5"
@@ -1161,6 +1155,15 @@ dependencies = [
  "getrandom",
 ]
 
+[[package]]
+name = "redb"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08837f9a129bde83c51953b8c96cbb3422b940166b730caa954836106eb1dfd2"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "regex"
 version = "1.10.2"
@@ -1377,11 +1380,14 @@ dependencies = [
  "nix",
  "radix",
  "rand",
+ "redb",
  "regex",
  "regex-syntax 0.8.2",
  "rocksdb",
  "serde",
  "serde_derive",
+ "test-log",
+ "thiserror",
  "tikv-jemallocator",
  "toml",
  "twox-hash",
@@ -1434,6 +1440,27 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "test-log"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6159ab4116165c99fc88cce31f99fa2c9dbe08d3691cb38da02fc3b45f357d2b"
+dependencies = [
+ "env_logger",
+ "test-log-macros",
+]
+
+[[package]]
+name = "test-log-macros"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ba277e77219e9eea169e8508942db1bf5d8a41ff2db9b20aab5a5aadc9fa25d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "textwrap"
 version = "0.16.0"
@@ -1462,20 +1489,19 @@ dependencies = [
 
 [[package]]
 name = "tikv-jemalloc-sys"
-version = "0.4.3+5.2.1-patched.2"
+version = "0.5.4+5.3.0-patched"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a1792ccb507d955b46af42c123ea8863668fae24d03721e40cad6a41773dbb49"
+checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
 dependencies = [
  "cc",
- "fs_extra",
  "libc",
 ]
 
 [[package]]
 name = "tikv-jemallocator"
-version = "0.4.3"
+version = "0.5.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a5b7bcecfafe4998587d636f9ae9d55eb9d0499877b88757767c346875067098"
+checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
 dependencies = [
  "libc",
  "tikv-jemalloc-sys",
diff --git a/Cargo.toml b/Cargo.toml
index a8684ee3..430f48be 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,9 @@ serde_derive = "1.0"
 rand = "0.8"
 unicode-segmentation = "1.6"
 radix = "0.6"
-rocksdb = { version = "0.21", features = ["zstd"] }
+rocksdb = { version = "0.21", features = ["zstd"], optional = true }
+redb = { version = "1", optional = true }
+thiserror = "1.0"
 fst = "0.3"
 fst-levenshtein = "0.3"
 fst-regex = "0.3"
@@ -44,18 +46,24 @@ lindera-tokenizer = { version = "0.27", features = ["unidic"], optional = true }
 
 [target.'cfg(unix)'.dependencies]
 nix = "0.18"
-tikv-jemallocator = { version = "0.4", optional = true }
+tikv-jemallocator = { version = "0.5", optional = true }
 
 [target.'cfg(windows)'.dependencies]
 winapi = { version = "0.3", features = ["minwindef", "consoleapi"] }
 
+[dev-dependencies]
+test-log = "0.2"
+
 [features]
-default = ["allocator-jemalloc", "tokenizer-chinese"]
+default = ["allocator-jemalloc", "tokenizer-chinese", "redb"]
 allocator-jemalloc = ["tikv-jemallocator"]
 tokenizer-chinese = ["jieba-rs"]
 tokenizer-japanese = ["lindera-core", "lindera-dictionary", "lindera-tokenizer"]
 benchmark = []
 
+rocksdb = ["dep:rocksdb"]
+redb = ["dep:redb"]
+
 [profile.dev]
 opt-level = 0
 debug = true
diff --git a/src/lib.rs b/src/lib.rs
index fb6a70c3..a41b9e25 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -94,7 +94,7 @@ mod tests {
 
     use super::*;
 
-    #[test]
+    #[test_log::test]
     fn test_lib() {
         // init
         sonic_init("./config.cfg");
@@ -104,6 +104,7 @@ mod tests {
             QueryBuilder::push("home", "book", "3-body", "hello 3-body world!", None).unwrap();
         let ret = execute_query(query).unwrap();
         println!("push return: {:?}", ret);
+
         let query =
             QueryBuilder::push("home", "book", "sonic-inside", "hello sonic!", None).unwrap();
         let ret = execute_query(query).unwrap();
diff --git a/src/store/keyer.rs b/src/store/keyer.rs
index 28a613e5..720ada78 100644
--- a/src/store/keyer.rs
+++ b/src/store/keyer.rs
@@ -14,6 +14,7 @@ use super::identifiers::*;
 
 pub struct StoreKeyerBuilder;
 
+#[repr(transparent)]
 pub struct StoreKeyer {
     key: StoreKeyerKey,
 }
diff --git a/src/store/kv_redb.rs b/src/store/kv_redb.rs
new file mode 100644
index 00000000..75787659
--- /dev/null
+++ b/src/store/kv_redb.rs
@@ -0,0 +1,1146 @@
+// Sonic
+//
+// Fast, lightweight and schema-less search backend
+// Copyright: 2019, Valerian Saliou <valerian@valeriansaliou.name>
+// License: Mozilla Public License v2.0 (MPL v2.0)
+
+use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
+use hashbrown::HashMap;
+
+use redb::{Builder, Database, ReadableTable, TableDefinition};
+
+use std::fmt;
+use std::fs;
+use std::io::{self, Cursor};
+use std::path::{Path, PathBuf};
+use std::str;
+use std::sync::{Arc, Mutex, RwLock};
+use std::thread;
+use std::time::{Duration, SystemTime};
+use std::vec::Drain;
+
+use super::generic::{
+    StoreGeneric, StoreGenericActionBuilder, StoreGenericBuilder, StoreGenericPool,
+};
+use super::identifiers::*;
+use super::item::StoreItemPart;
+use super::keyer::{StoreKeyerBuilder, StoreKeyerHasher, StoreKeyerKey, StoreKeyerPrefix};
+use crate::APP_CONF;
+
+pub struct StoreKVPool;
+pub struct StoreKVBuilder;
+
+pub struct StoreKV {
+    database: Database,
+    last_used: Arc<RwLock<SystemTime>>,
+    last_flushed: Arc<RwLock<SystemTime>>,
+    pub lock: RwLock<bool>,
+}
+
+pub struct StoreKVActionBuilder;
+
+pub struct StoreKVAction<'a> {
+    store: Option<StoreKVBox>,
+    bucket: StoreItemPart<'a>,
+}
+
+#[derive(PartialEq, Eq, Hash, Clone, Copy)]
+pub struct StoreKVKey {
+    collection_hash: StoreKVAtom,
+}
+
+#[derive(PartialEq)]
+pub enum StoreKVAcquireMode {
+    Any,
+    OpenOnly,
+}
+
+type StoreKVAtom = u32;
+type StoreKVBox = Arc<StoreKV>;
+
+// const ATOM_HASH_RADIX: usize = 16;
+
+lazy_static! {
+    pub static ref STORE_ACCESS_LOCK: Arc<RwLock<bool>> = Arc::new(RwLock::new(false));
+    static ref STORE_ACQUIRE_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
+    static ref STORE_FLUSH_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
+    static ref STORE_POOL: Arc<RwLock<HashMap<StoreKVKey, StoreKVBox>>> =
+        Arc::new(RwLock::new(HashMap::new()));
+}
+
+const TABLE: TableDefinition<&[u8], Vec<u8>> = TableDefinition::new("sonic");
+
+#[derive(Debug, thiserror::Error)]
+pub enum RedbStoreKVError {
+    #[error("{0}")]
+    DatabaseError(#[from] redb::DatabaseError),
+
+    #[error("{0}")]
+    StorageError(#[from] redb::StorageError),
+
+    #[error("{0}")]
+    TransactionError(#[from] redb::TransactionError),
+
+    #[error("{0}")]
+    CommitError(#[from] redb::CommitError),
+
+    #[error("{0}")]
+    TableError(#[from] redb::TableError),
+}
+
+struct StoreKeyerRange {
+    start: StoreKeyerKey,
+    end: StoreKeyerKey,
+}
+impl std::ops::RangeBounds<StoreKeyerKey> for StoreKeyerRange {
+    fn start_bound(&self) -> std::ops::Bound<&StoreKeyerKey> {
+        std::ops::Bound::Included(&self.start)
+    }
+
+    fn end_bound(&self) -> std::ops::Bound<&StoreKeyerKey> {
+        std::ops::Bound::Included(&self.end)
+    }
+}
+
+impl StoreKVPool {
+    pub fn count() -> usize {
+        STORE_POOL.read().unwrap().len()
+    }
+
+    pub fn acquire<'a, T: Into<&'a str>>(
+        mode: StoreKVAcquireMode,
+        collection: T,
+    ) -> Result<Option<StoreKVBox>, ()> {
+        let collection_str = collection.into();
+        let pool_key = StoreKVKey::from_str(collection_str);
+
+        // Freeze acquire lock, and reference it in context
+        // Notice: this prevents two databases on the same collection from being opened at the same time.
+        let _acquire = STORE_ACQUIRE_LOCK.lock().unwrap();
+
+        // Acquire a thread-safe store pool reference in read mode
+        let store_pool_read = STORE_POOL.read().unwrap();
+
+        if let Some(store_kv) = store_pool_read.get(&pool_key) {
+            Self::proceed_acquire_cache("kv", collection_str, pool_key, store_kv).map(Some)
+        } else {
+            info!(
+                "kv store not in pool for collection: {} {}, opening it",
+                collection_str, pool_key
+            );
+
+            // Important: we need to drop the read reference first, to avoid \
+            //   dead-locking when acquiring the RWLock in write mode in this block.
+            drop(store_pool_read);
+
+            // Check whether the database can be opened
+            let can_open_db = if mode == StoreKVAcquireMode::OpenOnly {
+                StoreKVBuilder::path(pool_key.collection_hash).exists()
+            } else {
+                true
+            };
+
+            // Open KV database? (ie. we do not need to create a new KV database file tree if \
+            //   the database does not exist yet on disk and we are just looking to read data from \
+            //   it)
+            if can_open_db {
+                Self::proceed_acquire_open("kv", collection_str, pool_key, &*STORE_POOL).map(Some)
+            } else {
+                Ok(None)
+            }
+        }
+    }
+
+    pub fn janitor() {
+        Self::proceed_janitor(
+            "kv",
+            &*STORE_POOL,
+            APP_CONF.store.kv.pool.inactive_after,
+            &*STORE_ACCESS_LOCK,
+        )
+    }
+
+    pub fn backup(path: &Path) -> Result<(), io::Error> {
+        debug!("backing up all kv stores to path: {:?}", path);
+
+        // Create backup directory (full path)
+        fs::create_dir_all(path)?;
+
+        // Proceed dump action (backup)
+        Self::dump_action("backup", &*APP_CONF.store.kv.path, path, &Self::backup_item)
+    }
+
+    pub fn restore(path: &Path) -> Result<(), io::Error> {
+        debug!("restoring all kv stores from path: {:?}", path);
+
+        // Proceed dump action (restore)
+        Self::dump_action(
+            "restore",
+            path,
+            &*APP_CONF.store.kv.path,
+            &Self::restore_item,
+        )
+    }
+
+    pub fn flush(force: bool) {
+        debug!("scanning for kv store pool items to flush to disk");
+
+        // Acquire flush lock, and reference it in context
+        // Notice: this prevents two flush operations from being executed at the same time.
+        let _flush = STORE_FLUSH_LOCK.lock().unwrap();
+
+        // Step 1: List keys to be flushed
+        let mut keys_flush: Vec<StoreKVKey> = Vec::new();
+
+        {
+            // Acquire access lock (in blocking write mode), and reference it in context
+            // Notice: this prevents the store from being acquired from any context
+            let _access = STORE_ACCESS_LOCK.write().unwrap();
+
+            let store_pool_read = STORE_POOL.read().unwrap();
+
+            for (key, store) in &*store_pool_read {
+                // Important: be lenient with system clock going back to a past duration, since \
+                //   we may be running in a virtualized environment where clock is not guaranteed \
+                //   to be monotonic. This is done to avoid poisoning associated mutexes by \
+                //   crashing on unwrap().
+                let not_flushed_for = store
+                    .last_flushed
+                    .read()
+                    .unwrap()
+                    .elapsed()
+                    .unwrap_or_else(|err| {
+                        error!(
+                            "kv key: {} last flush duration clock issue, zeroing: {}",
+                            key, err
+                        );
+
+                        // Assuming a zero seconds fallback duration
+                        Duration::from_secs(0)
+                    })
+                    .as_secs();
+
+                if force || not_flushed_for >= APP_CONF.store.kv.database.flush_after {
+                    info!(
+                        "kv key: {} not flushed for: {} seconds, may flush",
+                        key, not_flushed_for
+                    );
+
+                    keys_flush.push(*key);
+                } else {
+                    debug!(
+                        "kv key: {} not flushed for: {} seconds, no flush",
+                        key, not_flushed_for
+                    );
+                }
+            }
+        }
+
+        // Exit trap: Nothing to flush yet? Abort there.
+        if keys_flush.is_empty() {
+            info!("no kv store pool items need to be flushed at the moment");
+
+            return;
+        }
+
+        // Step 2: Flush KVs, one-by-one (sequential locking; this avoids global locks)
+        let mut count_flushed = 0;
+
+        {
+            for key in &keys_flush {
+                {
+                    // Acquire access lock (in blocking write mode), and reference it in context
+                    // Notice: this prevents the store from being acquired from any context
+                    let _access = STORE_ACCESS_LOCK.write().unwrap();
+
+                    if let Some(store) = STORE_POOL.read().unwrap().get(key) {
+                        debug!("kv key: {} flush started", key);
+
+                        if let Err(err) = store.flush() {
+                            error!("kv key: {} flush failed: {}", key, err);
+                        } else {
+                            count_flushed += 1;
+
+                            debug!("kv key: {} flush complete", key);
+                        }
+
+                        // Bump 'last flushed' time
+                        *store.last_flushed.write().unwrap() = SystemTime::now();
+                    }
+                }
+
+                // Give a bit of time to other threads before continuing
+                thread::yield_now();
+            }
+        }
+
+        info!(
+            "done scanning for kv store pool items to flush to disk (flushed: {})",
+            count_flushed
+        );
+    }
+
+    fn dump_action(
+        action: &str,
+        read_path: &Path,
+        write_path: &Path,
+        fn_item: &dyn Fn(&Path, &Path, &str) -> Result<(), io::Error>,
+    ) -> Result<(), io::Error> {
+        // Iterate on KV collections
+        for collection in fs::read_dir(read_path)? {
+            let collection = collection?;
+
+            // Actual collection found?
+            if let (Ok(collection_file_type), Some(collection_name)) =
+                (collection.file_type(), collection.file_name().to_str())
+            {
+                if collection_file_type.is_dir() {
+                    debug!("kv collection ongoing {}: {}", action, collection_name);
+
+                    fn_item(write_path, &collection.path(), collection_name)?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn backup_item(
+        _backup_path: &Path,
+        _origin_path: &Path,
+        _collection_name: &str,
+    ) -> Result<(), io::Error> {
+        Ok(())
+    }
+
+    fn restore_item(
+        _backup_path: &Path,
+        _origin_path: &Path,
+        _collection_name: &str,
+    ) -> Result<(), io::Error> {
+        Ok(())
+    }
+}
+
+impl StoreGenericPool<StoreKVKey, StoreKV, StoreKVBuilder> for StoreKVPool {}
+
+impl StoreKVBuilder {
+    fn open(collection_hash: StoreKVAtom) -> Result<Database, RedbStoreKVError> {
+        debug!(
+            "opening key-value database for collection: <{:x?}>",
+            collection_hash
+        );
+
+        // Configure database options
+        let builder = Self::configure();
+
+        // Open database at path for collection
+        let db = builder.create(Self::path(collection_hash))?;
+        let write_txn = db.begin_write()?;
+        {
+            let mut _table = write_txn.open_table(TABLE)?;
+            // let key: StoreKeyerKey = [0, 0, 0, 0, 0, 0, 0, 0, 0];
+            // let k1: &[u8] = &key;
+            // table.insert(k1, key.to_vec())?;
+            // table.remove(k1)?;
+        }
+        write_txn.commit()?;
+
+        Ok(db)
+    }
+
+    fn close(collection_hash: StoreKVAtom) {
+        debug!(
+            "closing key-value database for collection: <{:x?}>",
+            collection_hash
+        );
+
+        let mut store_pool_write = STORE_POOL.write().unwrap();
+
+        let collection_target = StoreKVKey::from_atom(collection_hash);
+
+        store_pool_write.remove(&collection_target);
+    }
+
+    fn path(collection_hash: StoreKVAtom) -> PathBuf {
+        APP_CONF
+            .store
+            .kv
+            .path
+            .join(format!("{:x?}.redb", collection_hash))
+    }
+
+    fn configure() -> Builder {
+        debug!("configuring key-value database");
+
+        // Make database options
+        let mut db_options = Builder::new();
+
+        db_options.set_cache_size(APP_CONF.store.kv.database.write_buffer * 1024);
+
+        db_options
+    }
+}
+
+impl StoreGenericBuilder<StoreKVKey, StoreKV> for StoreKVBuilder {
+    fn build(pool_key: StoreKVKey) -> Result<StoreKV, ()> {
+        Self::open(pool_key.collection_hash)
+            .map(|db| {
+                let now = SystemTime::now();
+
+                StoreKV {
+                    database: db,
+                    last_used: Arc::new(RwLock::new(now)),
+                    last_flushed: Arc::new(RwLock::new(now)),
+                    lock: RwLock::new(false),
+                }
+            })
+            .map_err(|err| {
+                error!("failed opening kv: {}", err);
+            })
+    }
+}
+
+impl StoreKV {
+    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, RedbStoreKVError> {
+        let read_txn = self.database.begin_read()?;
+        let table = read_txn.open_table(TABLE)?;
+        let v = table.get(key)?.map(|v| v.value());
+        Ok(v)
+    }
+
+    pub fn put(&self, key: &[u8], data: &[u8]) -> Result<(), RedbStoreKVError> {
+        let write_txn = self.database.begin_write()?;
+        {
+            let mut table = write_txn.open_table(TABLE)?;
+            let _v = table.insert(key, data.to_vec())?;
+        }
+        write_txn.commit()?;
+        Ok(())
+    }
+
+    pub fn delete(&self, key: &[u8]) -> Result<(), RedbStoreKVError> {
+        let write_txn = self.database.begin_write()?;
+        {
+            let mut table = write_txn.open_table(TABLE)?;
+            let _v = table.remove(key)?;
+        }
+        write_txn.commit()?;
+        Ok(())
+    }
+
+    pub fn delete_range(
+        &self,
+        key_start: StoreKeyerKey,
+        key_end: StoreKeyerKey,
+    ) -> Result<(), RedbStoreKVError> {
+        let write_txn = self.database.begin_write()?;
+        let mut _table = write_txn.open_table(TABLE)?;
+        let _range = StoreKeyerRange {
+            start: key_start,
+            end: key_end,
+        };
+        // let v = table.drain(range)?;
+        Ok(())
+    }
+
+    fn flush(&self) -> Result<(), RedbStoreKVError> {
+        Ok(())
+    }
+}
+
+impl StoreGeneric for StoreKV {
+    fn ref_last_used(&self) -> &RwLock<SystemTime> {
+        &self.last_used
+    }
+}
+
+impl StoreKVActionBuilder {
+    pub fn access(bucket: StoreItemPart, store: Option<StoreKVBox>) -> StoreKVAction {
+        Self::build(bucket, store)
+    }
+
+    pub fn erase<'a, T: Into<&'a str>>(collection: T, bucket: Option<T>) -> Result<u32, ()> {
+        Self::dispatch_erase("kv", collection, bucket)
+    }
+
+    fn build(bucket: StoreItemPart, store: Option<StoreKVBox>) -> StoreKVAction {
+        StoreKVAction { store, bucket }
+    }
+}
+
+impl StoreGenericActionBuilder for StoreKVActionBuilder {
+    fn proceed_erase_collection(collection_str: &str) -> Result<u32, ()> {
+        let collection_atom = StoreKeyerHasher::to_compact(collection_str);
+        let collection_path = StoreKVBuilder::path(collection_atom);
+
+        // Force a KV store close
+        StoreKVBuilder::close(collection_atom);
+
+        if collection_path.exists() {
+            debug!(
+                "kv collection store exists, erasing: {}/* at path: {:?}",
+                collection_str, &collection_path
+            );
+
+            // Remove KV store storage from filesystem
+            let erase_result = fs::remove_file(&collection_path);
+
+            if erase_result.is_ok() {
+                debug!("done with kv collection erasure");
+
+                Ok(1)
+            } else {
+                Err(())
+            }
+        } else {
+            debug!(
+                "kv collection store does not exist, consider already erased: {}/* at path: {:?}",
+                collection_str, &collection_path
+            );
+
+            Ok(0)
+        }
+    }
+
+    fn proceed_erase_bucket(_collection: &str, _bucket: &str) -> Result<u32, ()> {
+        // This one is not implemented, as we need to acquire the collection; which would cause \
+        //   a party-killer dead-lock.
+        Err(())
+    }
+}
+
+impl<'a> StoreKVAction<'a> {
+    /// Meta-to-Value mapper
+    ///
+    /// [IDX=0] ((meta)) ~> ((value))
+    pub fn get_meta_to_value(&self, meta: StoreMetaKey) -> Result<Option<StoreMetaValue>, ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::meta_to_value(self.bucket.as_str(), &meta);
+
+            debug!("store get meta-to-value: {}", store_key);
+
+            match store.get(&store_key.as_bytes()) {
+                Ok(Some(value)) => {
+                    debug!("got meta-to-value: {}", store_key);
+
+                    Ok(if let Ok(value) = str::from_utf8(&value) {
+                        match meta {
+                            StoreMetaKey::IIDIncr => value
+                                .parse::<StoreObjectIID>()
+                                .ok()
+                                .map(StoreMetaValue::IIDIncr)
+                                .or(None),
+                        }
+                    } else {
+                        None
+                    })
+                }
+                Ok(None) => {
+                    debug!("no meta-to-value found: {}", store_key);
+
+                    Ok(None)
+                }
+                Err(err) => {
+                    error!(
+                        "error getting meta-to-value: {} with trace: {}",
+                        store_key, err
+                    );
+
+                    Err(())
+                }
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn set_meta_to_value(&self, meta: StoreMetaKey, value: StoreMetaValue) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::meta_to_value(self.bucket.as_str(), &meta);
+
+            debug!("store set meta-to-value: {}", store_key);
+
+            let value_string = match value {
+                StoreMetaValue::IIDIncr(iid_incr) => iid_incr.to_string(),
+            };
+
+            store
+                .put(&store_key.as_bytes(), value_string.as_bytes())
+                .or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    /// Term-to-IIDs mapper
+    ///
+    /// [IDX=1] ((term)) ~> [((iid))]
+    pub fn get_term_to_iids(
+        &self,
+        term_hashed: StoreTermHashed,
+    ) -> Result<Option<Vec<StoreObjectIID>>, ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::term_to_iids(self.bucket.as_str(), term_hashed);
+
+            debug!("store get term-to-iids: {}", store_key);
+
+            match store.get(&store_key.as_bytes()) {
+                Ok(Some(value)) => {
+                    debug!(
+                        "got term-to-iids: {} with encoded value: {:?}",
+                        store_key, &*value
+                    );
+
+                    Self::decode_u32_list(&*value)
+                        .or(Err(()))
+                        .map(|value_decoded| {
+                            debug!(
+                                "got term-to-iids: {} with decoded value: {:?}",
+                                store_key, &value_decoded
+                            );
+
+                            Some(value_decoded)
+                        })
+                }
+                Ok(None) => {
+                    debug!("no term-to-iids found: {}", store_key);
+
+                    Ok(None)
+                }
+                Err(err) => {
+                    error!(
+                        "error getting term-to-iids: {} with trace: {}",
+                        store_key, err
+                    );
+
+                    Err(())
+                }
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn set_term_to_iids(
+        &self,
+        term_hashed: StoreTermHashed,
+        iids: &[StoreObjectIID],
+    ) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::term_to_iids(self.bucket.as_str(), term_hashed);
+
+            debug!("store set term-to-iids: {}", store_key);
+
+            // Encode IID list into storage serialized format
+            let iids_encoded = Self::encode_u32_list(iids);
+
+            debug!(
+                "store set term-to-iids: {} with encoded value: {:?}",
+                store_key, iids_encoded
+            );
+
+            store.put(&store_key.as_bytes(), &iids_encoded).or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    pub fn delete_term_to_iids(&self, term_hashed: StoreTermHashed) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::term_to_iids(self.bucket.as_str(), term_hashed);
+
+            debug!("store delete term-to-iids: {}", store_key);
+
+            store.delete(&store_key.as_bytes()).or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    /// OID-to-IID mapper
+    ///
+    /// [IDX=2] ((oid)) ~> ((iid))
+    pub fn get_oid_to_iid(&self, oid: StoreObjectOID<'a>) -> Result<Option<StoreObjectIID>, ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::oid_to_iid(self.bucket.as_str(), oid);
+
+            debug!("store get oid-to-iid: {}", store_key);
+
+            match store.get(&store_key.as_bytes()) {
+                Ok(Some(value)) => {
+                    debug!(
+                        "got oid-to-iid: {} with encoded value: {:?}",
+                        store_key, &*value
+                    );
+
+                    Self::decode_u32(&*value).or(Err(())).map(|value_decoded| {
+                        debug!(
+                            "got oid-to-iid: {} with decoded value: {:?}",
+                            store_key, &value_decoded
+                        );
+
+                        Some(value_decoded)
+                    })
+                }
+                Ok(None) => {
+                    debug!("no oid-to-iid found: {}", store_key);
+
+                    Ok(None)
+                }
+                Err(err) => {
+                    error!(
+                        "error getting oid-to-iid: {} with trace: {}",
+                        store_key, err
+                    );
+
+                    Err(())
+                }
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn set_oid_to_iid(&self, oid: StoreObjectOID<'a>, iid: StoreObjectIID) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::oid_to_iid(self.bucket.as_str(), oid);
+
+            debug!("store set oid-to-iid: {}", store_key);
+
+            // Encode IID
+            let iid_encoded = Self::encode_u32(iid);
+
+            debug!(
+                "store set oid-to-iid: {} with encoded value: {:?}",
+                store_key, iid_encoded
+            );
+
+            store.put(&store_key.as_bytes(), &iid_encoded).or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    pub fn delete_oid_to_iid(&self, oid: StoreObjectOID<'a>) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::oid_to_iid(self.bucket.as_str(), oid);
+
+            debug!("store delete oid-to-iid: {}", store_key);
+
+            store.delete(&store_key.as_bytes()).or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    /// IID-to-OID mapper
+    ///
+    /// [IDX=3] ((iid)) ~> ((oid))
+    pub fn get_iid_to_oid(&self, iid: StoreObjectIID) -> Result<Option<String>, ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::iid_to_oid(self.bucket.as_str(), iid);
+
+            debug!("store get iid-to-oid: {}", store_key);
+
+            match store.get(&store_key.as_bytes()) {
+                Ok(Some(value)) => Ok(str::from_utf8(&value).ok().map(|value| value.to_string())),
+                Ok(None) => Ok(None),
+                Err(_) => Err(()),
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn set_iid_to_oid(&self, iid: StoreObjectIID, oid: StoreObjectOID<'a>) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::iid_to_oid(self.bucket.as_str(), iid);
+
+            debug!("store set iid-to-oid: {}", store_key);
+
+            store.put(&store_key.as_bytes(), oid.as_bytes()).or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    pub fn delete_iid_to_oid(&self, iid: StoreObjectIID) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::iid_to_oid(self.bucket.as_str(), iid);
+
+            debug!("store delete iid-to-oid: {}", store_key);
+
+            store.delete(&store_key.as_bytes()).or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    /// IID-to-Terms mapper
+    ///
+    /// [IDX=4] ((iid)) ~> [((term))]
+    pub fn get_iid_to_terms(
+        &self,
+        iid: StoreObjectIID,
+    ) -> Result<Option<Vec<StoreTermHashed>>, ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::iid_to_terms(self.bucket.as_str(), iid);
+
+            debug!("store get iid-to-terms: {}", store_key);
+
+            match store.get(&store_key.as_bytes()) {
+                Ok(Some(value)) => {
+                    debug!(
+                        "got iid-to-terms: {} with encoded value: {:?}",
+                        store_key, &*value
+                    );
+
+                    Self::decode_u32_list(&*value)
+                        .or(Err(()))
+                        .map(|value_decoded| {
+                            debug!(
+                                "got iid-to-terms: {} with decoded value: {:?}",
+                                store_key, &value_decoded
+                            );
+
+                            if !value_decoded.is_empty() {
+                                Some(value_decoded)
+                            } else {
+                                None
+                            }
+                        })
+                }
+                Ok(None) => Ok(None),
+                Err(_) => Err(()),
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn set_iid_to_terms(
+        &self,
+        iid: StoreObjectIID,
+        terms_hashed: &[StoreTermHashed],
+    ) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::iid_to_terms(self.bucket.as_str(), iid);
+
+            debug!("store set iid-to-terms: {}", store_key);
+
+            // Encode term list into storage serialized format
+            let terms_hashed_encoded = Self::encode_u32_list(terms_hashed);
+
+            debug!(
+                "store set iid-to-terms: {} with encoded value: {:?}",
+                store_key, terms_hashed_encoded
+            );
+
+            store
+                .put(&store_key.as_bytes(), &terms_hashed_encoded)
+                .or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    pub fn delete_iid_to_terms(&self, iid: StoreObjectIID) -> Result<(), ()> {
+        if let Some(ref store) = self.store {
+            let store_key = StoreKeyerBuilder::iid_to_terms(self.bucket.as_str(), iid);
+
+            debug!("store delete iid-to-terms: {}", store_key);
+
+            store.delete(&store_key.as_bytes()).or(Err(()))
+        } else {
+            Err(())
+        }
+    }
+
+    pub fn batch_flush_bucket(
+        &self,
+        iid: StoreObjectIID,
+        oid: StoreObjectOID<'a>,
+        iid_terms_hashed: &[StoreTermHashed],
+    ) -> Result<u32, ()> {
+        let mut count = 0;
+
+        debug!(
+            "store batch flush bucket: {} with hashed terms: {:?}",
+            iid, iid_terms_hashed
+        );
+
+        // Delete OID <> IID association
+        match (
+            self.delete_oid_to_iid(oid),
+            self.delete_iid_to_oid(iid),
+            self.delete_iid_to_terms(iid),
+        ) {
+            (Ok(_), Ok(_), Ok(_)) => {
+                // Delete IID from each associated term
+                for iid_term in iid_terms_hashed {
+                    if let Ok(Some(mut iid_term_iids)) = self.get_term_to_iids(*iid_term) {
+                        if iid_term_iids.contains(&iid) {
+                            count += 1;
+
+                            // Remove IID from list of IIDs
+                            iid_term_iids.retain(|cur_iid| cur_iid != &iid);
+                        }
+
+                        let is_ok = if iid_term_iids.is_empty() {
+                            self.delete_term_to_iids(*iid_term).is_ok()
+                        } else {
+                            self.set_term_to_iids(*iid_term, &iid_term_iids).is_ok()
+                        };
+
+                        if !is_ok {
+                            return Err(());
+                        }
+                    }
+                }
+
+                Ok(count)
+            }
+            _ => Err(()),
+        }
+    }
+
+    pub fn batch_truncate_object(
+        &self,
+        term_hashed: StoreTermHashed,
+        term_iids_drain: Drain<StoreObjectIID>,
+    ) -> Result<u32, ()> {
+        let mut count = 0;
+
+        for term_iid_drain in term_iids_drain {
+            debug!("store batch truncate object iid: {}", term_iid_drain);
+
+            // Nuke term in IID to Terms list
+            if let Ok(Some(mut term_iid_drain_terms)) = self.get_iid_to_terms(term_iid_drain) {
+                count += 1;
+
+                term_iid_drain_terms.retain(|cur_term| cur_term != &term_hashed);
+
+                // IID to Terms list is empty? Flush whole object.
+                if term_iid_drain_terms.is_empty() {
+                    // Acquire OID for this drained IID
+                    if let Ok(Some(term_iid_drain_oid)) = self.get_iid_to_oid(term_iid_drain) {
+                        if self
+                            .batch_flush_bucket(term_iid_drain, &term_iid_drain_oid, &Vec::new())
+                            .is_err()
+                        {
+                            error!(
+                                "failed executing store batch truncate object batch-flush-bucket"
+                            );
+                        }
+                    } else {
+                        error!("failed getting store batch truncate object iid-to-oid");
+                    }
+                } else {
+                    // Update IID to Terms list
+                    if self
+                        .set_iid_to_terms(term_iid_drain, &term_iid_drain_terms)
+                        .is_err()
+                    {
+                        error!("failed setting store batch truncate object iid-to-terms");
+                    }
+                }
+            }
+        }
+
+        Ok(count)
+    }
+
+    pub fn batch_erase_bucket(&self) -> Result<u32, ()> {
+        if let Some(ref store) = self.store {
+            // Generate all key prefix values (with dummy post-prefix values; we don't care)
+            let (k_meta_to_value, k_term_to_iids, k_oid_to_iid, k_iid_to_oid, k_iid_to_terms) = (
+                StoreKeyerBuilder::meta_to_value(self.bucket.as_str(), &StoreMetaKey::IIDIncr),
+                StoreKeyerBuilder::term_to_iids(self.bucket.as_str(), 0),
+                StoreKeyerBuilder::oid_to_iid(self.bucket.as_str(), &String::new()),
+                StoreKeyerBuilder::iid_to_oid(self.bucket.as_str(), 0),
+                StoreKeyerBuilder::iid_to_terms(self.bucket.as_str(), 0),
+            );
+
+            let key_prefixes: [StoreKeyerPrefix; 5] = [
+                k_meta_to_value.as_prefix(),
+                k_term_to_iids.as_prefix(),
+                k_oid_to_iid.as_prefix(),
+                k_iid_to_oid.as_prefix(),
+                k_iid_to_terms.as_prefix(),
+            ];
+
+            // Scan all keys per-prefix and nuke them right away
+            for key_prefix in &key_prefixes {
+                debug!(
+                    "store batch erase bucket: {} for prefix: {:?}",
+                    self.bucket.as_str(),
+                    key_prefix
+                );
+
+                // Generate start and end prefix for batch delete (in other words, the minimum \
+                //   key value possible, and the highest key value possible)
+                let key_prefix_start: StoreKeyerKey = [
+                    key_prefix[0],
+                    key_prefix[1],
+                    key_prefix[2],
+                    key_prefix[3],
+                    key_prefix[4],
+                    0,
+                    0,
+                    0,
+                    0,
+                ];
+                let key_prefix_end: StoreKeyerKey = [
+                    key_prefix[0],
+                    key_prefix[1],
+                    key_prefix[2],
+                    key_prefix[3],
+                    key_prefix[4],
+                    255,
+                    255,
+                    255,
+                    255,
+                ];
+
+                // Batch-delete keys matching range
+                let _ = store.delete_range(key_prefix_start, key_prefix_end);
+            }
+
+            info!(
+                "done processing store batch erase bucket: {}",
+                self.bucket.as_str()
+            );
+
+            Ok(1)
+        } else {
+            Err(())
+        }
+    }
+
+    fn encode_u32(decoded: u32) -> [u8; 4] {
+        let mut encoded = [0; 4];
+
+        LittleEndian::write_u32(&mut encoded, decoded);
+
+        encoded
+    }
+
+    fn decode_u32(encoded: &[u8]) -> Result<u32, ()> {
+        Cursor::new(encoded).read_u32::<LittleEndian>().or(Err(()))
+    }
+
+    fn encode_u32_list(decoded: &[u32]) -> Vec<u8> {
+        // Pre-reserve required capacity so as to avoid heap resizes (50% performance gain relative \
+        //   to initializing this with a zero-capacity)
+        let mut encoded = Vec::with_capacity(decoded.len() * 4);
+
+        for decoded_item in decoded {
+            encoded.extend(&Self::encode_u32(*decoded_item))
+        }
+
+        encoded
+    }
+
+    fn decode_u32_list(encoded: &[u8]) -> Result<Vec<u32>, ()> {
+        // Pre-reserve required capacity so as to avoid heap resizes (50% performance gain relative \
+        //   to initializing this with a zero-capacity)
+        let mut decoded = Vec::with_capacity(encoded.len() / 4);
+
+        for encoded_chunk in encoded.chunks(4) {
+            if let Ok(decoded_chunk) = Self::decode_u32(encoded_chunk) {
+                decoded.push(decoded_chunk);
+            } else {
+                return Err(());
+            }
+        }
+
+        Ok(decoded)
+    }
+}
+
+impl StoreKVKey {
+    pub fn from_atom(collection_hash: StoreKVAtom) -> StoreKVKey {
+        StoreKVKey { collection_hash }
+    }
+
+    pub fn from_str(collection_str: &str) -> StoreKVKey {
+        StoreKVKey {
+            collection_hash: StoreKeyerHasher::to_compact(collection_str),
+        }
+    }
+}
+
+impl fmt::Display for StoreKVKey {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "<{:x?}>", self.collection_hash)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::ops::Deref;
+
+    use crate::{AppArgs, APP_ARGS};
+
+    use super::*;
+
+    #[test_log::test]
+    fn redb() {
+        // sonic_init("./config.cfg");
+
+        // let db = StoreKVPool::acquire(StoreKVAcquireMode::Any, "c:test:3")
+        //     .unwrap()
+        //     .unwrap()
+        //     .database;
+        // let write_txn = db.begin_write().unwrap();
+        // {
+        //     let mut table = write_txn.open_table(TABLE).unwrap();
+        //     let key: StoreKeyerKey = [0, 0, 0, 0, 0, 0, 0, 0, 0];
+        //     let k1: &[u8] = &key;
+        //     table.insert(k1, key.to_vec()).unwrap();
+        // }
+    }
+
+    fn sonic_init(config_path: &str) {
+        // Ensure all statics are valid (a `deref` is enough to lazily initialize them)
+        let app_args: &AppArgs = APP_ARGS.deref();
+        let p = app_args as *const AppArgs as *mut AppArgs;
+        unsafe {
+            (*p).config = config_path.to_string();
+        }
+
+        let _ = APP_CONF.deref();
+    }
+
+    #[test_log::test]
+    fn it_proceeds_actions() {
+        // init
+        sonic_init("./config.cfg");
+
+        let store = StoreKVPool::acquire(StoreKVAcquireMode::Any, "c:test:3").unwrap();
+        let action =
+            StoreKVActionBuilder::access(StoreItemPart::from_str("b:test:3").unwrap(), store);
+
+        assert!(action.get_meta_to_value(StoreMetaKey::IIDIncr).is_ok());
+        assert!(action
+            .set_meta_to_value(StoreMetaKey::IIDIncr, StoreMetaValue::IIDIncr(1))
+            .is_ok());
+
+        assert!(action.get_term_to_iids(1).is_ok());
+        assert!(action.set_term_to_iids(1, &[0, 1, 2]).is_ok());
+        assert!(action.delete_term_to_iids(1).is_ok());
+
+        assert!(action.get_oid_to_iid(&"s".to_string()).is_ok());
+        assert!(action.set_oid_to_iid(&"s".to_string(), 4).is_ok());
+        assert!(action.delete_oid_to_iid(&"s".to_string()).is_ok());
+
+        assert!(action.get_iid_to_oid(4).is_ok());
+        assert!(action.set_iid_to_oid(4, &"s".to_string()).is_ok());
+        assert!(action.delete_iid_to_oid(4).is_ok());
+
+        assert!(action.get_iid_to_terms(4).is_ok());
+        assert!(action.set_iid_to_terms(4, &[45402]).is_ok());
+        assert!(action.delete_iid_to_terms(4).is_ok());
+    }
+}
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 008e1567..daa95a57 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -13,5 +13,10 @@ mod keyer;
 pub mod fst;
 pub mod identifiers;
 pub mod item;
-pub mod kv;
 pub mod operation;
+
+#[cfg(not(feature = "redb"))]
+pub mod kv;
+#[cfg(feature = "redb")]
+#[path ="kv_redb.rs"]
+pub mod kv;
\ No newline at end of file

From 0b006106bd967b698324021d251440b51446aa56 Mon Sep 17 00:00:00 2001
From: Lou Ting <louting.t@alibaba-inc.com>
Date: Tue, 19 Dec 2023 09:27:24 +0800
Subject: [PATCH 4/4] redb delete range
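
Implement StoreKV::delete_range for the redb driver: open the table in
a write transaction, drain the inclusive key range, and commit. The
(Bound::Included(start), Bound::Included(end)) tuple is just an
explicit spelling of the inclusive range start..=end over &[u8] keys,
so the now-unused StoreKeyerRange helper is dropped, along with the
leftover commented-out scaffolding in StoreKVBuilder::open(). The
backend feature checks move to the top of src/store/mod.rs as
compile_error! guards: `redb` and `rocksdb` are mutually exclusive, and
at least one of them must be enabled. Finally, allocator-jemalloc is
removed from the default feature set.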

---
 Cargo.toml           |  2 +-
 src/store/kv_redb.rs | 32 ++++++++------------------------
 src/store/mod.rs     | 18 +++++++++++++-----
 3 files changed, 22 insertions(+), 30 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 430f48be..f873ed02 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,7 +55,7 @@ winapi = { version = "0.3", features = ["minwindef", "consoleapi"] }
 test-log = "0.2"
 
 [features]
-default = ["allocator-jemalloc", "tokenizer-chinese", "redb"]
+default = ["tokenizer-chinese", "redb"]
 allocator-jemalloc = ["tikv-jemallocator"]
 tokenizer-chinese = ["jieba-rs"]
 tokenizer-japanese = ["lindera-core", "lindera-dictionary", "lindera-tokenizer"]
diff --git a/src/store/kv_redb.rs b/src/store/kv_redb.rs
index 75787659..bea22550 100644
--- a/src/store/kv_redb.rs
+++ b/src/store/kv_redb.rs
@@ -88,20 +88,6 @@ pub enum RedbStoreKVError {
     TableError(#[from] redb::TableError),
 }
 
-struct StoreKeyerRange {
-    start: StoreKeyerKey,
-    end: StoreKeyerKey,
-}
-impl std::ops::RangeBounds<StoreKeyerKey> for StoreKeyerRange {
-    fn start_bound(&self) -> std::ops::Bound<&StoreKeyerKey> {
-        std::ops::Bound::Included(&self.start)
-    }
-
-    fn end_bound(&self) -> std::ops::Bound<&StoreKeyerKey> {
-        std::ops::Bound::Included(&self.end)
-    }
-}
-
 impl StoreKVPool {
     pub fn count() -> usize {
         STORE_POOL.read().unwrap().len()
@@ -339,10 +325,6 @@ impl StoreKVBuilder {
         let write_txn = db.begin_write()?;
         {
             let mut _table = write_txn.open_table(TABLE)?;
-            // let key: StoreKeyerKey = [0, 0, 0, 0, 0, 0, 0, 0, 0];
-            // let k1: &[u8] = &key;
-            // table.insert(k1, key.to_vec())?;
-            // table.remove(k1)?;
         }
         write_txn.commit()?;
 
@@ -435,12 +417,14 @@ impl StoreKV {
         key_end: StoreKeyerKey,
     ) -> Result<(), RedbStoreKVError> {
         let write_txn = self.database.begin_write()?;
-        let mut _table = write_txn.open_table(TABLE)?;
-        let _range = StoreKeyerRange {
-            start: key_start,
-            end: key_end,
-        };
-        // let v = table.drain(range)?;
+        {
+            let mut table = write_txn.open_table(TABLE)?;
+            let start: &[u8] = &key_start;
+            let end: &[u8] = &key_end;
+            let _v = table.drain::<&[u8]>((std::ops::Bound::Included(start), std::ops::Bound::Included(end)))?;
+        }
+        write_txn.commit()?;
+
         Ok(())
     }
 
diff --git a/src/store/mod.rs b/src/store/mod.rs
index daa95a57..b98daa30 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -4,6 +4,18 @@
 // Copyright: 2019, Valerian Saliou <valerian@valeriansaliou.name>
 // License: Mozilla Public License v2.0 (MPL v2.0)
 
+#[cfg(all(feature = "redb", feature = "rocksdb"))]
+compile_error!("Features `redb` and `rocksdb` are mutually exclusive and cannot be enabled together.");
+
+#[cfg(not(any(feature = "redb", feature = "rocksdb")))]
+compile_error!("Features `redb` or `rocksdb` should be enabled at least one.");
+
+#[cfg(feature = "rocksdb")]
+pub mod kv;
+#[cfg(feature = "redb")]
+#[path = "kv_redb.rs"]
+pub mod kv;
+
 #[macro_use]
 mod macros;
 
@@ -15,8 +27,4 @@ pub mod identifiers;
 pub mod item;
 pub mod operation;
 
-#[cfg(not(feature = "redb"))]
-pub mod kv;
-#[cfg(feature = "redb")]
-#[path = "kv_redb.rs"]
-pub mod kv;
\ No newline at end of file
+