From 53f01bf62bf74b885b90c3f0db0eafb315937012 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 9 Dec 2024 11:27:10 -0500 Subject: [PATCH] refactor: move parquet cache to influxdb3_cache crate --- Cargo.lock | 5 +++++ influxdb3/src/commands/serve.rs | 2 +- influxdb3_cache/Cargo.toml | 6 ++++++ influxdb3_cache/src/lib.rs | 1 + .../src/parquet_cache/mod.rs | 1 + influxdb3_server/src/lib.rs | 2 +- influxdb3_server/src/query_executor.rs | 6 ++++-- influxdb3_write/src/lib.rs | 1 - influxdb3_write/src/write_buffer/mod.rs | 4 ++-- influxdb3_write/src/write_buffer/queryable_buffer.rs | 2 +- 10 files changed, 22 insertions(+), 8 deletions(-) rename {influxdb3_write => influxdb3_cache}/src/parquet_cache/mod.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index 1374f4efcf2..fa6a782c966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3092,11 +3092,15 @@ dependencies = [ "arrow_util", "async-trait", "bimap", + "bytes", + "dashmap", "data_types", "datafusion", + "futures", "indexmap 2.7.0", "influxdb3_catalog", "influxdb3_id", + "influxdb3_test_helpers", "influxdb3_wal", "influxdb3_write", "insta", @@ -3104,6 +3108,7 @@ dependencies = [ "object_store", "observability_deps", "parking_lot", + "pretty_assertions", "schema", "serde", "test-log", diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index ecbcc1c6138..4144facfca7 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -11,6 +11,7 @@ use datafusion_util::config::register_iox_object_store; use influxdb3_cache::{ last_cache::{self, LastCacheProvider}, meta_cache::MetaCacheProvider, + parquet_cache::create_cached_obj_store_and_oracle, }; use influxdb3_process::{ build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID, @@ -25,7 +26,6 @@ use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_write::{ - parquet_cache::create_cached_obj_store_and_oracle, persister::Persister, write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs}, WriteBuffer, diff --git a/influxdb3_cache/Cargo.toml b/influxdb3_cache/Cargo.toml index 58cd511d72f..3d7d28788f9 100644 --- a/influxdb3_cache/Cargo.toml +++ b/influxdb3_cache/Cargo.toml @@ -20,9 +20,13 @@ influxdb3_wal = { path = "../influxdb3_wal" } anyhow.workspace = true arrow.workspace = true async-trait.workspace = true +bytes.workspace = true +dashmap.workspace = true datafusion.workspace = true +futures.workspace = true indexmap.workspace = true parking_lot.workspace = true +object_store.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true @@ -34,11 +38,13 @@ data_types.workspace = true # Local deps influxdb3_write = { path = "../influxdb3_write" } +influxdb3_test_helpers = { path = "../influxdb3_test_helpers" } # crates.io deps bimap.workspace = true insta.workspace = true object_store.workspace = true +pretty_assertions.workspace = true test-log.workspace = true [lints] diff --git a/influxdb3_cache/src/lib.rs b/influxdb3_cache/src/lib.rs index 3abc2e1e8cd..41550938492 100644 --- a/influxdb3_cache/src/lib.rs +++ b/influxdb3_cache/src/lib.rs @@ -2,6 +2,7 @@ pub mod last_cache; pub mod meta_cache; +pub mod parquet_cache; #[cfg(test)] mod test_helpers; diff --git a/influxdb3_write/src/parquet_cache/mod.rs b/influxdb3_cache/src/parquet_cache/mod.rs similarity index 99% rename from influxdb3_write/src/parquet_cache/mod.rs rename to influxdb3_cache/src/parquet_cache/mod.rs index 9605272e62c..d83600cd771 100644 --- a/influxdb3_write/src/parquet_cache/mod.rs +++ b/influxdb3_cache/src/parquet_cache/mod.rs @@ -40,6 +40,7 @@ type DynError = Arc; /// /// Contains a notifier to notify the caller that registers the cache request when the item /// has been cached successfully (or if the cache request failed in some way) +#[derive(Debug)] pub struct CacheRequest { path: Path, notifier: oneshot::Sender<()>, diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index b834e862af9..b31f0c1b0cf 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -247,12 +247,12 @@ mod tests { use hyper::{body, Body, Client, Request, Response, StatusCode}; use influxdb3_cache::last_cache::LastCacheProvider; use influxdb3_cache::meta_cache::MetaCacheProvider; + use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_catalog::catalog::Catalog; use influxdb3_id::{DbId, TableId}; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::WalConfig; - use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_write::persister::Persister; use influxdb3_write::write_buffer::persisted_files::PersistedFiles; use influxdb3_write::WriteBuffer; diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs index bf22e5aa39c..625ec0090c2 100644 --- a/influxdb3_server/src/query_executor.rs +++ b/influxdb3_server/src/query_executor.rs @@ -620,13 +620,15 @@ mod tests { use data_types::NamespaceName; use datafusion::{assert_batches_sorted_eq, error::DataFusionError}; use futures::TryStreamExt; - use influxdb3_cache::{last_cache::LastCacheProvider, meta_cache::MetaCacheProvider}; + use influxdb3_cache::{ + last_cache::LastCacheProvider, meta_cache::MetaCacheProvider, + parquet_cache::test_cached_obj_store_and_oracle, + }; use influxdb3_catalog::catalog::Catalog; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_write::{ - parquet_cache::test_cached_obj_store_and_oracle, persister::Persister, write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs}, WriteBuffer, diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 565cc5269c0..a8b15b1a554 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -5,7 +5,6 @@ //! metadata of the parquet files that were written in that snapshot. pub mod chunk; -pub mod parquet_cache; pub mod paths; pub mod persister; pub mod write_buffer; diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 60ed2be62f1..43a7522c9b3 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -5,7 +5,6 @@ pub mod queryable_buffer; mod table_buffer; pub mod validator; -use crate::parquet_cache::ParquetCacheOracle; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::queryable_buffer::QueryableBuffer; @@ -26,6 +25,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::logical_expr::Expr; use influxdb3_cache::last_cache::{self, LastCacheProvider}; use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider}; +use influxdb3_cache::parquet_cache::ParquetCacheOracle; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{ColumnId, DbId, TableId}; use influxdb3_wal::{object_store::WalObjectStore, DeleteDatabaseDefinition}; @@ -699,7 +699,6 @@ impl WriteBuffer for WriteBufferImpl {} #[allow(clippy::await_holding_lock)] mod tests { use super::*; - use crate::parquet_cache::test_cached_obj_store_and_oracle; use crate::paths::{CatalogFilePath, SnapshotInfoFilePath}; use crate::persister::Persister; use crate::PersistedSnapshot; @@ -708,6 +707,7 @@ mod tests { use bytes::Bytes; use datafusion_util::config::register_iox_object_store; use futures_util::StreamExt; + use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_catalog::catalog::CatalogSequenceNumber; use influxdb3_id::{DbId, ParquetFileId}; use influxdb3_test_helpers::object_store::RequestCountedObjectStore; diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index f4509b323e2..7adc4fc0db0 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -1,5 +1,4 @@ use crate::chunk::BufferChunk; -use crate::parquet_cache::{CacheRequest, ParquetCacheOracle}; use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; @@ -18,6 +17,7 @@ use datafusion_util::stream_from_batches; use hashbrown::HashMap; use influxdb3_cache::last_cache::LastCacheProvider; use influxdb3_cache::meta_cache::MetaCacheProvider; +use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{DbId, TableId}; use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch};