diff --git a/Cargo.toml b/Cargo.toml index 3bc0de7..23c0ffc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,8 @@ name = "value_log" path = "src/lib.rs" [features] -default = [] +default = ["compression"] +compression = ["dep:lz4_flex"] serde = ["dep:serde"] [dependencies] @@ -27,6 +28,7 @@ byteorder = "1.5.0" chrono = "0.4.34" crc32fast = "1.4.0" log = "0.4.20" +lz4_flex = { version = "0.11.2", optional = true } min-max-heap = "1.3.0" quick_cache = "0.4.1" rand = "0.8.5" @@ -37,6 +39,13 @@ serde = { version = "1.0.197", default-features = false, features = [ ], optional = true } [dev-dependencies] +criterion = "0.5.1" env_logger = "0.11.2" tempfile = "3.10.0" test-log = "0.2.15" + +[[bench]] +name = "value_log" +harness = false +path = "benches/value_log.rs" +required-features = ["compression"] diff --git a/README.md b/README.md index e4d600d..8fafc1c 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,30 @@ # value-log -Generic value log implementation for key-value separated storage, inspired by [RocksDB's BlobDB](https://github.com/facebook/rocksdb/wiki/BlobDB) and implemented in safe, stable Rust. +Generic value log implementation for key-value separated storage, inspired by RocksDB's BlobDB [[1]](#footnotes) and implemented in safe, stable Rust. > This crate is intended for key-value separated LSM storage. > You probably want to use https://github.com/fjall-rs/fjall instead. + +## Features + +- Thread-safe API +- 100% safe & stable Rust +- Supports generic index structures (LSM-tree, ...) +- Built-in per-blob compression (LZ4) +- In-memory blob cache for hot data + +Keys are limited to 65536 bytes, values are limited to 2^32 bytes. + +## Stable disk format + +The disk format will be stable from 1.0.0 (oh, the dreaded 1.0.0...) onwards. Any breaking change after that will result in a major bump. + +## License + +All source code is licensed under MIT OR Apache-2.0. + +All contributions are to be licensed as MIT OR Apache-2.0. 
+ +## Footnotes + +[1] https://github.com/facebook/rocksdb/wiki/BlobDB diff --git a/benches/value_log.rs b/benches/value_log.rs new file mode 100644 index 0000000..5a64781 --- /dev/null +++ b/benches/value_log.rs @@ -0,0 +1,237 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::RngCore; +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; +use value_log::{BlobCache, Config, Index, ValueHandle, ValueLog}; + +#[derive(Default)] +pub struct DebugIndex(RwLock<BTreeMap<Arc<[u8]>, ValueHandle>>); + +impl Index for DebugIndex { + fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>> { + Ok(self.0.read().expect("lock is poisoned").get(key).cloned()) + } + + fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()> { + self.0 + .write() + .expect("lock is poisoned") + .insert(key.into(), value); + + Ok(()) + } +} + +fn load_value(c: &mut Criterion) { + let mut group = c.benchmark_group("load blob"); + + let sizes = [ + 128, // 128 B + 512, // 512 B + 1_024, // 1 KiB + 4_096, // 4 KiB + 16_000, // 16 KiB + 64_000, // 64 KiB + 128_000, // 128 KiB + 256_000, // 256 KiB + 512_000, // 512 KiB + 1_024 * 1_024, // 1 MiB + 4 * 1_024 * 1_024, // 4 MiB + ]; + + { + let index = DebugIndex(RwLock::new(BTreeMap::<Arc<[u8]>, ValueHandle>::default())); + let index = Arc::new(index); + + let folder = tempfile::tempdir().unwrap(); + let vl_path = folder.path(); + + let value_log = ValueLog::new( + vl_path, + Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), + index.clone(), + ) + .unwrap(); + + let mut writer = value_log.get_writer().unwrap(); + let segment_id = writer.segment_id(); + + let mut rng = rand::thread_rng(); + + for size in sizes { + let key = size.to_string(); + let offset = writer.offset(key.as_bytes()); + + index + .insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + ) + .unwrap(); + + let mut data = vec![0u8; size]; + rng.fill_bytes(&mut data); + +
writer.write(key.as_bytes(), &data).unwrap(); + } + + value_log.register(writer).unwrap(); + + for size in sizes { + let key = size.to_string(); + let handle = index.get(key.as_bytes()).unwrap().unwrap(); + + group.bench_function(format!("{size} bytes (uncached)"), |b| { + b.iter(|| { + value_log.get(&handle).unwrap().unwrap(); + }) + }); + } + } + + { + let index = DebugIndex(RwLock::new(BTreeMap::<Arc<[u8]>, ValueHandle>::default())); + let index = Arc::new(index); + + let folder = tempfile::tempdir().unwrap(); + let vl_path = folder.path(); + + let value_log = ValueLog::new( + vl_path, + Config::default() + .blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))), + index.clone(), + ) + .unwrap(); + + let mut writer = value_log.get_writer().unwrap(); + let segment_id = writer.segment_id(); + + let mut rng = rand::thread_rng(); + + for size in sizes { + let key = size.to_string(); + let offset = writer.offset(key.as_bytes()); + + index + .insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + ) + .unwrap(); + + let mut data = vec![0u8; size]; + rng.fill_bytes(&mut data); + + writer.write(key.as_bytes(), &data).unwrap(); + } + + value_log.register(writer).unwrap(); + + for size in sizes { + let key = size.to_string(); + let handle = index.get(key.as_bytes()).unwrap().unwrap(); + + // NOTE: Warm up cache + value_log.get(&handle).unwrap().unwrap(); + + group.bench_function(format!("{size} bytes (cached)"), |b| { + b.iter(|| { + value_log.get(&handle).unwrap().unwrap(); + }) + }); + } + } +} + +fn compression(c: &mut Criterion) { + let mut group = c.benchmark_group("compression"); + + let index = DebugIndex(RwLock::new(BTreeMap::<Arc<[u8]>, ValueHandle>::default())); + let index = Arc::new(index); + + let folder = tempfile::tempdir().unwrap(); + let vl_path = folder.path(); + + let value_log = ValueLog::new( + vl_path, + Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), + index.clone(), + ) + 
.unwrap(); + + let mut writer = value_log.get_writer().unwrap(); + let segment_id = writer.segment_id(); + + let mut rng = rand::thread_rng(); + + let size_mb = 16; + + { + let key = "random"; + let offset = writer.offset(key.as_bytes()); + + index + .insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + ) + .unwrap(); + + let mut data = vec![0u8; size_mb * 1_024 * 1_024]; + rng.fill_bytes(&mut data); + + writer.write(key.as_bytes(), &data).unwrap(); + } + + { + let key = "good_compression"; + let offset = writer.offset(key.as_bytes()); + + index + .insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + ) + .unwrap(); + + let dummy = b"abcdefgh"; + let data = dummy.repeat(size_mb * 1_024 * 1_024 / dummy.len()); + + writer.write(key.as_bytes(), &data).unwrap(); + } + + value_log.register(writer).unwrap(); + + let handle_random = index.get(b"random").unwrap().unwrap(); + let handle_good_compression = index.get(b"good_compression").unwrap().unwrap(); + + group.bench_function("no compression", |b| { + b.iter(|| { + value_log.get(&handle_random).unwrap().unwrap(); + }) + }); + + group.bench_function("good compression", |b| { + b.iter(|| { + value_log.get(&handle_good_compression).unwrap().unwrap(); + }) + }); +} + +criterion_group!(benches, load_value, compression); +criterion_main!(benches); diff --git a/src/blob_cache.rs b/src/blob_cache.rs index 02974e6..a339222 100644 --- a/src/blob_cache.rs +++ b/src/blob_cache.rs @@ -41,12 +41,18 @@ impl BlobCache { } } - pub fn insert(&self, handle: CacheKey, value: Arc<[u8]>) { - self.data.insert(handle, value); + pub(crate) fn insert(&self, handle: CacheKey, value: Arc<[u8]>) { + if self.capacity > 0 { + self.data.insert(handle, value); + } } - pub fn get(&self, handle: &CacheKey) -> Option<CacheValue> { - self.data.get(handle) + pub(crate) fn get(&self, handle: &CacheKey) -> Option<CacheValue> { + if self.capacity > 0 { + self.data.get(handle) + } 
else { + None + } } /// Returns the cache capacity in bytes diff --git a/src/lib.rs b/src/lib.rs index 93ffa91..bee7734 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ //! Generic value log implementation for key-value separated storage. //! -//! > This crate is intended for key-value separated LSM storage. +//! > This crate is intended as a building block for key-value separated LSM storage. //! > You probably want to use <https://github.com/fjall-rs/fjall> instead. //! //! The value log's contents are split into segments, each segment holds a sorted @@ -49,6 +49,7 @@ mod value_log; mod version; pub use { + blob_cache::BlobCache, config::Config, error::{Error, Result}, handle::ValueHandle, diff --git a/src/main.rs b/src/main.rs index 801022d..6eb91d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,6 +69,50 @@ fn main() -> value_log::Result<()> { value_log.register(writer)?; } + { + let mut writer = value_log.get_writer()?; + let segment_id = writer.segment_id(); + + let key = "html"; + let offset = writer.offset(key.as_bytes()); + + index.insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + )?; + + writer.write( + key.as_bytes(), + b" +<html> +
<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+<p>Hello World</p>
+ ", + )?; + + value_log.register(writer)?; + } + /* { let mut writer = value_log.get_writer()?; let segment_id = writer.segment_id(); @@ -199,6 +243,12 @@ fn main() -> value_log::Result<()> { eprintln!("{:#?}", value_log.segments.read().unwrap()); + let handle = index.get(b"html")?.unwrap(); + eprintln!( + "{}", + String::from_utf8_lossy(&value_log.get(&handle)?.unwrap()) + ); + for _ in 0..10 { let value_handle = ValueHandle { segment_id: value_log.list_segments().first().unwrap().clone(), diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index 3ecdd8f..e7dc51e 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -78,18 +78,18 @@ impl MultiWriter { /// # Errors /// /// Will return `Err` if an IO error occurs. - pub fn write(&mut self, key: &[u8], value: &[u8]) -> crate::Result<()> { + pub fn write(&mut self, key: &[u8], value: &[u8]) -> crate::Result { let target_size = self.target_size; let writer = self.get_active_writer_mut(); - writer.write(key, value)?; + let bytes_written = writer.write(key, value)?; if writer.offset() >= target_size { writer.flush()?; self.rotate()?; } - Ok(()) + Ok(bytes_written) } pub(crate) fn finish(mut self) -> crate::Result> { diff --git a/src/segment/writer.rs b/src/segment/writer.rs index 985a15f..72822fb 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -80,16 +80,19 @@ impl Writer { /// # Errors /// /// Will return `Err` if an IO error occurs. 
- pub fn write(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> { + pub fn write(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<u32> { + #[cfg(feature = "compression")] + let value = lz4_flex::compress_prepend_size(value); + let mut hasher = crc32fast::Hasher::new(); - hasher.update(value); + hasher.update(&value); let crc = hasher.finalize(); self.inner.write_u16::<BigEndian>(key.len() as u16)?; self.inner.write_all(key)?; self.inner.write_u32::<BigEndian>(crc)?; self.inner.write_u32::<BigEndian>(value.len() as u32)?; - self.inner.write_all(value)?; + self.inner.write_all(&value)?; // Key self.offset += std::mem::size_of::<u16>() as u64; @@ -98,15 +101,13 @@ // CRC self.offset += std::mem::size_of::<u32>() as u64; - // TODO: compress - // Value self.offset += std::mem::size_of::<u32>() as u64; self.offset += value.len() as u64; self.item_count += 1; - Ok(()) + Ok(value.len() as u32) } pub(crate) fn flush(&mut self) -> std::io::Result<()> { diff --git a/src/value_log.rs b/src/value_log.rs index fd5984b..bd560fb 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -157,16 +157,19 @@ impl ValueLog { let mut reader = BufReader::new(File::open(&segment.path)?); reader.seek(std::io::SeekFrom::Start(handle.offset))?; - // TODO: handle CRC let _crc = reader.read_u32::<BigEndian>()?; let val_len = reader.read_u32::<BigEndian>()?; let mut val = vec![0; val_len as usize]; reader.read_exact(&mut val)?; - let val: Arc<[u8]> = val.into(); - // TODO: decompress + #[cfg(feature = "compression")] + let val = lz4_flex::decompress_size_prepended(&val).expect("should decompress"); + + // TODO: handle CRC + + let val: Arc<[u8]> = val.into(); self.blob_cache.insert(handle.clone(), val.clone());