refactor: use parquet lru reader as trait object
ethe committed Nov 19, 2024
1 parent 2e3b9ae commit 4c5d905
Showing 17 changed files with 206 additions and 291 deletions.
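Note: this commit threads the parquet LRU cache through tonbo as an `Arc`-wrapped trait object (`ParquetLru`, `DynLruCache`) instead of a generic type parameter on `DB` and the compaction paths. The object-safe layer itself lives in `parquet-lru/src/dyn.rs`, which is not among the files shown below (the CI annotations further down flag exactly that missing module). The following is only a rough sketch of how such a wrapper is commonly written; the method signature, the blanket impl, and the alias at the end are assumptions, not the committed code.

```rust
use std::{future::Future, pin::Pin, sync::Arc};

use parquet::arrow::async_reader::AsyncFileReader;

use crate::LruCache; // the GAT-based trait defined in parquet-lru/src/lib.rs

/// Hypothetical object-safe counterpart of `LruCache`: the reader type is
/// erased behind `Box<dyn AsyncFileReader>` and the returned future is boxed,
/// so a cache can be held as `Arc<dyn DynLruCache<K> + Send + Sync>` without
/// carrying a type parameter around.
pub trait DynLruCache<K> {
    fn get_reader<'a>(
        &'a self,
        key: K,
        reader: Box<dyn AsyncFileReader>,
    ) -> Pin<Box<dyn Future<Output = Box<dyn AsyncFileReader>> + Send + 'a>>
    where
        K: 'a;
}

/// Blanket impl: any statically typed `LruCache` also works as a trait object.
impl<K, C> DynLruCache<K> for C
where
    K: Send + 'static,
    C: LruCache<K> + Send + Sync,
{
    fn get_reader<'a>(
        &'a self,
        key: K,
        reader: Box<dyn AsyncFileReader>,
    ) -> Pin<Box<dyn Future<Output = Box<dyn AsyncFileReader>> + Send + 'a>>
    where
        K: 'a,
    {
        Box::pin(async move {
            // `Box<dyn AsyncFileReader>` itself implements `AsyncFileReader`,
            // so it can be fed straight into the generic method; the concrete
            // `LruReader` is boxed again on the way out.
            Box::new(LruCache::get_reader(self, key, reader).await) as Box<dyn AsyncFileReader>
        })
    }
}

// A cache handle the rest of the codebase can pass around without generics;
// tonbo's `ParquetLru` alias presumably looks something like this.
pub type ParquetLruSketch<K> = Arc<dyn DynLruCache<K> + Send + Sync>;
```

With this shape, `NoCache` and `FoyerCache` keep their zero-cost generic implementations, while `DB` and the compaction code can store a single erased handle, which is what the diffs below switch to.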
2 changes: 1 addition & 1 deletion bindings/python/src/db.rs
@@ -30,7 +30,7 @@ type PyExecutor = TokioExecutor;
pub struct TonboDB {
desc: Arc<Vec<Column>>,
primary_key_index: usize,
db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
db: Arc<DB<DynRecord, PyExecutor>>,
}

#[pymethods]
8 changes: 4 additions & 4 deletions examples/datafusion.rs
@@ -26,7 +26,7 @@ use datafusion::{
use fusio::path::Path;
use futures_core::Stream;
use futures_util::StreamExt;
use parquet_lru::NoopCache;
use parquet_lru::NoCache;
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, fs::FileId, inmem::immutable::ArrowArrays, record::Record,
@@ -43,12 +43,12 @@ pub struct Music {
}

struct MusicProvider {
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
}

struct MusicExec {
cache: PlanProperties,
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
@@ -98,7 +98,7 @@ impl TableProvider for MusicProvider {

impl MusicExec {
fn new(
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
projection: Option<&Vec<usize>>,
) -> Self {
let schema = Music::arrow_schema();
3 changes: 1 addition & 2 deletions parquet-lru/Cargo.toml
@@ -7,8 +7,8 @@ name = "parquet-lru"
version = "0.1.0"

[features]
default = ["foyer"]
foyer = ["dep:foyer", "dep:serde"]
full = ["foyer"]

[dependencies]
bytes = { version = "1.8.0", features = ["serde"] }
@@ -17,4 +17,3 @@ futures-core = "0.3.31"
futures-util = "0.3.31"
parquet = { version = "53.2.0", features = ["async"] }
serde = { version = "1.0.214", optional = true }
thiserror = "2.0.3"
25 changes: 8 additions & 17 deletions parquet-lru/src/foyer.rs
@@ -10,7 +10,7 @@ use parquet::{
};
use serde::{Deserialize, Serialize};

use crate::{Error, LruCache, Options};
use crate::LruCache;

#[derive(Clone)]
pub struct FoyerCache<K>
@@ -32,23 +32,14 @@ impl<K> LruCache<K> for FoyerCache<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
{
type LruReader<R: AsyncFileReader + 'static> = FoyerReader<K, R>;
type LruReader<R> = FoyerReader<K, R>
where
R: AsyncFileReader + 'static;

async fn new(options: Options) -> Result<Self, Error> {
Ok(Self {
inner: Arc::new(FoyerCacheInner {
meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
data: foyer::HybridCacheBuilder::new()
.memory(options.data_capacity)
.storage(foyer::Engine::Large)
.build()
.await
.map_err(|e| Error::External(e.into()))?,
}),
})
}

async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> FoyerReader<K, R> {
async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
where
R: AsyncFileReader,
{
FoyerReader::new(self.clone(), key, reader)
}
}
77 changes: 35 additions & 42 deletions parquet-lru/src/lib.rs
@@ -1,63 +1,56 @@
mod r#dyn;

Check failure on line 1 in parquet-lru/src/lib.rs (GitHub Actions / check): file not found for module `r#dyn`

#[cfg(feature = "foyer")]
pub mod foyer;

use std::{future::Future, marker::PhantomData};

use parquet::{arrow::async_reader::AsyncFileReader, errors::Result};
use thiserror::Error;
use parquet::arrow::async_reader::AsyncFileReader;

#[derive(Default)]
pub struct Options {
meta_capacity: usize,
data_capacity: usize,
}

impl Options {
pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
self.meta_capacity = meta_capacity;
self
}

pub fn data_capacity(mut self, data_capacity: usize) -> Self {
self.data_capacity = data_capacity;
self
}
}
pub use crate::r#dyn::*;

Check warning on line 9 in parquet-lru/src/lib.rs (GitHub Actions / check): unused import: `crate::r#dyn::*`

pub trait LruCache<K>: Clone + Send + Sync + 'static {
type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;

fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
pub trait LruCache<K>
where
K: 'static,
{
type LruReader<R>: AsyncFileReader + 'static
where
R: AsyncFileReader + 'static;

fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
where
R: AsyncFileReader + 'static;
}

#[derive(Clone, Default)]
pub struct NoopCache<K> {
#[derive(Default)]
pub struct NoCache<K> {
_phantom: PhantomData<K>,
}

impl<K> LruCache<K> for NoopCache<K>
where
K: Send + Sync + Clone + 'static,
{
type LruReader<R: AsyncFileReader + 'static> = R;

async fn new(_options: Options) -> Result<Self, Error> {
Ok(Self {
impl<K> Clone for NoCache<K> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData,
})
}

async fn get_reader<R: AsyncFileReader>(&self, _key: K, reader: R) -> R {
reader
}
}
}

#[derive(Debug, Error)]
pub enum Error {
#[error("External lru implementation error: {0}")]
External(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
unsafe impl<K> Send for NoCache<K> {}

unsafe impl<K> Sync for NoCache<K> {}

impl<K> LruCache<K> for NoCache<K>
where
K: 'static,
{
type LruReader<R> = R
where
R: AsyncFileReader + 'static;

#[allow(clippy::manual_async_fn)]
fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
where
R: AsyncFileReader,
{
async move { reader }
}
}
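In the compaction diff that follows, the cache parameter becomes a plain `ParquetLru` value, and the tests pass `Arc::new(NoCache::default())` for it. A minimal usage sketch of that call pattern, assuming `ParquetLru` is an `Arc`-wrapped `DynLruCache` trait object along the lines of the sketch above (the alias definition and the `u64` key standing in for tonbo's `FileId` are assumptions):

```rust
use std::sync::Arc;

use parquet_lru::{DynLruCache, NoCache};

// Hypothetical stand-in for tonbo's `ParquetLru` alias; `u64` replaces the
// real `FileId` key type, which is defined in tonbo and not shown in this diff.
type ParquetLruSketch = Arc<dyn DynLruCache<u64> + Send + Sync>;

fn no_cache() -> ParquetLruSketch {
    // `NoCache` adds no caching at all: its `get_reader` hands the underlying
    // reader straight back, so it is the natural default for this handle.
    Arc::new(NoCache::<u64>::default())
}
```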
42 changes: 17 additions & 25 deletions src/compaction/mod.rs
@@ -5,10 +5,10 @@ use fusio::DynFs;
use fusio_parquet::writer::AsyncWriter;
use futures_util::StreamExt;
use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
use parquet_lru::LruCache;
use thiserror::Error;
use tokio::sync::oneshot;

use crate::ParquetLru;
use crate::{
fs::{manager::StoreManager, FileId, FileType},
inmem::{
@@ -60,13 +60,10 @@ where
}
}

pub(crate) async fn check_then_compaction<C>(
pub(crate) async fn check_then_compaction(
&mut self,
parquet_lru_cache: C,
) -> Result<(), CompactionError<R>>
where
C: LruCache<FileId> + Unpin,
{
parquet_lru_cache: ParquetLru,
) -> Result<(), CompactionError<R>> {
let mut guard = self.schema.write().await;

guard.trigger.reset();
@@ -114,7 +111,7 @@
&mut delete_gens,
&guard.record_instance,
&self.manager,
&parquet_lru_cache,
parquet_lru_cache,
)
.await?;
}
@@ -193,7 +190,7 @@
}

#[allow(clippy::too_many_arguments)]
pub(crate) async fn major_compaction<C>(
pub(crate) async fn major_compaction(
version: &Version<R>,
option: &DbOption<R>,
mut min: &R::Key,
@@ -202,11 +199,8 @@
delete_gens: &mut Vec<(FileId, usize)>,
instance: &RecordInstance,
manager: &StoreManager,
parquet_cache: &C,
) -> Result<(), CompactionError<R>>
where
C: LruCache<FileId> + Unpin,
{
parquet_cache: ParquetLru,
) -> Result<(), CompactionError<R>> {
let mut level = 0;

while level < MAX_LEVEL - 2 {
@@ -391,18 +385,15 @@
(meet_scopes_l, start_l, end_l - 1)
}

async fn build_tables<'scan, C>(
async fn build_tables<'scan>(
option: &DbOption<R>,
version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
level: usize,
streams: Vec<ScanStream<'scan, R, C>>,
streams: Vec<ScanStream<'scan, R>>,
instance: &RecordInstance,
fs: &Arc<dyn DynFs>,
) -> Result<(), CompactionError<R>>
where
C: LruCache<FileId> + Unpin,
{
let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;
) -> Result<(), CompactionError<R>> {
let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into()).await?;

// Kould: is the capacity parameter necessary?
let mut builder = R::Columns::builder(&instance.arrow_schema::<R>(), 8192);
@@ -529,8 +520,9 @@ pub(crate) mod tests {
use fusio_dispatch::FsOptions;
use fusio_parquet::writer::AsyncWriter;
use parquet::arrow::AsyncArrowWriter;
use parquet_lru::NoopCache;
use parquet_lru::{DynLruCache, NoCache};
use tempfile::TempDir;
use ulid::Ulid;

use crate::{
compaction::Compactor,
@@ -827,7 +819,7 @@
&mut vec![],
&RecordInstance::Normal,
&manager,
&NoopCache::default(),
Arc::new(NoCache::default()),
)
.await
.unwrap();
@@ -1219,7 +1211,7 @@
&mut vec![],
&RecordInstance::Normal,
&manager,
&NoopCache::default(),
Arc::new(NoCache::default()),
)
.await
.unwrap();
@@ -1240,7 +1232,7 @@
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);

let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();

for i in 5..9 {
let item = Test {