From 4c5d905944ad0845a6e93c5861e3b33efc251f8f Mon Sep 17 00:00:00 2001
From: Gwo Tzu-Hsing
Date: Tue, 19 Nov 2024 14:43:16 +0800
Subject: [PATCH] refactor: use parquet lru reader as trait object

---
 bindings/python/src/db.rs    |  2 +-
 examples/datafusion.rs       |  8 ++--
 parquet-lru/Cargo.toml       |  3 +-
 parquet-lru/src/foyer.rs     | 25 ++++------
 parquet-lru/src/lib.rs       | 77 ++++++++++++++----------------
 src/compaction/mod.rs        | 42 +++++++----------
 src/lib.rs                   | 90 +++++++++++++++++-------------------
 src/ondisk/sstable.rs        | 28 ++++++-----
 src/snapshot.rs              | 19 ++++----
 src/stream/level.rs          | 30 ++++++------
 src/stream/mem_projection.rs | 22 +++------
 src/stream/merge.rs          | 43 +++++++----------
 src/stream/mod.rs            | 32 +++++--------
 src/stream/package.rs        | 19 +++-----
 src/transaction.rs           | 20 ++++----
 src/version/mod.rs           | 34 +++++---------
 tests/data_integrity.rs      |  3 +-
 17 files changed, 206 insertions(+), 291 deletions(-)

diff --git a/bindings/python/src/db.rs b/bindings/python/src/db.rs
index f788d5c8..99d3d4da 100644
--- a/bindings/python/src/db.rs
+++ b/bindings/python/src/db.rs
@@ -30,7 +30,7 @@ type PyExecutor = TokioExecutor;
 pub struct TonboDB {
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
-    db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
+    db: Arc<DB<DynRecord, PyExecutor>>,
 }
 
 #[pymethods]
diff --git a/examples/datafusion.rs b/examples/datafusion.rs
index d320de4e..cd171416 100644
--- a/examples/datafusion.rs
+++ b/examples/datafusion.rs
@@ -26,7 +26,7 @@ use datafusion::{
 use fusio::path::Path;
 use futures_core::Stream;
 use futures_util::StreamExt;
-use parquet_lru::NoopCache;
+use parquet_lru::NoCache;
 use tokio::fs;
 use tonbo::{
     executor::tokio::TokioExecutor, fs::FileId, inmem::immutable::ArrowArrays, record::Record,
@@ -43,12 +43,12 @@ pub struct Music {
 }
 
 struct MusicProvider {
-    db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
+    db: Arc<DB<Music, TokioExecutor>>,
 }
 
 struct MusicExec {
     cache: PlanProperties,
-    db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
+    db: Arc<DB<Music, TokioExecutor>>,
     projection: Option<Vec<usize>>,
     limit: Option<usize>,
     range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
@@ -98,7 +98,7 @@ impl TableProvider for MusicProvider {
 
 impl MusicExec {
     fn new(
-        db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
+        db: Arc<DB<Music, TokioExecutor>>,
         projection: Option<&Vec<usize>>,
     ) -> Self {
         let schema = Music::arrow_schema();
diff --git a/parquet-lru/Cargo.toml b/parquet-lru/Cargo.toml
index 81d852cf..32451e60 100644
--- a/parquet-lru/Cargo.toml
+++ b/parquet-lru/Cargo.toml
@@ -7,8 +7,8 @@ name = "parquet-lru"
 version = "0.1.0"
 
 [features]
+default = ["foyer"]
 foyer = ["dep:foyer", "dep:serde"]
-full = ["foyer"]
 
 [dependencies]
 bytes = { version = "1.8.0", features = ["serde"] }
@@ -17,4 +17,3 @@ futures-core = "0.3.31"
 futures-util = "0.3.31"
 parquet = { version = "53.2.0", features = ["async"] }
 serde = { version = "1.0.214", optional = true }
-thiserror = "2.0.3"
diff --git a/parquet-lru/src/foyer.rs b/parquet-lru/src/foyer.rs
index 1dc6d082..566922f6 100644
--- a/parquet-lru/src/foyer.rs
+++ b/parquet-lru/src/foyer.rs
@@ -10,7 +10,7 @@ use parquet::{
 };
 use serde::{Deserialize, Serialize};
 
-use crate::{Error, LruCache, Options};
+use crate::LruCache;
 
 #[derive(Clone)]
 pub struct FoyerCache<K>
@@ -32,23 +32,14 @@ impl<K> LruCache<K> for FoyerCache<K>
 where
     for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
 {
-    type LruReader<R> = FoyerReader<K, R>;
-
-    async fn new(options: Options) -> Result<Self, Error> {
-        Ok(Self {
-            inner: Arc::new(FoyerCacheInner {
-                meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
-                data: foyer::HybridCacheBuilder::new()
-                    .memory(options.data_capacity)
-                    .storage(foyer::Engine::Large)
-                    .build()
-                    .await
-                    .map_err(|e| Error::External(e.into()))?,
-            }),
-        })
-    }
-
-    async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R> {
+    type LruReader<R> = FoyerReader<K, R>
+    where
+        R: AsyncFileReader + 'static;
+
+    async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
+    where
+        R: AsyncFileReader,
+    {
         FoyerReader::new(self.clone(), key, reader)
     }
 }
diff --git a/parquet-lru/src/lib.rs b/parquet-lru/src/lib.rs
index 00b38b7b..182a0724 100644
--- a/parquet-lru/src/lib.rs
+++ b/parquet-lru/src/lib.rs
@@ -1,63 +1,56 @@
+mod r#dyn;
 #[cfg(feature = "foyer")]
 pub mod foyer;
 
 use std::{future::Future, marker::PhantomData};
 
-use parquet::{arrow::async_reader::AsyncFileReader, errors::Result};
-use thiserror::Error;
+use parquet::arrow::async_reader::AsyncFileReader;
 
-#[derive(Default)]
-pub struct Options {
-    meta_capacity: usize,
-    data_capacity: usize,
-}
-
-impl Options {
-    pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
-        self.meta_capacity = meta_capacity;
-        self
-    }
-
-    pub fn data_capacity(mut self, data_capacity: usize) -> Self {
-        self.data_capacity = data_capacity;
-        self
-    }
-}
+pub use crate::r#dyn::*;
 
-pub trait LruCache<K>: Clone + Send + Sync + 'static {
-    type LruReader<R>: AsyncFileReader + 'static;
-
-    fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
+pub trait LruCache<K>
+where
+    K: 'static,
+{
+    type LruReader<R>: AsyncFileReader + 'static
+    where
+        R: AsyncFileReader + 'static;
 
     fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
     where
         R: AsyncFileReader + 'static;
 }
 
-#[derive(Clone, Default)]
-pub struct NoopCache<K> {
+#[derive(Default)]
+pub struct NoCache<K> {
     _phantom: PhantomData<K>,
 }
 
-impl<K> LruCache<K> for NoopCache<K>
-where
-    K: Send + Sync + Clone + 'static,
-{
-    type LruReader<R> = R;
-
-    async fn new(_options: Options) -> Result<Self, Error> {
-        Ok(Self {
+impl<K> Clone for NoCache<K> {
+    fn clone(&self) -> Self {
+        Self {
             _phantom: PhantomData,
-        })
-    }
-
-    async fn get_reader<R>(&self, _key: K, reader: R) -> R {
-        reader
+        }
     }
 }
 
-#[derive(Debug, Error)]
-pub enum Error {
-    #[error("External lru implementation error: {0}")]
-    External(#[from] Box<dyn std::error::Error + Send + Sync>),
+unsafe impl<K> Send for NoCache<K> {}
+
+unsafe impl<K> Sync for NoCache<K> {}
+
+impl<K> LruCache<K> for NoCache<K>
+where
+    K: 'static,
+{
+    type LruReader<R> = R
+    where
+        R: AsyncFileReader + 'static;
+
+    #[allow(clippy::manual_async_fn)]
+    fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
+    where
+        R: AsyncFileReader,
+    {
+        async move { reader }
+    }
 }
diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs
index c9b2738c..13e077c8 100644
--- a/src/compaction/mod.rs
+++ b/src/compaction/mod.rs
@@ -5,10 +5,10 @@ use fusio::DynFs;
 use fusio_parquet::writer::AsyncWriter;
 use futures_util::StreamExt;
 use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
-use parquet_lru::LruCache;
 use thiserror::Error;
 use tokio::sync::oneshot;
 
+use crate::ParquetLru;
 use crate::{
     fs::{manager::StoreManager, FileId, FileType},
     inmem::{
@@ -60,13 +60,10 @@ where
         }
     }
 
-    pub(crate) async fn check_then_compaction<C>(
+    pub(crate) async fn check_then_compaction(
         &mut self,
-        parquet_lru_cache: C,
-    ) -> Result<(), CompactionError<R>>
-    where
-        C: LruCache<FileId> + Unpin,
-    {
+        parquet_lru_cache: ParquetLru,
+    ) -> Result<(), CompactionError<R>> {
         let mut guard = self.schema.write().await;
 
         guard.trigger.reset();
@@ -114,7 +111,7 @@ where
                         &mut delete_gens,
                         &guard.record_instance,
                         &self.manager,
-                        &parquet_lru_cache,
+                        parquet_lru_cache,
                     )
                     .await?;
@@ -193,7 +190,7 @@ where
     }
 
     #[allow(clippy::too_many_arguments)]
-    pub(crate) async fn major_compaction<C>(
+    pub(crate) async fn major_compaction(
         version: &Version<R>,
         option: &DbOption<R>,
         mut min: &R::Key,
         mut max: &R::Key,
         version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
         delete_gens: &mut Vec<(FileId, usize)>,
         instance: &RecordInstance,
         manager: &StoreManager,
-        parquet_cache: &C,
-    ) -> Result<(), CompactionError<R>>
-    where
-        C: LruCache<FileId> + Unpin,
-    {
+        parquet_cache: ParquetLru,
+    ) -> Result<(), CompactionError<R>> {
         let mut level = 0;
 
         while level < MAX_LEVEL - 2 {
@@ -391,18 +385,15 @@ where
         (meet_scopes_l, start_l, end_l - 1)
     }
 
-    async fn build_tables<'scan, C>(
+    async fn build_tables<'scan>(
         option: &DbOption<R>,
         version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
         level: usize,
-        streams: Vec<ScanStream<'scan, R, C>>,
+        streams: Vec<ScanStream<'scan, R>>,
         instance: &RecordInstance,
         fs: &Arc<dyn DynFs>,
-    ) -> Result<(), CompactionError<R>>
-    where
-        C: LruCache<FileId> + Unpin,
-    {
-        let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;
+    ) -> Result<(), CompactionError<R>> {
+        let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into()).await?;
 
         // Kould: is the capacity parameter necessary?
         let mut builder = R::Columns::builder(&instance.arrow_schema::<R>(), 8192);
@@ -529,8 +520,9 @@ pub(crate) mod tests {
     use fusio_dispatch::FsOptions;
     use fusio_parquet::writer::AsyncWriter;
     use parquet::arrow::AsyncArrowWriter;
-    use parquet_lru::NoopCache;
+    use parquet_lru::{DynLruCache, NoCache};
     use tempfile::TempDir;
+    use ulid::Ulid;
 
     use crate::{
         compaction::Compactor,
@@ -827,7 +819,7 @@ pub(crate) mod tests {
             &mut vec![],
             &RecordInstance::Normal,
             &manager,
-            &NoopCache::default(),
+            Arc::new(NoCache::default()),
         )
         .await
         .unwrap();
@@ -1219,7 +1211,7 @@ pub(crate) mod tests {
             &mut vec![],
             &RecordInstance::Normal,
             &manager,
-            &NoopCache::default(),
+            Arc::new(NoCache::default()),
        )
        .await
        .unwrap();
@@ -1240,7 +1232,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(5);
 
-        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for i in 5..9 {
             let item = Test {
diff --git a/src/lib.rs b/src/lib.rs
index 93dd4db5..bd58f054 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -148,7 +148,7 @@ use parquet::{
     arrow::{arrow_to_parquet_schema, ProjectionMask},
     errors::ParquetError,
 };
-use parquet_lru::{LruCache, NoopCache};
+use parquet_lru::{DynLruCache, NoCache};
 use record::{ColumnDesc, DynRecord, Record, RecordInstance};
 use thiserror::Error;
 use timestamp::{Timestamp, TimestampedRef};
@@ -175,21 +175,20 @@ use crate::{
     wal::{log::LogType, RecoverError, WalFile},
 };
 
-pub struct DB<R, E, C>
+pub struct DB<R, E>
 where
     R: Record,
     E: Executor,
-    C: LruCache<FileId>,
 {
     schema: Arc<RwLock<Schema<R>>>,
     version_set: VersionSet<R>,
     lock_map: LockMap<R::Key>,
     manager: Arc<StoreManager>,
-    parquet_lru_cache: C,
+    parquet_lru_cache: ParquetLru,
     _p: PhantomData<E>,
 }
 
-impl<E> DB<DynRecord, E, NoopCache<FileId>>
+impl<E> DB<DynRecord, E>
 where
     E: Executor + Send + Sync + 'static,
 {
         let instance =
             RecordInstance::Runtime(DynRecord::empty_record(column_descs, primary_index));
 
-        Self::build(option, executor, instance, Default::default()).await
+        Self::build(option, executor, instance, Arc::new(NoCache::default())).await
     }
 }
 
-impl<R, E> DB<R, E, NoopCache<FileId>>
+impl<R, E> DB<R, E>
 where
     R: Record + Send + Sync,
     R::Columns: Send + Sync,
     E: Executor + Send + Sync + 'static,
 {
             Arc::new(option),
             executor,
             RecordInstance::Normal,
-            Default::default(),
+            Arc::new(NoCache::default()),
         )
         .await
     }
 }
 
-impl<R, E, C> DB<R, E, C>
+impl<R, E> DB<R, E>
 where
     R: Record + Send + Sync,
     R::Columns: Send + Sync,
     E: Executor + Send + Sync + 'static,
-    C: LruCache<FileId> + Unpin,
 {
     async fn build(
         option: Arc<DbOption<R>>,
         executor: E,
         instance: RecordInstance,
-        lru_cache: C,
+        lru_cache: ParquetLru,
     ) -> Result<Self, DbError<R>> {
         let manager = Arc::new(StoreManager::new(
             option.base_fs.clone(),
@@ -318,11 +316,11 @@ where
     }
 
     /// open an optimistic ACID transaction
-    pub async fn transaction(&self) -> Transaction<'_, R, C> {
+    pub async fn transaction(&self) -> Transaction<'_, R> {
         Transaction::new(self.snapshot().await, self.lock_map.clone())
     }
 
-    pub async fn snapshot(&self) -> Snapshot<'_, R, C> {
+    pub async fn snapshot(&self) -> Snapshot<'_, R> {
         Snapshot::new(
             self.schema.read().await,
             self.version_set.current().await,
@@ -592,18 +590,15 @@ where
         self.mutable.append(None, key, ts, value).await
     }
 
-    async fn get<'get, C>(
+    async fn get<'get>(
         &'get self,
         version: &'get Version<R>,
         manager: &StoreManager,
         key: &'get R::Key,
         ts: Timestamp,
         projection: Projection,
-        parquet_cache: C,
-    ) -> Result<Option<Entry<'get, R>>, DbError<R>>
-    where
-        C: LruCache<FileId>,
-    {
+        parquet_cache: ParquetLru,
+    ) -> Result<Option<Entry<'get, R>>, DbError<R>> {
         if let Some(entry) = self.mutable.get(key, ts) {
             return Ok(Some(Entry::Mutable(entry)));
         }
@@ -658,10 +653,9 @@ where
 }
 
 /// scan configuration intermediate structure
-pub struct Scan<'scan, 'range, R, C>
+pub struct Scan<'scan, 'range, R>
 where
     R: Record,
-    C: LruCache<FileId>,
     'range: 'scan,
 {
     schema: &'scan Schema<R>,
     version: &'scan Version<R>,
     fn_pre_stream:
-        Box<dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R, C>> + Send + 'scan>,
+        Box<dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan>,
 
     limit: Option<usize>,
     projection_indices: Option<Vec<usize>>,
     projection: ProjectionMask,
-    parquet_cache: C,
+    parquet_cache: ParquetLru,
 }
 
-impl<'scan, 'range, R, C> Scan<'scan, 'range, R, C>
+impl<'scan, 'range, R> Scan<'scan, 'range, R>
 where
     R: Record + Send,
-    C: LruCache<FileId> + Unpin,
 {
     fn new(
         schema: &'scan Schema<R>,
         ts: Timestamp,
         version: &'scan Version<R>,
         fn_pre_stream: Box<
-            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R, C>> + Send + 'scan,
+            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan,
         >,
-        parquet_cache: C,
+        parquet_cache: ParquetLru,
     ) -> Self {
         Self {
             schema,
@@ -883,6 +876,8 @@ pub enum Projection {
     Parts(Vec<usize>),
 }
 
+pub type ParquetLru = Arc<dyn DynLruCache<Ulid> + Send + Sync>;
+
 #[cfg(all(test, feature = "tokio"))]
 pub(crate) mod tests {
     use std::{
@@ -902,10 +897,9 @@ pub(crate) mod tests {
     use futures::StreamExt;
     use once_cell::sync::Lazy;
     use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
-    use parquet_lru::NoopCache;
+    use parquet_lru::NoCache;
     use tempfile::TempDir;
     use tracing::error;
-    use ulid::Ulid;
 
     use crate::{
         compaction::{CompactTask, CompactionError, Compactor},
@@ -1143,7 +1137,7 @@ pub(crate) mod tests {
         option: DbOption<Test>,
         executor: E,
     ) -> RecordBatch {
-        let db: DB<Test, E, _> = DB::new(option.clone(), executor).await.unwrap();
+        let db: DB<Test, E> = DB::new(option.clone(), executor).await.unwrap();
 
         let base_fs = db.manager.base_fs();
 
         db.write(
@@ -1295,7 +1289,7 @@ pub(crate) mod tests {
         schema: crate::Schema<R>,
         version: Version<R>,
         manager: Arc<StoreManager>,
-    ) -> Result<DB<R, E, NoopCache<FileId>>, DbError<R>>
+    ) -> Result<DB<R, E>, DbError<R>>
     where
         R: Record + Send + Sync,
         R::Columns: Send + Sync,
@@ -1329,11 +1323,14 @@ pub(crate) mod tests {
         while let Ok(task) = compaction_rx.recv_async().await {
             if let Err(err) = match task {
                 CompactTask::Freeze => {
-                    compactor.check_then_compaction(NoopCache::default()).await
+                    compactor
+                        .check_then_compaction(Arc::new(NoCache::default()))
+                        .await
                 }
                 CompactTask::Flush(option_tx) => {
-                    let mut result =
-                        compactor.check_then_compaction(NoopCache::default()).await;
+                    let mut result = compactor
+                        .check_then_compaction(Arc::new(NoCache::default()))
+                        .await;
                     if let Some(tx) = option_tx {
                         let channel_result =
                             tx.send(()).map_err(|_| CompactionError::ChannelClose);
@@ -1354,7 +1351,7 @@ pub(crate) mod tests {
             version_set,
             lock_map: Arc::new(Default::default()),
             manager,
-            parquet_lru_cache: Default::default(),
+            parquet_lru_cache: Arc::new(NoCache::default()),
             _p: Default::default(),
         })
     }
@@ -1583,7 +1580,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5);
 
-        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for (i, item) in test_items().into_iter().enumerate() {
             db.write(item, 0.into()).await.unwrap();
@@ -1619,7 +1616,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50);
 
-        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for item in &test_items()[0..10] {
             db.write(item.clone(), 0.into()).await.unwrap();
@@ -1668,10 +1665,9 @@ pub(crate) mod tests {
         schema.flush_wal().await.unwrap();
         drop(schema);
 
-        let db: DB<Test, TokioExecutor, _> =
-            DB::new(option.as_ref().to_owned(), TokioExecutor::new())
-                .await
-                .unwrap();
+        let db: DB<Test, TokioExecutor> = DB::new(option.as_ref().to_owned(), TokioExecutor::new())
+            .await
+            .unwrap();
 
         let mut sort_items = BTreeMap::new();
         for item in test_items() {
@@ -1741,7 +1737,7 @@ pub(crate) mod tests {
             "id".to_owned(),
             primary_key_index,
         );
-        let db: DB<DynRecord, TokioExecutor, _> =
+        let db: DB<DynRecord, TokioExecutor> =
             DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index)
                 .await
                 .unwrap();
@@ -1781,7 +1777,7 @@ pub(crate) mod tests {
         option.major_threshold_with_sst_size = 3;
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(5);
-        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for (idx, item) in test_items().into_iter().enumerate() {
             if idx % 2 == 0 {
@@ -1823,7 +1819,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(5);
 
-        let db: DB<DynRecord, TokioExecutor, _> =
+        let db: DB<DynRecord, TokioExecutor> =
             DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index)
                 .await
                 .unwrap();
@@ -2053,7 +2049,7 @@ pub(crate) mod tests {
         option3.major_default_oldest_table_num = 1;
         option3.trigger_type = TriggerType::Length(5);
 
-        let db1: DB<DynRecord, TokioExecutor, _> = DB::with_schema(
+        let db1: DB<DynRecord, TokioExecutor> = DB::with_schema(
             option,
             TokioExecutor::new(),
             cols_desc.clone(),
             primary_key_index,
         )
         .await
         .unwrap();
-        let db2: DB<DynRecord, TokioExecutor, _> = DB::with_schema(
+        let db2: DB<DynRecord, TokioExecutor> = DB::with_schema(
             option2,
             TokioExecutor::new(),
             cols_desc.clone(),
             primary_key_index,
         )
         .await
         .unwrap();
-        let db3: DB<DynRecord, TokioExecutor, _> =
+        let db3: DB<DynRecord, TokioExecutor> =
             DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index)
                 .await
                 .unwrap();
diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs
index ca03d92a..836ef8d0 100644
--- a/src/ondisk/sstable.rs
+++ b/src/ondisk/sstable.rs
@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use std::{marker::PhantomData, ops::Bound};
 
 use fusio::{dynamic::DynFile, DynRead};
@@ -11,7 +12,7 @@ use parquet::{
     },
     errors::Result as ParquetResult,
 };
-use parquet_lru::LruCache;
+use parquet_lru::{BoxedFileReader, DynLruCache};
 use ulid::Ulid;
 
 use super::{arrows::get_range_filter, scan::SsTableScan};
@@ -21,22 +22,20 @@ use crate::{
     timestamp::{Timestamp, TimestampedRef},
 };
 
-pub(crate) struct SsTable<R, C>
+pub(crate) struct SsTable<R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
-    reader: C::LruReader<AsyncReader>,
+    reader: BoxedFileReader,
     _marker: PhantomData<R>,
 }
 
-impl<R, C> SsTable<R, C>
+impl<R> SsTable<R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     pub(crate) async fn open(
-        lru_cache: C,
+        lru_cache: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
         id: Ulid,
         file: Box<dyn DynFile>,
     ) -> Result<Self, fusio::Error> {
         let size = file.size().await?;
 
         Ok(SsTable {
             reader: lru_cache
-                .get_reader(id, AsyncReader::new(file, size).await?)
+                .get_reader(
+                    id,
+                    BoxedFileReader::new(AsyncReader::new(file, size).await?),
+                )
                 .await,
             _marker: PhantomData,
         })
     }
@@ -127,8 +129,7 @@ pub(crate) mod tests {
         basic::{Compression, ZstdLevel},
         file::properties::WriterProperties,
     };
-    use parquet_lru::NoopCache;
-    use ulid::Ulid;
+    use parquet_lru::NoCache;
 
     use super::SsTable;
     use crate::{
@@ -167,15 +168,12 @@ pub(crate) mod tests {
         Ok(())
     }
 
-    pub(crate) async fn open_sstable<R>(
-        store: &Arc<dyn DynFs>,
-        path: &Path,
-    ) -> SsTable<R, NoopCache<Ulid>>
+    pub(crate) async fn open_sstable<R>(store: &Arc<dyn DynFs>, path: &Path) -> SsTable<R>
     where
         R: Record,
     {
         SsTable::open(
-            Default::default(),
+            Arc::new(NoCache::default()),
             Default::default(),
             store
                 .open_options(path, FileType::Parquet.open_options(true))
diff --git a/src/snapshot.rs b/src/snapshot.rs
index 47d4984c..07a3918b 100644
--- a/src/snapshot.rs
+++ b/src/snapshot.rs
@@ -2,10 +2,10 @@ use std::{collections::Bound, sync::Arc};
 
 use async_lock::RwLockReadGuard;
 use parquet::arrow::ProjectionMask;
-use parquet_lru::LruCache;
 
+use crate::ParquetLru;
 use crate::{
-    fs::{manager::StoreManager, FileId},
+    fs::manager::StoreManager,
     record::Record,
     stream,
     stream::ScanStream,
@@ -14,7 +14,7 @@ use crate::{
     DbError, Projection, Scan, Schema,
 };
 
-pub struct Snapshot<'s, R, C>
+pub struct Snapshot<'s, R>
 where
     R: Record,
 {
     ts: Timestamp,
     share: RwLockReadGuard<'s, Schema<R>>,
     version: VersionRef<R>,
     manager: Arc<StoreManager>,
-    parquet_cache: C,
+    parquet_cache: ParquetLru,
 }
 
-impl<'s, R, C> Snapshot<'s, R, C>
+impl<'s, R> Snapshot<'s, R>
 where
     R: Record,
-    C: LruCache<FileId> + Unpin,
 {
     pub async fn get<'get>(
         &'get self,
 
     pub fn scan<'scan, 'range>(
         &'scan self,
         range: (Bound<&'range R::Key>, Bound<&'range R::Key>),
-    ) -> Scan<'scan, 'range, R, C> {
+    ) -> Scan<'scan, 'range, R> {
         Scan::new(
             &self.share,
             &self.manager,
 
         share: RwLockReadGuard<'s, Schema<R>>,
         version: VersionRef<R>,
         manager: Arc<StoreManager>,
-        parquet_cache: C,
+        parquet_cache: ParquetLru,
     ) -> Self {
         Self {
             ts: version.load_ts(),
 
         &'scan self,
         range: (Bound<&'range R::Key>, Bound<&'range R::Key>),
         fn_pre_stream: Box<
-            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R, C>> + Send + 'scan,
+            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan,
         >,
-    ) -> Scan<'scan, 'range, R, C> {
+    ) -> Scan<'scan, 'range, R> {
         Scan::new(
             &self.share,
             &self.manager,
diff --git a/src/stream/level.rs b/src/stream/level.rs
index 06a437c7..a60bc89f 100644
--- a/src/stream/level.rs
+++ b/src/stream/level.rs
@@ -13,7 +13,7 @@ use fusio::{
 };
 use futures_core::Stream;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
-use parquet_lru::LruCache;
+use parquet_lru::DynLruCache;
 use ulid::Ulid;
 
 use crate::{
     DbOption,
 };
 
-enum FutureStatus<'level, R, C>
+enum FutureStatus<'level, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     Init(FileId),
     Ready(SsTableScan<'level, R>),
     OpenFile(
         Ulid,
         Pin<Box<dyn Future<Output = Result<Box<dyn DynFile>, Error>> + 'level>>,
     ),
-    OpenSst(Pin<Box<dyn Future<Output = Result<SsTable<R, C>, Error>> + 'level>>),
+    OpenSst(Pin<Box<dyn Future<Output = Result<SsTable<R>, Error>> + 'level>>),
     LoadStream(
         Pin<Box<dyn Future<Output = Result<SsTableScan<'level, R>, ParquetError>> + Send + 'level>>,
     ),
 }
 
-pub(crate) struct LevelStream<'level, R, C>
+pub(crate) struct LevelStream<'level, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     lower: Bound<&'level R::Key>,
     upper: Bound<&'level R::Key>,
     ts: Timestamp,
     level: usize,
     gens: VecDeque<FileId>,
     limit: Option<usize>,
     projection_mask: ProjectionMask,
-    status: FutureStatus<'level, R, C>,
+    status: FutureStatus<'level, R>,
     fs: Arc<dyn DynFs>,
     path: Option<Path>,
-    parquet_cache: C,
+    parquet_cache: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
 }
 
-impl<'level, R, C> LevelStream<'level, R, C>
+impl<'level, R> LevelStream<'level, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     // Kould: only used by Compaction now, and the start and end of the sstables range are known
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         version: &Version<R>,
         level: usize,
         start: usize,
         end: usize,
         range: (Bound<&'level R::Key>, Bound<&'level R::Key>),
         ts: Timestamp,
         limit: Option<usize>,
         projection_mask: ProjectionMask,
         fs: Arc<dyn DynFs>,
-        parquet_cache: C,
+        parquet_cache: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
     ) -> Option<Self> {
         let (lower, upper) = range;
         let mut gens: VecDeque<FileId> = version.level_slice[level][start..end + 1]
 
-impl<'level, R, C> Stream for LevelStream<'level, R, C>
+impl<'level, R> Stream for LevelStream<'level, R>
 where
     R: Record,
-    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<SsTableScan<'level, R>, ParquetError>;
 
@@ -226,7 +222,7 @@ mod tests {
     use fusio_dispatch::FsOptions;
     use futures_util::StreamExt;
     use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
-    use parquet_lru::NoopCache;
+    use parquet_lru::NoCache;
     use tempfile::TempDir;
 
     use crate::{
                 [0, 1, 2, 3],
             ),
             manager.base_fs().clone(),
-            NoopCache::default(),
+            Arc::new(NoCache::default()),
         )
         .unwrap();
 
                 [0, 1, 2, 4],
             ),
             manager.base_fs().clone(),
-            NoopCache::default(),
+            Arc::new(NoCache::default()),
         )
         .unwrap();
 
                 [0, 1, 2],
             ),
             manager.base_fs().clone(),
-            NoopCache::default(),
+            Arc::new(NoCache::default()),
         )
         .unwrap();
diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs
index f34d1ebd..ac76dd13 100644
--- a/src/stream/mem_projection.rs
+++ b/src/stream/mem_projection.rs
@@ -6,9 +6,7 @@ use std::{
 
 use futures_core::Stream;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
-use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
-use ulid::Ulid;
 
 use crate::{
     record::Record,
 };
 
 pin_project! {
-    pub struct MemProjectionStream<'projection, R, C>
+    pub struct MemProjectionStream<'projection, R>
     where
         R: Record,
-        C: LruCache<Ulid>,
     {
-        stream: Box<ScanStream<'projection, R, C>>,
+        stream: Box<ScanStream<'projection, R>>,
         projection_mask: Arc<ProjectionMask>,
     }
 }
 
-impl<'projection, R, C> MemProjectionStream<'projection, R, C>
+impl<'projection, R> MemProjectionStream<'projection, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
-    pub(crate) fn new(
-        stream: ScanStream<'projection, R, C>,
-        projection_mask: ProjectionMask,
-    ) -> Self {
+    pub(crate) fn new(stream: ScanStream<'projection, R>, projection_mask: ProjectionMask) -> Self {
         Self {
             stream: Box::new(stream),
             projection_mask: Arc::new(projection_mask),
 
-impl<'projection, R, C> Stream for MemProjectionStream<'projection, R, C>
+impl<'projection, R> Stream for MemProjectionStream<'projection, R>
 where
     R: Record,
-    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<Entry<'projection, R>, ParquetError>;
 
@@ -69,7 +61,7 @@ mod tests {
     use fusio::{disk::TokioFs, path::Path, DynFs};
     use futures_util::StreamExt;
     use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
-    use parquet_lru::NoopCache;
+    use parquet_lru::NoCache;
     use ulid::Ulid;
 
     use crate::{
             vec![0, 1, 2, 4],
         );
 
-        let mut stream = MemProjectionStream::<String, NoopCache<Ulid>>::new(
+        let mut stream = MemProjectionStream::<String>::new(
             mutable
                 .scan((Bound::Unbounded, Bound::Unbounded), 6.into())
                 .into(),
diff --git a/src/stream/merge.rs b/src/stream/merge.rs
index 6d83e023..08e51b78 100644
--- a/src/stream/merge.rs
+++ b/src/stream/merge.rs
@@ -7,20 +7,17 @@ use std::{
 
 use futures_core::{ready, Stream};
 use futures_util::stream::StreamExt;
-use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
-use ulid::Ulid;
 
 use super::{Entry, ScanStream};
 use crate::{record::Record, timestamp::Timestamp};
 
 pin_project! {
-    pub struct MergeStream<'merge, R, C>
+    pub struct MergeStream<'merge, R>
     where
         R: Record,
-        C: LruCache<Ulid>,
     {
-        streams: Vec<ScanStream<'merge, R, C>>,
+        streams: Vec<ScanStream<'merge, R>>,
         peeked: BinaryHeap<CmpEntry<'merge, R>>,
         buf: Option<Entry<'merge, R>>,
         ts: Timestamp,
     }
 }
 
-impl<'merge, R, C> MergeStream<'merge, R, C>
+impl<'merge, R> MergeStream<'merge, R>
 where
     R: Record,
-    C: LruCache<Ulid> + Unpin,
 {
     pub(crate) async fn from_vec(
-        mut streams: Vec<ScanStream<'merge, R, C>>,
+        mut streams: Vec<ScanStream<'merge, R>>,
         ts: Timestamp,
     ) -> Result<Self, parquet::errors::ParquetError> {
         let mut peeked = BinaryHeap::with_capacity(streams.len());
 
-impl<'merge, R, C> Stream for MergeStream<'merge, R, C>
+impl<'merge, R> Stream for MergeStream<'merge, R>
 where
     R: Record,
-    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<Entry<'merge, R>, parquet::errors::ParquetError>;
 
@@ -165,7 +160,7 @@ mod tests {
 
     use fusio::{disk::TokioFs, path::Path, DynFs};
     use futures_util::StreamExt;
-    use parquet_lru::NoopCache;
+    use parquet_lru::NoCache;
     use ulid::Ulid;
 
     use super::MergeStream;
 
         let lower = "a".to_string();
         let upper = "e".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
+        let mut merge = MergeStream::<String>::from_vec(
             vec![
                 m1.scan(bound, 6.into()).into(),
                 m2.scan(bound, 6.into()).into(),
 
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
-            vec![m1.scan(bound, 0.into()).into()],
-            0.into(),
-        )
-        .await
-        .unwrap();
+        let mut merge =
+            MergeStream::<String>::from_vec(vec![m1.scan(bound, 0.into()).into()], 0.into())
+                .await
+                .unwrap();
 
         if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
             assert_eq!(entry.key().value, "1");
 
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
-            vec![m1.scan(bound, 1.into()).into()],
-            1.into(),
-        )
-        .await
-        .unwrap();
+        let mut merge =
+            MergeStream::<String>::from_vec(vec![m1.scan(bound, 1.into()).into()], 1.into())
+                .await
+                .unwrap();
 
         if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
             assert_eq!(entry.key().value, "1");
 
         let lower = "1".to_string();
         let upper = "3".to_string();
         {
-            let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
+            let mut merge = MergeStream::<String>::from_vec(
                 vec![m1
                     .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into())
                     .into()],
 
             assert!(merge.next().await.is_none());
         }
         {
-            let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
+            let mut merge = MergeStream::<String>::from_vec(
                 vec![m1
                     .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into())
                     .into()],
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index a26dffeb..fa0b5afe 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -15,10 +15,8 @@ use std::{
 
 use futures_core::Stream;
 use futures_util::{ready, stream};
 use parquet::arrow::ProjectionMask;
-use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
 use record_batch::RecordBatchEntry;
-use ulid::Ulid;
 
 use crate::{
     inmem::{immutable::ImmutableScan, mutable::MutableScan},
 
 pin_project! {
     #[project = ScanStreamProject]
-    pub enum ScanStream<'scan, R, C>
+    pub enum ScanStream<'scan, R>
     where
         R: Record,
-        C: LruCache<Ulid>,
     {
         Transaction {
             #[pin]
             inner: stream::Iter<TransactionScan<'scan, R>>,
         },
         Mutable {
             #[pin]
             inner: stream::Iter<MutableScan<'scan, R>>,
         },
         Immutable {
             #[pin]
             inner: stream::Iter<ImmutableScan<'scan, R>>,
         },
         SsTable {
             #[pin]
             inner: SsTableScan<'scan, R>,
         },
         Level {
             #[pin]
-            inner: LevelStream<'scan, R, C>,
+            inner: LevelStream<'scan, R>,
         },
         MemProjection {
             #[pin]
-            inner: MemProjectionStream<'scan, R, C>,
+            inner: MemProjectionStream<'scan, R>,
         }
     }
 }
 
-impl<'scan, R, C> From<TransactionScan<'scan, R>> for ScanStream<'scan, R, C>
+impl<'scan, R> From<TransactionScan<'scan, R>> for ScanStream<'scan, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     fn from(inner: TransactionScan<'scan, R>) -> Self {
         ScanStream::Transaction {
             inner: stream::iter(inner),
         }
     }
 }
 
-impl<'scan, R, C> From<MutableScan<'scan, R>> for ScanStream<'scan, R, C>
+impl<'scan, R> From<MutableScan<'scan, R>> for ScanStream<'scan, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     fn from(inner: MutableScan<'scan, R>) -> Self {
         ScanStream::Mutable {
             inner: stream::iter(inner),
         }
     }
 }
 
-impl<'scan, R, C> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R, C>
+impl<'scan, R> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     fn from(inner: ImmutableScan<'scan, R>) -> Self {
         ScanStream::Immutable {
             inner: stream::iter(inner),
         }
     }
 }
 
-impl<'scan, R, C> From<SsTableScan<'scan, R>> for ScanStream<'scan, R, C>
+impl<'scan, R> From<SsTableScan<'scan, R>> for ScanStream<'scan, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     fn from(inner: SsTableScan<'scan, R>) -> Self {
         ScanStream::SsTable { inner }
     }
 }
 
-impl<'scan, R, C> From<MemProjectionStream<'scan, R, C>> for ScanStream<'scan, R, C>
+impl<'scan, R> From<MemProjectionStream<'scan, R>> for ScanStream<'scan, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
-    fn from(inner: MemProjectionStream<'scan, R, C>) -> Self {
+    fn from(inner: MemProjectionStream<'scan, R>) -> Self {
         ScanStream::MemProjection { inner }
     }
 }
 
-impl<R, C> fmt::Debug for ScanStream<'_, R, C>
+impl<R> fmt::Debug for ScanStream<'_, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match self {
 
-impl<'scan, R, C> Stream for ScanStream<'scan, R, C>
+impl<'scan, R> Stream for ScanStream<'scan, R>
 where
     R: Record,
-    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<Entry<'scan, R>, parquet::errors::ParquetError>;
 
diff --git a/src/stream/package.rs b/src/stream/package.rs
index b114878b..7e23b1b0 100644
--- a/src/stream/package.rs
+++ b/src/stream/package.rs
@@ -4,9 +4,7 @@ use std::{
 }
 
 use futures_core::Stream;
-use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
-use ulid::Ulid;
 
 use crate::{
     inmem::immutable::{ArrowArrays, Builder},
 };
 
 pin_project! {
-    pub struct PackageStream<'package, R, C>
+    pub struct PackageStream<'package, R>
     where
         R: Record,
-        C: LruCache<Ulid>,
     {
         row_count: usize,
         batch_size: usize,
-        inner: MergeStream<'package, R, C>,
+        inner: MergeStream<'package, R>,
         builder: <R::Columns as ArrowArrays>::Builder,
         projection_indices: Option<Vec<usize>>,
     }
 }
 
-impl<'package, R, C> PackageStream<'package, R, C>
+impl<'package, R> PackageStream<'package, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     pub(crate) fn new(
         batch_size: usize,
-        merge: MergeStream<'package, R, C>,
+        merge: MergeStream<'package, R>,
         projection_indices: Option<Vec<usize>>,
         instance: &RecordInstance,
     ) -> Self {
 
-impl<'package, R, C> Stream for PackageStream<'package, R, C>
+impl<'package, R> Stream for PackageStream<'package, R>
 where
     R: Record,
-    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<R::Columns, parquet::errors::ParquetError>;
 
@@ -89,9 +84,7 @@ mod tests {
     use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt32Array};
     use fusio::{disk::TokioFs, path::Path, DynFs};
     use futures_util::StreamExt;
-    use parquet_lru::NoopCache;
     use tempfile::TempDir;
-    use ulid::Ulid;
 
     use crate::{
         inmem::{
             .await
             .unwrap();
 
-        let merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
+        let merge = MergeStream::<String>::from_vec(
             vec![m1
                 .scan((Bound::Unbounded, Bound::Unbounded), 6.into())
                 .into()],
diff --git a/src/transaction.rs b/src/transaction.rs
index 4dadc312..88d8bdf3 100644
--- a/src/transaction.rs
+++ b/src/transaction.rs
@@ -10,9 +10,7 @@ use std::{
 
 use flume::SendError;
 use lockable::AsyncLimit;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
-use parquet_lru::LruCache;
 use thiserror::Error;
-use ulid::Ulid;
 
 use crate::{
     compaction::CompactTask,
 }
 
 /// optimistic ACID transaction, open with
 /// [`DB::transaction`](crate::DB::transaction) method
-pub struct Transaction<'txn, R, C>
+pub struct Transaction<'txn, R>
 where
     R: Record,
-    C: LruCache<Ulid>,
 {
     local: BTreeMap<R::Key, Entry<R>>,
-    snapshot: Snapshot<'txn, R, C>,
+    snapshot: Snapshot<'txn, R>,
     lock_map: LockMap<R::Key>,
 }
 
-impl<'txn, R, C> Transaction<'txn, R, C>
+impl<'txn, R> Transaction<'txn, R>
 where
     R: Record + Send,
-    C: LruCache<Ulid> + Unpin,
 {
-    pub(crate) fn new(snapshot: Snapshot<'txn, R, C>, lock_map: LockMap<R::Key>) -> Self {
+    pub(crate) fn new(snapshot: Snapshot<'txn, R>, lock_map: LockMap<R::Key>) -> Self {
         Self {
             local: BTreeMap::new(),
             snapshot,
             lock_map,
         }
     }
 
     pub fn scan<'scan, 'range>(
         &'scan self,
         range: (Bound<&'range R::Key>, Bound<&'range R::Key>),
-    ) -> Scan<'scan, 'range, R, C> {
+    ) -> Scan<'scan, 'range, R> {
         let ts = self.snapshot.ts();
         let inner = self.local.range(range);
         self.snapshot._scan(
     async fn transaction_read_write() {
         let temp_dir = TempDir::new().unwrap();
 
-        let db = DB::<String, TokioExecutor, _>::new(
+        let db = DB::<String, TokioExecutor>::new(
             DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()),
             TokioExecutor::new(),
         )
         .await
         .unwrap();
 
         let temp_dir = TempDir::new().unwrap();
         let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
 
-        let db = DB::<String, TokioExecutor, _>::new(option, TokioExecutor::new())
+        let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::new())
             .await
             .unwrap();
 
         let temp_dir = TempDir::new().unwrap();
         let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
 
-        let db = DB::<String, TokioExecutor, _>::new(option, TokioExecutor::new())
+        let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::new())
             .await
             .unwrap();
 
diff --git a/src/version/mod.rs b/src/version/mod.rs
index 541b37d6..7a256daa 100644
--- a/src/version/mod.rs
+++ b/src/version/mod.rs
@@ -13,11 +13,10 @@ use std::{
 
 use flume::{SendError, Sender};
 use fusio::DynFs;
 use parquet::arrow::ProjectionMask;
-use parquet_lru::LruCache;
 use thiserror::Error;
 use tracing::error;
-use ulid::Ulid;
 
+use crate::ParquetLru;
 use crate::{
     fs::{manager::StoreManager, FileId, FileType},
     ondisk::sstable::SsTable,
@@ -118,16 +117,13 @@ impl<R> Version<R>
 where
     R: Record,
 {
-    pub(crate) async fn query<C>(
+    pub(crate) async fn query(
         &self,
         manager: &StoreManager,
         key: &TimestampedRef<R::Key>,
         projection_mask: ProjectionMask,
-        parquet_cache: C,
-    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>>
-    where
-        C: LruCache<FileId>,
-    {
+        parquet_cache: ParquetLru,
+    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
         let level_0_path = self
             .option
             .level_fs_path(0)
@@ -183,18 +179,15 @@ where
         Ok(None)
     }
 
-    async fn table_query<C>(
+    async fn table_query(
         &self,
         store: &Arc<dyn DynFs>,
         key: &TimestampedRef<<R as Record>::Key>,
         level: usize,
         gen: FileId,
         projection_mask: ProjectionMask,
-        parquet_cache: C,
-    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>>
-    where
-        C: LruCache<FileId>,
-    {
+        parquet_cache: ParquetLru,
+    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
         let file = store
             .open_options(
                 &self.option.table_path(gen, level),
             )
             .await
             .map_err(VersionError::Fusio)?;
-        SsTable::<R, C>::open(parquet_cache, gen, file)
+        SsTable::<R>::open(parquet_cache, gen, file)
             .await?
             .get(key, projection_mask)
             .await
 
     #[allow(clippy::too_many_arguments)]
-    pub(crate) async fn streams<'streams, C>(
+    pub(crate) async fn streams<'streams>(
         &self,
         manager: &StoreManager,
-        streams: &mut Vec<ScanStream<'streams, R, C>>,
+        streams: &mut Vec<ScanStream<'streams, R>>,
         range: (Bound<&'streams R::Key>, Bound<&'streams R::Key>),
         ts: Timestamp,
         limit: Option<usize>,
         projection_mask: ProjectionMask,
-        parquet_cache: C,
-    ) -> Result<(), VersionError<R>>
-    where
-        C: LruCache<FileId>,
-    {
+        parquet_cache: ParquetLru,
+    ) -> Result<(), VersionError<R>> {
         let level_0_path = self
             .option
             .level_fs_path(0)
diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs
index b1f0e1e2..b4a1b13a 100644
--- a/tests/data_integrity.rs
+++ b/tests/data_integrity.rs
@@ -72,8 +72,7 @@ mod tests {
         let temp_dir = TempDir::new().unwrap();
         let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
 
-        let db: DB<Customer, TokioExecutor, _> =
-            DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Customer, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for _ in 0..WRITE_TIMES {
             let customer = gen_record(&mut rng, &mut primary_key_count);
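
Usage sketch (not part of the patch): under the new trait shape, a cache is an ordinary
`LruCache<K>` implementation whose `get_reader` wraps or passes through an `AsyncFileReader`,
while tonbo itself only stores the type-erased form, `ParquetLru = Arc<dyn DynLruCache<Ulid> +
Send + Sync>`. The code below is modeled directly on the `NoCache` impl above; `CountingCache`
and its `reads` field are hypothetical names chosen for illustration, and the `LruCache` ->
`DynLruCache` bridge is assumed to live in the new `r#dyn` module, which this diff lists in
`parquet-lru/src/lib.rs` but does not include.

    use std::{
        future::Future,
        sync::{
            atomic::{AtomicU64, Ordering},
            Arc,
        },
    };

    use parquet::arrow::async_reader::AsyncFileReader;
    use parquet_lru::LruCache;

    // Hypothetical example: a pass-through cache that only counts reader requests.
    #[derive(Clone, Default)]
    pub struct CountingCache {
        reads: Arc<AtomicU64>,
    }

    impl<K> LruCache<K> for CountingCache
    where
        K: 'static,
    {
        // Hand the underlying reader back unchanged, exactly as NoCache does.
        type LruReader<R> = R
        where
            R: AsyncFileReader + 'static;

        #[allow(clippy::manual_async_fn)]
        fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
        where
            R: AsyncFileReader,
        {
            // Record the access before returning the reader.
            self.reads.fetch_add(1, Ordering::Relaxed);
            async move { reader }
        }
    }

Note that the public constructors in this patch (`DB::new`, `DB::with_schema`) still pin
`Arc::new(NoCache::default())` internally, so wiring a custom cache in goes through the
crate-private `DB::build` path rather than any public API shown here.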