compression & basic benchmark
marvin-j97 committed Mar 13, 2024
1 parent 0f049a6 commit e23fe15
Showing 9 changed files with 350 additions and 19 deletions.
11 changes: 10 additions & 1 deletion Cargo.toml
@@ -19,14 +19,16 @@ name = "value_log"
 path = "src/lib.rs"

 [features]
-default = []
+default = ["compression"]
+compression = ["dep:lz4_flex"]
 serde = ["dep:serde"]

 [dependencies]
 byteorder = "1.5.0"
 chrono = "0.4.34"
 crc32fast = "1.4.0"
 log = "0.4.20"
+lz4_flex = { version = "0.11.2", optional = true }
 min-max-heap = "1.3.0"
 quick_cache = "0.4.1"
 rand = "0.8.5"
@@ -37,6 +39,13 @@ serde = { version = "1.0.197", default-features = false, features = [
 ], optional = true }

 [dev-dependencies]
+criterion = "0.5.1"
 env_logger = "0.11.2"
 tempfile = "3.10.0"
 test-log = "0.2.15"
+
+[[bench]]
+name = "value_log"
+harness = false
+path = "benches/value_log.rs"
+required-features = ["compression"]
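
With `harness = false`, libtest's built-in bench harness is disabled and Criterion's `criterion_main!` macro in `benches/value_log.rs` provides the entry point. Because `compression` is now a default feature, the `required-features` gate is already satisfied by a plain `cargo bench`.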
26 changes: 25 additions & 1 deletion README.md
@@ -1,6 +1,30 @@
 # value-log

-Generic value log implementation for key-value separated storage, inspired by [RocksDB's BlobDB](https://github.com/facebook/rocksdb/wiki/BlobDB) and implemented in safe, stable Rust.
+Generic value log implementation for key-value separated storage, inspired by RocksDB's BlobDB [[1]](#footnotes) and implemented in safe, stable Rust.

 > This crate is intended for key-value separated LSM storage.
 > You probably want to use https://github.com/fjall-rs/fjall instead.
+
+## Features
+
+- Thread-safe API
+- 100% safe & stable Rust
+- Supports generic index structures (LSM-tree, ...)
+- Built-in per-blob compression (LZ4)
+- In-memory blob cache for hot data
+
+Keys are limited to 65536 bytes, values are limited to 2^32 bytes.
+
+## Stable disk format
+
+The disk format will be stable from 1.0.0 (oh, the dreaded 1.0.0...) onwards. Any breaking change after that will result in a major bump.
+
+## License
+
+All source code is licensed under MIT OR Apache-2.0.
+
+All contributions are to be licensed as MIT OR Apache-2.0.
+
+## Footnotes
+
+[1] https://github.com/facebook/rocksdb/wiki/BlobDB
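
A minimal end-to-end sketch of that write/read path, assuming the toy `DebugIndex` type defined in the benchmark file below (it is not part of the crate itself) and following the API exactly as the benchmark exercises it:

```rust
use std::sync::Arc;
use value_log::{BlobCache, Config, Index, ValueHandle, ValueLog};

fn demo(index: Arc<DebugIndex>) {
    let folder = tempfile::tempdir().unwrap();

    // 64 MiB blob cache; a capacity of 0 disables caching entirely.
    let value_log = ValueLog::new(
        folder.path(),
        Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(
            64 * 1_024 * 1_024,
        ))),
        index.clone(),
    )
    .unwrap();

    // Write a blob and record its indirection in the index...
    let mut writer = value_log.get_writer().unwrap();
    let segment_id = writer.segment_id();
    let offset = writer.offset(b"my_key");
    index
        .insert_indirection(b"my_key", ValueHandle { offset, segment_id })
        .unwrap();
    writer.write(b"my_key", b"my_value").unwrap();
    value_log.register(writer).unwrap();

    // ...then resolve the handle and read the blob back.
    let handle = index.get(b"my_key").unwrap().unwrap();
    let _blob = value_log.get(&handle).unwrap().unwrap();
}
```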
237 changes: 237 additions & 0 deletions benches/value_log.rs
@@ -0,0 +1,237 @@
use criterion::{criterion_group, criterion_main, Criterion};
use rand::RngCore;
use std::{
    collections::BTreeMap,
    sync::{Arc, RwLock},
};
use value_log::{BlobCache, Config, Index, ValueHandle, ValueLog};

#[derive(Default)]
pub struct DebugIndex(RwLock<BTreeMap<Arc<[u8]>, ValueHandle>>);

impl Index for DebugIndex {
    fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>> {
        Ok(self.0.read().expect("lock is poisoned").get(key).cloned())
    }

    fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> {
        self.0
            .write()
            .expect("lock is poisoned")
            .insert(key.into(), value);

        Ok(())
    }
}

fn load_value(c: &mut Criterion) {
    let mut group = c.benchmark_group("load blob");

    let sizes = [
        128,               // 128 B
        512,               // 512 B
        1_024,             // 1 KiB
        4_096,             // 4 KiB
        16_000,            // 16 KB
        64_000,            // 64 KB
        128_000,           // 128 KB
        256_000,           // 256 KB
        512_000,           // 512 KB
        1_024 * 1_024,     // 1 MiB
        4 * 1_024 * 1_024, // 4 MiB
    ];

    // Uncached reads: a zero-capacity blob cache means every get() hits disk
    {
        let index = DebugIndex(RwLock::new(BTreeMap::<Arc<[u8]>, ValueHandle>::default()));
        let index = Arc::new(index);

        let folder = tempfile::tempdir().unwrap();
        let vl_path = folder.path();

        let value_log = ValueLog::new(
            vl_path,
            Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
            index.clone(),
        )
        .unwrap();

        let mut writer = value_log.get_writer().unwrap();
        let segment_id = writer.segment_id();

        let mut rng = rand::thread_rng();

        for size in sizes {
            let key = size.to_string();
            let offset = writer.offset(key.as_bytes());

            index
                .insert_indirection(
                    key.as_bytes(),
                    ValueHandle {
                        offset,
                        segment_id: segment_id.clone(),
                    },
                )
                .unwrap();

            let mut data = vec![0u8; size];
            rng.fill_bytes(&mut data);

            writer.write(key.as_bytes(), &data).unwrap();
        }

        value_log.register(writer).unwrap();

        for size in sizes {
            let key = size.to_string();
            let handle = index.get(key.as_bytes()).unwrap().unwrap();

            group.bench_function(format!("{size} bytes (uncached)"), |b| {
                b.iter(|| {
                    value_log.get(&handle).unwrap().unwrap();
                })
            });
        }
    }

    // Cached reads: 64 MiB blob cache, warmed up before measuring
    {
        let index = DebugIndex(RwLock::new(BTreeMap::<Arc<[u8]>, ValueHandle>::default()));
        let index = Arc::new(index);

        let folder = tempfile::tempdir().unwrap();
        let vl_path = folder.path();

        let value_log = ValueLog::new(
            vl_path,
            Config::default()
                .blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))),
            index.clone(),
        )
        .unwrap();

        let mut writer = value_log.get_writer().unwrap();
        let segment_id = writer.segment_id();

        let mut rng = rand::thread_rng();

        for size in sizes {
            let key = size.to_string();
            let offset = writer.offset(key.as_bytes());

            index
                .insert_indirection(
                    key.as_bytes(),
                    ValueHandle {
                        offset,
                        segment_id: segment_id.clone(),
                    },
                )
                .unwrap();

            let mut data = vec![0u8; size];
            rng.fill_bytes(&mut data);

            writer.write(key.as_bytes(), &data).unwrap();
        }

        value_log.register(writer).unwrap();

        for size in sizes {
            let key = size.to_string();
            let handle = index.get(key.as_bytes()).unwrap().unwrap();

            // NOTE: Warm up cache
            value_log.get(&handle).unwrap().unwrap();

            group.bench_function(format!("{size} bytes (cached)"), |b| {
                b.iter(|| {
                    value_log.get(&handle).unwrap().unwrap();
                })
            });
        }
    }
}

fn compression(c: &mut Criterion) {
    let mut group = c.benchmark_group("compression");

    let index = DebugIndex(RwLock::new(BTreeMap::<Arc<[u8]>, ValueHandle>::default()));
    let index = Arc::new(index);

    let folder = tempfile::tempdir().unwrap();
    let vl_path = folder.path();

    let value_log = ValueLog::new(
        vl_path,
        Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
        index.clone(),
    )
    .unwrap();

    let mut writer = value_log.get_writer().unwrap();
    let segment_id = writer.segment_id();

    let mut rng = rand::thread_rng();

    let size_mb = 16;

    // Random bytes are effectively incompressible
    {
        let key = "random";
        let offset = writer.offset(key.as_bytes());

        index
            .insert_indirection(
                key.as_bytes(),
                ValueHandle {
                    offset,
                    segment_id: segment_id.clone(),
                },
            )
            .unwrap();

        let mut data = vec![0u8; size_mb * 1_024 * 1_024];
        rng.fill_bytes(&mut data);

        writer.write(key.as_bytes(), &data).unwrap();
    }

    // A repeating pattern compresses extremely well
    {
        let key = "good_compression";
        let offset = writer.offset(key.as_bytes());

        index
            .insert_indirection(
                key.as_bytes(),
                ValueHandle {
                    offset,
                    segment_id: segment_id.clone(),
                },
            )
            .unwrap();

        let dummy = b"abcdefgh";
        let data = dummy.repeat(size_mb * 1_024 * 1_024 / dummy.len());

        writer.write(key.as_bytes(), &data).unwrap();
    }

    value_log.register(writer).unwrap();

    let handle_random = index.get(b"random").unwrap().unwrap();
    let handle_good_compression = index.get(b"good_compression").unwrap().unwrap();

    group.bench_function("no compression", |b| {
        b.iter(|| {
            value_log.get(&handle_random).unwrap().unwrap();
        })
    });

    group.bench_function("good compression", |b| {
        b.iter(|| {
            value_log.get(&handle_good_compression).unwrap().unwrap();
        })
    });
}

criterion_group!(benches, load_value, compression);
criterion_main!(benches);
14 changes: 10 additions & 4 deletions src/blob_cache.rs
@@ -41,12 +41,18 @@ impl BlobCache
         }
     }

-    pub fn insert(&self, handle: CacheKey, value: Arc<[u8]>) {
-        self.data.insert(handle, value);
+    pub(crate) fn insert(&self, handle: CacheKey, value: Arc<[u8]>) {
+        if self.capacity > 0 {
+            self.data.insert(handle, value);
+        }
     }

-    pub fn get(&self, handle: &CacheKey) -> Option<Item> {
-        self.data.get(handle)
+    pub(crate) fn get(&self, handle: &CacheKey) -> Option<Item> {
+        if self.capacity > 0 {
+            self.data.get(handle)
+        } else {
+            None
+        }
     }

     /// Returns the cache capacity in bytes
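
The new capacity guards make a `BlobCache` built with zero bytes a true no-op, which the benchmark relies on to measure cold reads. A sketch of the two configurations used there:

```rust
use std::sync::Arc;
use value_log::{BlobCache, Config};

// Capacity 0: insert() and get() short-circuit, so every read goes to disk.
let uncached = Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0)));

// 64 MiB cache: repeated reads of hot blobs are served from memory.
let cached = Config::default()
    .blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024)));
```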
3 changes: 2 additions & 1 deletion src/lib.rs
@@ -1,6 +1,6 @@
 //! Generic value log implementation for key-value separated storage.
 //!
-//! > This crate is intended for key-value separated LSM storage.
+//! > This crate is intended as a building block for key-value separated LSM storage.
 //! > You probably want to use <https://github.com/fjall-rs/fjall> instead.
 //!
 //! The value log's contents are split into segments, each segment holds a sorted
@@ -49,6 +49,7 @@ mod value_log
 mod version;

 pub use {
+    blob_cache::BlobCache,
     config::Config,
     error::{Error, Result},
     handle::ValueHandle,