refactor: move parquet cache to influxdb3_cache crate #25630

Merged · 1 commit · Dec 9, 2024
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion influxdb3/src/commands/serve.rs
@@ -11,6 +11,7 @@ use datafusion_util::config::register_iox_object_store;
use influxdb3_cache::{
last_cache::{self, LastCacheProvider},
meta_cache::MetaCacheProvider,
parquet_cache::create_cached_obj_store_and_oracle,
};
use influxdb3_process::{
build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID,
@@ -25,7 +26,6 @@ use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
parquet_cache::create_cached_obj_store_and_oracle,
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs},
WriteBuffer,
6 changes: 6 additions & 0 deletions influxdb3_cache/Cargo.toml
@@ -20,9 +20,13 @@ influxdb3_wal = { path = "../influxdb3_wal" }
anyhow.workspace = true
arrow.workspace = true
async-trait.workspace = true
bytes.workspace = true
dashmap.workspace = true
datafusion.workspace = true
futures.workspace = true
indexmap.workspace = true
parking_lot.workspace = true
object_store.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
@@ -34,11 +38,13 @@ data_types.workspace = true

# Local deps
influxdb3_write = { path = "../influxdb3_write" }
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }

# crates.io deps
bimap.workspace = true
insta.workspace = true
object_store.workspace = true
pretty_assertions.workspace = true
test-log.workspace = true

[lints]
1 change: 1 addition & 0 deletions influxdb3_cache/src/lib.rs
@@ -2,6 +2,7 @@

pub mod last_cache;
pub mod meta_cache;
pub mod parquet_cache;

#[cfg(test)]
mod test_helpers;
influxdb3_write/src/parquet_cache.rs → influxdb3_cache/src/parquet_cache.rs
@@ -40,6 +40,7 @@ type DynError = Arc<dyn std::error::Error + Send + Sync>;
///
/// Contains a notifier to notify the caller that registers the cache request when the item
/// has been cached successfully (or if the cache request failed in some way)
#[derive(Debug)]
pub struct CacheRequest {
path: Path,
notifier: oneshot::Sender<()>,
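The notifier described in the doc comment above is a plain oneshot handshake: the caller creates the channel, hands the sender to the cache inside a `CacheRequest`, and awaits the receiver. A minimal, self-contained sketch of that flow (the `create` constructor and the `register` callback are illustrative assumptions, not the crate's exact API):

```rust
use object_store::path::Path;
use tokio::sync::oneshot;

// Mirrors the fields shown in the diff above; the #[derive(Debug)] added by
// this PR applies to this struct.
#[derive(Debug)]
pub struct CacheRequest {
    path: Path,
    notifier: oneshot::Sender<()>,
}

impl CacheRequest {
    // Assumed constructor: returns the request plus the receiver the caller
    // awaits; the real crate may expose a different signature.
    pub fn create(path: Path) -> (Self, oneshot::Receiver<()>) {
        let (notifier, receiver) = oneshot::channel();
        (Self { path, notifier }, receiver)
    }
}

// Illustrative caller: `register` stands in for whatever the cache oracle
// exposes (e.g. a method on ParquetCacheOracle) to accept the request.
async fn cache_parquet_file(register: impl FnOnce(CacheRequest)) {
    let (req, done) = CacheRequest::create(Path::from("dbs/db/tables/t/file.parquet"));
    register(req);
    // Resolves once the object has been fetched into the cache, or the
    // attempt failed; either way the caller is unblocked.
    let _ = done.await;
}
```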
2 changes: 1 addition & 1 deletion influxdb3_server/src/lib.rs
@@ -247,12 +247,12 @@ mod tests {
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_id::{DbId, TableId};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig;
use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::persisted_files::PersistedFiles;
use influxdb3_write::WriteBuffer;
6 changes: 4 additions & 2 deletions influxdb3_server/src/query_executor.rs
@@ -620,13 +620,15 @@ mod tests {
use data_types::NamespaceName;
use datafusion::{assert_batches_sorted_eq, error::DataFusionError};
use futures::TryStreamExt;
use influxdb3_cache::{last_cache::LastCacheProvider, meta_cache::MetaCacheProvider};
use influxdb3_cache::{
last_cache::LastCacheProvider, meta_cache::MetaCacheProvider,
parquet_cache::test_cached_obj_store_and_oracle,
};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
parquet_cache::test_cached_obj_store_and_oracle,
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs},
WriteBuffer,
1 change: 0 additions & 1 deletion influxdb3_write/src/lib.rs
@@ -5,7 +5,6 @@
//! metadata of the parquet files that were written in that snapshot.

pub mod chunk;
pub mod parquet_cache;
pub mod paths;
pub mod persister;
pub mod write_buffer;
4 changes: 2 additions & 2 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -5,7 +5,6 @@ pub mod queryable_buffer;
mod table_buffer;
pub mod validator;

use crate::parquet_cache::ParquetCacheOracle;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
use crate::write_buffer::queryable_buffer::QueryableBuffer;
@@ -26,6 +25,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Expr;
use influxdb3_cache::last_cache::{self, LastCacheProvider};
use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider};
use influxdb3_cache::parquet_cache::ParquetCacheOracle;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::{object_store::WalObjectStore, DeleteDatabaseDefinition};
@@ -699,7 +699,6 @@ impl WriteBuffer for WriteBufferImpl {}
#[allow(clippy::await_holding_lock)]
mod tests {
use super::*;
use crate::parquet_cache::test_cached_obj_store_and_oracle;
use crate::paths::{CatalogFilePath, SnapshotInfoFilePath};
use crate::persister::Persister;
use crate::PersistedSnapshot;
@@ -708,6 +707,7 @@ mod tests {
use bytes::Bytes;
use datafusion_util::config::register_iox_object_store;
use futures_util::StreamExt;
use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_id::{DbId, ParquetFileId};
use influxdb3_test_helpers::object_store::RequestCountedObjectStore;
2 changes: 1 addition & 1 deletion influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -1,5 +1,4 @@
use crate::chunk::BufferChunk;
use crate::parquet_cache::{CacheRequest, ParquetCacheOracle};
use crate::paths::ParquetFilePath;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
@@ -18,6 +17,7 @@ use datafusion_util::stream_from_batches;
use hashbrown::HashMap;
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_id::{DbId, TableId};
use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch};