diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 5fc00df1b8ee..c68f379c9c12 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -24,3 +24,7 @@ metasrv_addrs = ['127.0.0.1:3002'] timeout_millis = 3000 connect_timeout_millis = 5000 tcp_nodelay = false + +[compaction] +max_inflight_task = 4 +max_file_in_level0 = 16 diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index bcc9e4444605..2ac85407830f 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -399,6 +399,7 @@ mod tests { use mito::config::EngineConfig; use mito::engine::MitoEngine; use object_store::ObjectStore; + use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::metadata::TableType; @@ -485,12 +486,14 @@ mod tests { .build() .unwrap(); let object_store = ObjectStore::new(accessor); + let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let table_engine = Arc::new(MitoEngine::new( EngineConfig::default(), EngineImpl::new( StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), object_store.clone(), + noop_compaction_scheduler, ), object_store, )); diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 4c1f066f1d2f..f17542c86aec 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -143,7 +143,7 @@ mod tests { use std::assert_matches::assert_matches; use std::time::Duration; - use datanode::datanode::ObjectStoreConfig; + use datanode::datanode::{CompactionConfig, ObjectStoreConfig}; use servers::Mode; use super::*; @@ -181,6 +181,14 @@ mod tests { ObjectStoreConfig::S3 { .. } => unreachable!(), ObjectStoreConfig::Oss { .. } => unreachable!(), }; + + assert_eq!( + CompactionConfig { + max_inflight_task: 4, + max_file_in_level0: 16, + }, + options.compaction + ); } #[test] diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index 08f5642066cc..ed08014505d1 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -205,7 +205,7 @@ impl TimestampRange { pub fn new_inclusive(start: Option, end: Option) -> Self { // check for emptiness if let (Some(start_ts), Some(end_ts)) = (start, end) { - if start_ts >= end_ts { + if start_ts > end_ts { return Self::empty(); } } @@ -462,4 +462,29 @@ mod tests { assert!(!full.intersects(&empty)); } + + #[test] + fn test_new_inclusive() { + let range = TimestampRange::new_inclusive( + Some(Timestamp::new_millisecond(1)), + Some(Timestamp::new_millisecond(3)), + ); + assert!(!range.is_empty()); + assert!(range.contains(&Timestamp::new_millisecond(1))); + assert!(range.contains(&Timestamp::new_millisecond(3))); + + let range = TimestampRange::new_inclusive( + Some(Timestamp::new_millisecond(1)), + Some(Timestamp::new_millisecond(1)), + ); + assert!(!range.is_empty()); + assert_eq!(1, range.start.unwrap().value()); + assert!(range.contains(&Timestamp::new_millisecond(1))); + + let range = TimestampRange::new_inclusive( + Some(Timestamp::new_millisecond(2)), + Some(Timestamp::new_millisecond(1)), + ); + assert!(range.is_empty()); + } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index bf2a43a4ff29..5b1773583bdf 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -20,6 +20,8 @@ use common_telemetry::info; use meta_client::MetaClientOpts; use serde::{Deserialize, Serialize}; use servers::Mode; +use storage::compaction::CompactionSchedulerConfig; +use storage::config::EngineConfig as StorageEngineConfig; use crate::error::Result; use crate::instance::{Instance, InstanceRef}; @@ -104,6 +106,40 @@ impl Default for WalConfig { } } +/// Options for table compaction +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +pub struct CompactionConfig { + /// Max task number that can concurrently run. + pub max_inflight_task: usize, + /// Max files in level 0 to trigger compaction. + pub max_file_in_level0: usize, +} + +impl Default for CompactionConfig { + fn default() -> Self { + Self { + max_inflight_task: 4, + max_file_in_level0: 8, + } + } +} + +impl From<&DatanodeOptions> for CompactionSchedulerConfig { + fn from(value: &DatanodeOptions) -> Self { + Self { + max_inflight_task: value.compaction.max_inflight_task, + } + } +} + +impl From<&DatanodeOptions> for StorageEngineConfig { + fn from(value: &DatanodeOptions) -> Self { + Self { + max_files_in_l0: value.compaction.max_file_in_level0, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct DatanodeOptions { @@ -117,6 +153,7 @@ pub struct DatanodeOptions { pub wal: WalConfig, pub storage: ObjectStoreConfig, pub enable_memory_catalog: bool, + pub compaction: CompactionConfig, pub mode: Mode, } @@ -133,6 +170,7 @@ impl Default for DatanodeOptions { wal: WalConfig::default(), storage: ObjectStoreConfig::default(), enable_memory_catalog: false, + compaction: CompactionConfig::default(), mode: Mode::Standalone, } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index f0395bbbe90f..1b828ad3def6 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -38,8 +38,12 @@ use object_store::{util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; use servers::Mode; use snafu::prelude::*; +use storage::compaction::{ + CompactionSchedulerConfig, CompactionSchedulerRef, LocalCompactionScheduler, SimplePicker, +}; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; +use store_api::logstore::LogStore; use table::table::numbers::NumbersTable; use table::table::TableIdProviderRef; use table::Table; @@ -92,12 +96,15 @@ impl Instance { } }; + let compaction_scheduler = create_compaction_scheduler(opts); + let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( - StorageEngineConfig::default(), + StorageEngineConfig::from(opts), logstore.clone(), object_store.clone(), + compaction_scheduler, ), object_store, )); @@ -204,6 +211,13 @@ impl Instance { } } +fn create_compaction_scheduler(opts: &DatanodeOptions) -> CompactionSchedulerRef { + let picker = SimplePicker::default(); + let config = CompactionSchedulerConfig::from(opts); + let scheduler = LocalCompactionScheduler::new(config, picker); + Arc::new(scheduler) +} + pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { let object_store = match store_config { ObjectStoreConfig::File { .. } => new_fs_object_store(store_config).await, diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 3d0e4bd6eaee..ed764c2fae3d 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -24,6 +24,7 @@ use mito::config::EngineConfig as TableEngineConfig; use query::QueryEngineFactory; use servers::Mode; use snafu::ResultExt; +use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::metadata::TableId; @@ -46,12 +47,14 @@ impl Instance { let object_store = new_object_store(&opts.storage).await?; let logstore = Arc::new(create_log_store(&opts.wal).await?); let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( StorageEngineConfig::default(), logstore.clone(), object_store.clone(), + compaction_scheduler, ), object_store, )); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index fdb8770ea754..1ff9206adbd2 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -150,6 +150,7 @@ mod tests { use query::parser::{QueryLanguageParser, QueryStatement}; use query::QueryEngineFactory; use sql::statements::statement::Statement; + use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::engine::TableReference; @@ -209,7 +210,7 @@ mod tests { let store_dir = dir.path().to_string_lossy(); let accessor = Builder::default().root(&store_dir).build().unwrap(); let object_store = ObjectStore::new(accessor); - + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let sql = r#"insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000) @@ -221,6 +222,7 @@ mod tests { StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), object_store.clone(), + compaction_scheduler, ), object_store, )); diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index f9bde0feb795..e29d4fa98534 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -605,6 +605,7 @@ mod tests { Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef, }; use log_store::NoopLogStore; + use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::region::RegionImpl; use storage::EngineImpl; @@ -643,13 +644,14 @@ mod tests { let (dir, object_store) = test_util::new_test_object_store("test_insert_with_column_default_constraint").await; - + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let table_engine = MitoEngine::new( EngineConfig::default(), EngineImpl::new( StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), object_store.clone(), + compaction_scheduler, ), object_store, ); diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index 35721d892431..4b4681f15cf5 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -23,6 +23,7 @@ use datatypes::vectors::VectorRef; use log_store::NoopLogStore; use object_store::services::fs::Builder; use object_store::ObjectStore; +use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::engine::{EngineContext, TableEngine}; @@ -127,11 +128,12 @@ pub struct TestEngineComponents { pub async fn setup_test_engine_and_table() -> TestEngineComponents { let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await; - + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let storage_engine = EngineImpl::new( StorageEngineConfig::default(), Arc::new(NoopLogStore::default()), object_store.clone(), + compaction_scheduler, ); let table_engine = MitoEngine::new( EngineConfig::default(), diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 11f3a42ccd6c..637e1194a742 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -118,6 +118,7 @@ mod tests { use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; use mito::engine::MitoEngine; + use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use tempdir::TempDir; @@ -135,12 +136,14 @@ mod tests { }; let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap(); + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let mock_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( StorageEngineConfig::default(), Arc::new(log_store), object_store.clone(), + compaction_scheduler, ), object_store, )); diff --git a/src/script/src/python/builtins.rs b/src/script/src/python/builtins.rs index db1f62208d33..71815f175038 100644 --- a/src/script/src/python/builtins.rs +++ b/src/script/src/python/builtins.rs @@ -962,7 +962,7 @@ pub(crate) mod greptime_builtin { Ok(obj) => match py_vec_obj_to_array(&obj, vm, 1){ Ok(v) => if v.len()==1{ Ok(v) - }else{ + } else { Err(vm.new_runtime_error(format!("Expect return's length to be at most one, found to be length of {}.", v.len()))) }, Err(err) => Err(vm diff --git a/src/storage/src/compaction.rs b/src/storage/src/compaction.rs index 4dfd973ae55b..87650ad157b0 100644 --- a/src/storage/src/compaction.rs +++ b/src/storage/src/compaction.rs @@ -13,9 +13,22 @@ // limitations under the License. mod dedup_deque; +pub mod noop; mod picker; mod rate_limit; mod scheduler; mod strategy; mod task; mod writer; + +use std::sync::Arc; + +pub use picker::{Picker, PickerContext, SimplePicker}; +pub use scheduler::{ + CompactionRequest, CompactionRequestImpl, CompactionScheduler, CompactionSchedulerConfig, + LocalCompactionScheduler, +}; +pub use task::{CompactionTask, CompactionTaskImpl}; + +pub type CompactionSchedulerRef = + Arc> + Send + Sync>; diff --git a/src/storage/src/compaction/noop.rs b/src/storage/src/compaction/noop.rs new file mode 100644 index 000000000000..3cb24e31a89c --- /dev/null +++ b/src/storage/src/compaction/noop.rs @@ -0,0 +1,79 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Debug, Formatter}; +use std::marker::PhantomData; + +use store_api::storage::RegionId; + +use crate::compaction::{ + CompactionRequest, CompactionScheduler, CompactionTask, Picker, PickerContext, +}; + +pub struct NoopCompactionScheduler { + _phantom_data: PhantomData, +} + +impl Default for NoopCompactionScheduler { + fn default() -> Self { + Self { + _phantom_data: Default::default(), + } + } +} + +impl Debug for NoopCompactionScheduler { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NoopCompactionScheduler<...>").finish() + } +} + +#[derive(Default, Debug)] +pub struct NoopCompactionRequest; + +#[derive(Default, Debug)] +pub struct NoopCompactionPicker; + +impl Picker for NoopCompactionPicker { + fn pick(&self, _ctx: &PickerContext, _req: &R) -> crate::error::Result> { + Ok(None) + } +} + +#[derive(Debug)] +pub struct NoopCompactionTask; + +#[async_trait::async_trait] +impl CompactionTask for NoopCompactionTask { + async fn run(self) -> crate::error::Result<()> { + Ok(()) + } +} + +impl CompactionRequest for NoopCompactionRequest { + fn region_id(&self) -> RegionId { + 0 + } +} + +#[async_trait::async_trait] +impl CompactionScheduler for NoopCompactionScheduler { + async fn schedule(&self, _request: R) -> crate::error::Result { + Ok(true) + } + + async fn stop(&self) -> crate::error::Result<()> { + Ok(()) + } +} diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 39b3223ed780..51edbb77ec01 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::marker::PhantomData; +use std::sync::Arc; use common_telemetry::debug; use store_api::logstore::LogStore; use crate::compaction::scheduler::CompactionRequestImpl; -use crate::compaction::strategy::StrategyRef; +use crate::compaction::strategy::{SimpleTimeWindowStrategy, StrategyRef}; use crate::compaction::task::{CompactionTask, CompactionTaskImpl}; /// Picker picks input SST files and builds the compaction task. @@ -30,12 +31,17 @@ pub trait Picker: Send + 'static { pub struct PickerContext {} /// L0 -> L1 compaction based on time windows. -pub(crate) struct SimplePicker { +pub struct SimplePicker { strategy: StrategyRef, _phantom_data: PhantomData, } -#[allow(unused)] +impl Default for SimplePicker { + fn default() -> Self { + Self::new(Arc::new(SimpleTimeWindowStrategy {})) + } +} + impl SimplePicker { pub fn new(strategy: StrategyRef) -> Self { Self { @@ -51,7 +57,7 @@ impl Picker, CompactionTaskImpl> for Si ctx: &PickerContext, req: &CompactionRequestImpl, ) -> crate::error::Result>> { - let levels = &req.levels; + let levels = &req.levels(); for level_num in 0..levels.level_num() { let level = levels.level(level_num as u8); @@ -67,7 +73,7 @@ impl Picker, CompactionTaskImpl> for Si outputs, level_num ); return Ok(Some(CompactionTaskImpl { - schema: req.schema.clone(), + schema: req.schema(), sst_layer: req.sst_layer.clone(), outputs, writer: req.writer.clone(), diff --git a/src/storage/src/compaction/rate_limit.rs b/src/storage/src/compaction/rate_limit.rs index 6b96afe7ee45..5df5744dadf5 100644 --- a/src/storage/src/compaction/rate_limit.rs +++ b/src/storage/src/compaction/rate_limit.rs @@ -50,7 +50,6 @@ pub struct MaxInflightTaskLimiter { _phantom_data: PhantomData, } -#[allow(unused)] impl MaxInflightTaskLimiter { pub fn new(max_inflight_task: usize) -> Self { Self { diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index 17bbad72710a..d21894d1daa3 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; -use common_telemetry::{debug, info}; +use common_telemetry::{debug, error, info}; use snafu::ResultExt; use store_api::logstore::LogStore; -use table::metadata::TableId; +use store_api::storage::RegionId; use tokio::sync::Notify; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -38,11 +39,9 @@ use crate::sst::AccessLayerRef; use crate::version::LevelMetasRef; use crate::wal::Wal; -/// Table compaction request. +/// Region compaction request. pub struct CompactionRequestImpl { - table_id: TableId, - pub levels: LevelMetasRef, - pub schema: RegionSchemaRef, + pub region_id: RegionId, pub sst_layer: AccessLayerRef, pub writer: RegionWriterRef, pub shared: SharedDataRef, @@ -50,36 +49,48 @@ pub struct CompactionRequestImpl { pub wal: Wal, } +impl CompactionRequestImpl { + #[inline] + pub(crate) fn schema(&self) -> RegionSchemaRef { + self.shared.version_control.current().schema().clone() + } + + #[inline] + pub(crate) fn levels(&self) -> LevelMetasRef { + self.shared.version_control.current().ssts().clone() + } +} + impl CompactionRequest for CompactionRequestImpl { #[inline] - fn table_id(&self) -> TableId { - self.table_id + fn region_id(&self) -> RegionId { + self.region_id } } pub trait CompactionRequest: Send + Sync + 'static { - fn table_id(&self) -> TableId; + fn region_id(&self) -> RegionId; } #[derive(Debug)] pub struct CompactionSchedulerConfig { - max_inflight_task: usize, + pub max_inflight_task: usize, } impl Default for CompactionSchedulerConfig { fn default() -> Self { Self { - max_inflight_task: 16, + max_inflight_task: 4, } } } /// CompactionScheduler defines a set of API to schedule compaction tasks. #[async_trait] -pub trait CompactionScheduler { +pub trait CompactionScheduler: Debug { /// Schedules a compaction request. /// Returns true if request is scheduled. Returns false if task queue already - /// contains the request with same table id. + /// contains the request with same region id. async fn schedule(&self, request: R) -> Result; /// Stops compaction scheduler. @@ -87,14 +98,22 @@ pub trait CompactionScheduler { } /// Compaction task scheduler based on local state. -#[allow(unused)] pub struct LocalCompactionScheduler { - request_queue: Arc>>, + request_queue: Arc>>, cancel_token: CancellationToken, task_notifier: Arc, join_handle: Mutex>>, } +impl Debug for LocalCompactionScheduler +where + R: CompactionRequest + Send + Sync, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LocalCompactionScheduler<...>").finish() + } +} + #[async_trait] impl CompactionScheduler for LocalCompactionScheduler where @@ -103,11 +122,11 @@ where async fn schedule(&self, request: R) -> Result { debug!( "Schedule request: {}, queue size: {}", - request.table_id(), + request.region_id(), self.remaining_requests().await ); let mut queue = self.request_queue.write().unwrap(); - let res = queue.push_back(request.table_id(), request); + let res = queue.push_back(request.region_id(), request); self.task_notifier.notify_one(); Ok(res) } @@ -122,7 +141,6 @@ where } } -#[allow(unused)] impl LocalCompactionScheduler where R: CompactionRequest, @@ -132,7 +150,7 @@ where T: CompactionTask, P: Picker + Send + Sync, { - let request_queue: Arc>> = + let request_queue: Arc>> = Arc::new(RwLock::new(DedupDeque::default())); let cancel_token = CancellationToken::new(); let task_notifier = Arc::new(Notify::new()); @@ -164,9 +182,8 @@ where } } -#[allow(unused)] struct CompactionHandler> { - req_queue: Arc>>, + req_queue: Arc>>, cancel_token: CancellationToken, task_notifier: Arc, limiter: Arc>, @@ -174,9 +191,8 @@ struct CompactionHandler> { _phantom_data: PhantomData, } -#[allow(unused)] impl> CompactionHandler { - /// Runs table compaction requests dispatch loop. + /// Runs region compaction requests dispatch loop. pub async fn run(&self) { let task_notifier = self.task_notifier.clone(); let limiter = self.limiter.clone(); @@ -186,15 +202,19 @@ impl> CompactionHandler // poll requests as many as possible until rate limited, and then wait for // notification (some task's finished). debug!("Notified, queue size: {:?}", self.req_queue.read().unwrap().len()); - while let Some((table_id, req)) = self.poll_task().await { + while let Some((region_id, req)) = self.poll_task().await{ if let Ok(token) = limiter.acquire_token(&req) { - debug!("Executing compaction request: {}", table_id); - self.handle_compaction_request(req, token).await; + debug!("Executing compaction request: {}", region_id); + if let Err(e) = self.handle_compaction_request(req, token).await { + error!(e; "Failed to submit compaction task for region: {}", region_id); + } else { + info!("Submitted region compaction task: {}", region_id); + } } else { // compaction rate limited, put back to req queue to wait for next // schedule - debug!("Put back request {}, queue size: {}", table_id, self.req_queue.read().unwrap().len()); - self.put_back_req(table_id, req).await; + debug!("Put back request {}, queue size: {}", region_id, self.req_queue.read().unwrap().len()); + self.put_back_req(region_id, req).await; break; } } @@ -208,35 +228,36 @@ impl> CompactionHandler } #[inline] - async fn poll_task(&self) -> Option<(TableId, R)> { + async fn poll_task(&self) -> Option<(RegionId, R)> { let mut queue = self.req_queue.write().unwrap(); queue.pop_front() } /// Puts request back to the front of request queue. #[inline] - async fn put_back_req(&self, table_id: TableId, req: R) { + async fn put_back_req(&self, region_id: RegionId, req: R) { let mut queue = self.req_queue.write().unwrap(); - queue.push_front(table_id, req); + queue.push_front(region_id, req); } // Handles compaction request, submit task to bg runtime. - async fn handle_compaction_request( - &self, - mut req: R, - token: BoxedRateLimitToken, - ) -> Result<()> { + async fn handle_compaction_request(&self, req: R, token: BoxedRateLimitToken) -> Result<()> { let cloned_notify = self.task_notifier.clone(); - let table_id = req.table_id(); + let region_id = req.region_id(); let Some(task) = self.build_compaction_task(req).await? else { - info!("No file needs compaction in table: {}", table_id); + info!("No file needs compaction in region: {}", region_id); return Ok(()); }; + debug!("Compaction task, region: {}, task: {:?}", region_id, task); // TODO(hl): we need to keep a track of task handle here to allow task cancellation. common_runtime::spawn_bg(async move { - task.run().await; // TODO(hl): handle errors - + if let Err(e) = task.run().await { + // TODO(hl): maybe resubmit compaction task on failure? + error!(e; "Failed to compact region: {}", region_id); + } else { + info!("Successfully compacted region: {}", region_id); + } // releases rate limit token token.try_release(); // notify scheduler to schedule next task when current task finishes. @@ -246,7 +267,6 @@ impl> CompactionHandler Ok(()) } - // TODO(hl): generate compaction task(find SSTs to compact along with the output of compaction) async fn build_compaction_task(&self, req: R) -> crate::error::Result> { let ctx = PickerContext {}; self.picker.pick(&ctx, &req) @@ -333,12 +353,12 @@ mod tests { #[derive(Default, Debug)] struct MockRequest { - table_id: TableId, + region_id: RegionId, } impl CompactionRequest for MockRequest { - fn table_id(&self) -> TableId { - self.table_id + fn region_id(&self) -> RegionId { + self.region_id } } @@ -356,12 +376,12 @@ mod tests { ); scheduler - .schedule(MockRequest { table_id: 1 }) + .schedule(MockRequest { region_id: 1 }) .await .unwrap(); scheduler - .schedule(MockRequest { table_id: 2 }) + .schedule(MockRequest { region_id: 2 }) .await .unwrap(); @@ -390,7 +410,7 @@ mod tests { for i in 0..task_size { scheduler .schedule(MockRequest { - table_id: i as TableId, + region_id: i as RegionId, }) .await .unwrap(); @@ -420,7 +440,7 @@ mod tests { for i in 0..task_size / 2 { scheduler .schedule(MockRequest { - table_id: i as TableId, + region_id: i as RegionId, }) .await .unwrap(); @@ -430,7 +450,7 @@ mod tests { for i in task_size / 2..task_size { scheduler .schedule(MockRequest { - table_id: i as TableId, + region_id: i as RegionId, }) .await .unwrap(); @@ -453,7 +473,7 @@ mod tests { let mut scheduled_task = 0; for _ in 0..10 { if scheduler - .schedule(MockRequest { table_id: 1 }) + .schedule(MockRequest { region_id: 1 }) .await .unwrap() { diff --git a/src/storage/src/compaction/strategy.rs b/src/storage/src/compaction/strategy.rs index b1b11e422738..ff3e7fc4cd53 100644 --- a/src/storage/src/compaction/strategy.rs +++ b/src/storage/src/compaction/strategy.rs @@ -43,13 +43,14 @@ impl Strategy for SimpleTimeWindowStrategy { return vec![]; } let files = find_compactable_files(level); + debug!("Compactable files found: {:?}", files); if files.is_empty() { return vec![]; } let time_bucket = infer_time_bucket(&files); let buckets = calculate_time_buckets(time_bucket, &files); - debug!("File buckets: {:?}", buckets); + debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets); buckets .into_iter() .map(|(bound, files)| CompactionOutput { @@ -89,12 +90,7 @@ fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap Result<()>; } -#[allow(unused)] -pub(crate) struct CompactionTaskImpl { +pub struct CompactionTaskImpl { pub schema: RegionSchemaRef, pub sst_layer: AccessLayerRef, pub outputs: Vec, @@ -45,6 +43,14 @@ pub(crate) struct CompactionTaskImpl { pub manifest: RegionManifest, } +impl Debug for CompactionTaskImpl { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CompactionTaskImpl") + .field("region_name", &self.shared_data.name()) + .finish() + } +} + impl Drop for CompactionTaskImpl { fn drop(&mut self) { self.mark_files_compacting(false); @@ -60,7 +66,6 @@ impl CompactionTaskImpl { for output in self.outputs.drain(..) { let schema = self.schema.clone(); let sst_layer = self.sst_layer.clone(); - let object_store = self.sst_layer.object_store(); compacted_inputs.extend(output.inputs.iter().map(|f| FileMeta { file_name: f.file_name().to_string(), time_range: *f.time_range(), @@ -69,7 +74,7 @@ impl CompactionTaskImpl { // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism. futs.push(async move { - match output.build(schema, sst_layer, object_store).await { + match output.build(schema, sst_layer).await { Ok(meta) => Ok(meta), Err(e) => Err(e), } @@ -137,17 +142,9 @@ impl CompactionTask for CompactionTaskImpl { } } -#[allow(unused)] -pub(crate) struct CompactionInput { - input_level: u8, - output_level: u8, - file: FileHandle, -} - /// Many-to-many compaction can be decomposed to a many-to-one compaction from level n to level n+1 /// and a many-to-one compaction from level n+1 to level n+1. #[derive(Debug)] -#[allow(unused)] pub struct CompactionOutput { /// Compaction output file level. pub(crate) output_level: Level, @@ -160,15 +157,10 @@ pub struct CompactionOutput { } impl CompactionOutput { - async fn build( - &self, - schema: RegionSchemaRef, - sst_layer: AccessLayerRef, - object_store: ObjectStore, - ) -> Result { + async fn build(&self, schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Result { let reader = build_sst_reader( schema, - sst_layer, + sst_layer.clone(), &self.inputs, self.bucket_bound, self.bucket_bound + self.bucket, @@ -176,10 +168,10 @@ impl CompactionOutput { .await?; let output_file_name = format!("{}.parquet", Uuid::new_v4().hyphenated()); let opts = WriteOptions {}; - let SstInfo { time_range } = - ParquetWriter::new(&output_file_name, Source::Reader(reader), object_store) - .write_sst(&opts) - .await?; + + let SstInfo { time_range } = sst_layer + .write_sst(&output_file_name, Source::Reader(reader), &opts) + .await?; Ok(FileMeta { file_name: output_file_name, @@ -197,10 +189,18 @@ pub mod tests { use crate::compaction::task::CompactionTask; pub type CallbackRef = Arc; + pub struct NoopCompactionTask { pub cbs: Vec, } + impl Debug for NoopCompactionTask { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("storage::compaction::task::tests::NoopCompactionTask") + .finish() + } + } + impl NoopCompactionTask { pub fn new(cbs: Vec) -> Self { Self { cbs } diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index e19877444ac4..880a13f2f3a2 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -102,8 +102,8 @@ mod tests { }; use crate::metadata::RegionMetadata; use crate::sst; - use crate::sst::parquet::{ParquetWriter, Source}; - use crate::sst::{FileMeta, FsAccessLayer, SstInfo, WriteOptions}; + use crate::sst::parquet::ParquetWriter; + use crate::sst::{FileMeta, FsAccessLayer, Source, SstInfo, WriteOptions}; use crate::test_util::descriptor_util::RegionDescBuilder; fn schema_for_test() -> RegionSchemaRef { diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs index 1f6bf31efc3f..26f2741461a4 100644 --- a/src/storage/src/config.rs +++ b/src/storage/src/config.rs @@ -14,5 +14,13 @@ //! storage engine config -#[derive(Debug, Default, Clone)] -pub struct EngineConfig {} +#[derive(Debug, Clone)] +pub struct EngineConfig { + pub max_files_in_l0: usize, +} + +impl Default for EngineConfig { + fn default() -> Self { + Self { max_files_in_l0: 8 } + } +} diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 8664facc9bdf..c1dbf8968f6b 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -25,6 +25,7 @@ use store_api::storage::{ }; use crate::background::JobPoolImpl; +use crate::compaction::CompactionSchedulerRef; use crate::config::EngineConfig; use crate::error::{self, Error, Result}; use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; @@ -84,9 +85,19 @@ impl StorageEngine for EngineImpl { } impl EngineImpl { - pub fn new(config: EngineConfig, log_store: Arc, object_store: ObjectStore) -> Self { + pub fn new( + config: EngineConfig, + log_store: Arc, + object_store: ObjectStore, + compaction_scheduler: CompactionSchedulerRef, + ) -> Self { Self { - inner: Arc::new(EngineInner::new(config, log_store, object_store)), + inner: Arc::new(EngineInner::new( + config, + log_store, + object_store, + compaction_scheduler, + )), } } } @@ -210,13 +221,19 @@ struct EngineInner { memtable_builder: MemtableBuilderRef, flush_scheduler: FlushSchedulerRef, flush_strategy: FlushStrategyRef, + compaction_scheduler: CompactionSchedulerRef, + config: Arc, } impl EngineInner { - pub fn new(_config: EngineConfig, log_store: Arc, object_store: ObjectStore) -> Self { + pub fn new( + config: EngineConfig, + log_store: Arc, + object_store: ObjectStore, + compaction_scheduler: CompactionSchedulerRef, + ) -> Self { let job_pool = Arc::new(JobPoolImpl {}); let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); - Self { object_store, log_store, @@ -224,6 +241,8 @@ impl EngineInner { memtable_builder: Arc::new(DefaultMemtableBuilder::default()), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), + compaction_scheduler, + config: Arc::new(config), } } @@ -320,6 +339,8 @@ impl EngineInner { memtable_builder: self.memtable_builder.clone(), flush_scheduler: self.flush_scheduler.clone(), flush_strategy: self.flush_strategy.clone(), + compaction_scheduler: self.compaction_scheduler.clone(), + engine_config: self.config.clone(), } } } @@ -333,6 +354,7 @@ mod tests { use tempdir::TempDir; use super::*; + use crate::compaction::noop::NoopCompactionScheduler; use crate::test_util::descriptor_util::RegionDescBuilder; #[tokio::test] @@ -347,7 +369,14 @@ mod tests { let config = EngineConfig::default(); - let engine = EngineImpl::new(config, Arc::new(log_store), object_store); + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); + + let engine = EngineImpl::new( + config, + Arc::new(log_store), + object_store, + compaction_scheduler, + ); let region_name = "region-0"; let desc = RegionDescBuilder::new(region_name) diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index e61bc244092e..278a3aa4dc4e 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; @@ -27,7 +29,7 @@ use crate::manifest::action::*; use crate::manifest::region::RegionManifest; use crate::memtable::{IterContext, MemtableId, MemtableRef}; use crate::region::{RegionWriterRef, SharedDataRef}; -use crate::sst::{AccessLayerRef, FileMeta, SstInfo, WriteOptions}; +use crate::sst::{AccessLayerRef, FileMeta, Source, SstInfo, WriteOptions}; use crate::wal::Wal; /// Default write buffer size (32M). @@ -142,6 +144,8 @@ impl FlushScheduler for FlushSchedulerImpl { pub type FlushSchedulerRef = Arc; +pub type FlushCallback = Pin + Send + 'static>>; + pub struct FlushJob { /// Max memtable id in these memtables, /// used to remove immutable memtables in current version. @@ -160,10 +164,12 @@ pub struct FlushJob { pub wal: Wal, /// Region manifest service, used to persist metadata. pub manifest: RegionManifest, + /// Callbacks that get invoked on flush success. + pub on_success: Option, } impl FlushJob { - async fn write_memtables_to_layer(&self, ctx: &Context) -> Result> { + async fn write_memtables_to_layer(&mut self, ctx: &Context) -> Result> { if ctx.is_cancelled() { return CancelledSnafu {}.fail(); } @@ -184,10 +190,11 @@ impl FlushJob { let file_name = Self::generate_sst_file_name(); // TODO(hl): Check if random file name already exists in meta. let iter = m.iter(&iter_ctx)?; + let sst_layer = self.sst_layer.clone(); + futures.push(async move { - let SstInfo { time_range } = self - .sst_layer - .write_sst(&file_name, iter, &WriteOptions::default()) + let SstInfo { time_range } = sst_layer + .write_sst(&file_name, Source::Iter(iter), &WriteOptions::default()) .await?; Ok(FileMeta { @@ -209,7 +216,7 @@ impl FlushJob { Ok(metas) } - async fn write_manifest_and_apply(&self, file_metas: &[FileMeta]) -> Result<()> { + async fn write_manifest_and_apply(&mut self, file_metas: &[FileMeta]) -> Result<()> { let edit = RegionEdit { region_version: self.shared.version_control.metadata().version(), flushed_sequence: Some(self.flush_sequence), @@ -241,6 +248,10 @@ impl Job for FlushJob { async fn run(&mut self, ctx: &Context) -> Result<()> { let file_metas = self.write_memtables_to_layer(ctx).await?; self.write_manifest_and_apply(&file_metas).await?; + + if let Some(cb) = self.on_success.take() { + cb.await; + } Ok(()) } } diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index cc6c592e471a..78c573217dc1 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -17,7 +17,7 @@ mod background; mod chunk; pub mod codec; -mod compaction; +pub mod compaction; pub mod config; mod engine; pub mod error; diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 05e66352fa33..558674ce7cbb 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -28,6 +28,8 @@ use store_api::storage::{ WriteResponse, }; +use crate::compaction::CompactionSchedulerRef; +use crate::config::EngineConfig; use crate::error::{self, Error, Result}; use crate::flush::{FlushSchedulerRef, FlushStrategyRef}; use crate::manifest::action::{ @@ -107,13 +109,15 @@ impl Region for RegionImpl { /// /// Contains all necessary storage related components needed by the region, such as logstore, /// manifest, memtable builder. -pub struct StoreConfig { +pub struct StoreConfig { pub log_store: Arc, pub sst_layer: AccessLayerRef, pub manifest: RegionManifest, pub memtable_builder: MemtableBuilderRef, pub flush_scheduler: FlushSchedulerRef, pub flush_strategy: FlushStrategyRef, + pub compaction_scheduler: CompactionSchedulerRef, + pub engine_config: Arc, } pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); @@ -163,10 +167,14 @@ impl RegionImpl { name, version_control: Arc::new(version_control), }), - writer: Arc::new(RegionWriter::new(store_config.memtable_builder)), + writer: Arc::new(RegionWriter::new( + store_config.memtable_builder, + store_config.engine_config.clone(), + )), wal, flush_strategy: store_config.flush_strategy, flush_scheduler: store_config.flush_scheduler, + compaction_scheduler: store_config.compaction_scheduler, sst_layer: store_config.sst_layer, manifest: store_config.manifest, }); @@ -236,11 +244,15 @@ impl RegionImpl { version_control, }); - let writer = Arc::new(RegionWriter::new(store_config.memtable_builder)); + let writer = Arc::new(RegionWriter::new( + store_config.memtable_builder, + store_config.engine_config.clone(), + )); let writer_ctx = WriterContext { shared: &shared, flush_strategy: &store_config.flush_strategy, flush_scheduler: &store_config.flush_scheduler, + compaction_scheduler: &store_config.compaction_scheduler, sst_layer: &store_config.sst_layer, wal: &wal, writer: &writer, @@ -257,6 +269,7 @@ impl RegionImpl { wal, flush_strategy: store_config.flush_strategy, flush_scheduler: store_config.flush_scheduler, + compaction_scheduler: store_config.compaction_scheduler, sst_layer: store_config.sst_layer, manifest: store_config.manifest, }); @@ -387,6 +400,7 @@ impl RegionImpl { shared: &inner.shared, flush_strategy: &inner.flush_strategy, flush_scheduler: &inner.flush_scheduler, + compaction_scheduler: &inner.compaction_scheduler, sst_layer: &inner.sst_layer, wal: &inner.wal, writer: &inner.writer, @@ -429,6 +443,7 @@ struct RegionInner { wal: Wal, flush_strategy: FlushStrategyRef, flush_scheduler: FlushSchedulerRef, + compaction_scheduler: CompactionSchedulerRef, sst_layer: AccessLayerRef, manifest: RegionManifest, } @@ -467,6 +482,7 @@ impl RegionInner { shared: &self.shared, flush_strategy: &self.flush_strategy, flush_scheduler: &self.flush_scheduler, + compaction_scheduler: &self.compaction_scheduler, sst_layer: &self.sst_layer, wal: &self.wal, writer: &self.writer, diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 24607aee83e0..678ec20b5f16 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -14,7 +14,8 @@ use std::sync::Arc; -use common_telemetry::logging; +use common_telemetry::tracing::log::info; +use common_telemetry::{error, logging}; use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::LogStore; @@ -23,8 +24,10 @@ use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteRespon use tokio::sync::Mutex; use crate::background::JobHandle; +use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef}; +use crate::config::EngineConfig; use crate::error::{self, Result}; -use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef}; +use crate::flush::{FlushCallback, FlushJob, FlushSchedulerRef, FlushStrategyRef}; use crate::manifest::action::{ RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, }; @@ -34,7 +37,7 @@ use crate::proto::wal::WalHeader; use crate::region::{RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef}; use crate::schema::compat::CompatWrite; use crate::sst::AccessLayerRef; -use crate::version::{VersionControl, VersionControlRef, VersionEdit}; +use crate::version::{VersionControl, VersionControlRef, VersionEdit, VersionRef}; use crate::wal::Wal; use crate::write_batch::WriteBatch; @@ -56,9 +59,9 @@ pub struct RegionWriter { } impl RegionWriter { - pub fn new(memtable_builder: MemtableBuilderRef) -> RegionWriter { + pub fn new(memtable_builder: MemtableBuilderRef, config: Arc) -> RegionWriter { RegionWriter { - inner: Mutex::new(WriterInner::new(memtable_builder)), + inner: Mutex::new(WriterInner::new(memtable_builder, config)), version_mutex: Mutex::new(()), } } @@ -241,6 +244,7 @@ pub struct WriterContext<'a, S: LogStore> { pub shared: &'a SharedDataRef, pub flush_strategy: &'a FlushStrategyRef, pub flush_scheduler: &'a FlushSchedulerRef, + pub compaction_scheduler: &'a CompactionSchedulerRef, pub sst_layer: &'a AccessLayerRef, pub wal: &'a Wal, pub writer: &'a RegionWriterRef, @@ -271,13 +275,15 @@ impl<'a, S: LogStore> AlterContext<'a, S> { struct WriterInner { memtable_builder: MemtableBuilderRef, flush_handle: Option, + engine_config: Arc, } impl WriterInner { - fn new(memtable_builder: MemtableBuilderRef) -> WriterInner { + fn new(memtable_builder: MemtableBuilderRef, engine_config: Arc) -> WriterInner { WriterInner { memtable_builder, flush_handle: None, + engine_config, } } @@ -541,6 +547,8 @@ impl WriterInner { return Ok(()); } + let cb = Self::build_flush_callback(¤t_version, ctx, &self.engine_config); + let flush_req = FlushJob { max_memtable_id: max_memtable_id.unwrap(), memtables: mem_to_flush, @@ -551,6 +559,7 @@ impl WriterInner { writer: ctx.writer.clone(), wal: ctx.wal.clone(), manifest: ctx.manifest.clone(), + on_success: cb, }; let flush_handle = ctx @@ -561,4 +570,51 @@ impl WriterInner { Ok(()) } + + fn build_flush_callback( + version: &VersionRef, + ctx: &WriterContext, + config: &Arc, + ) -> Option { + let region_id = version.metadata().id(); + let compaction_request = CompactionRequestImpl { + region_id, + sst_layer: ctx.sst_layer.clone(), + writer: ctx.writer.clone(), + shared: ctx.shared.clone(), + manifest: ctx.manifest.clone(), + wal: ctx.wal.clone(), + }; + let compaction_scheduler = ctx.compaction_scheduler.clone(); + let shared_data = ctx.shared.clone(); + let max_files_in_l0 = config.max_files_in_l0; + let schedule_compaction_cb = Box::pin(async move { + let level0_file_num = shared_data + .version_control + .current() + .ssts() + .level(0) + .file_num(); + + if level0_file_num <= max_files_in_l0 { + info!( + "No enough SST files in level 0 (threshold: {}), skip compaction", + max_files_in_l0 + ); + return; + } + match compaction_scheduler.schedule(compaction_request).await { + Ok(scheduled) => { + info!( + "Schedule region {} compaction request result: {}", + region_id, scheduled + ) + } + Err(e) => { + error!(e;"Failed to schedule region compaction request {}", region_id); + } + } + }); + Some(schedule_compaction_cb) + } } diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 1caa0a4de48b..bd667ebb683e 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -23,13 +23,15 @@ use common_time::range::TimestampRange; use common_time::Timestamp; use object_store::{util, ObjectStore}; use serde::{Deserialize, Serialize}; +use store_api::storage::ChunkReader; use table::predicate::Predicate; +use crate::chunk::ChunkReaderImpl; use crate::error::Result; use crate::memtable::BoxedBatchIterator; -use crate::read::BoxedBatchReader; +use crate::read::{Batch, BoxedBatchReader}; use crate::schema::ProjectedSchemaRef; -use crate::sst::parquet::{ParquetReader, ParquetWriter, Source}; +use crate::sst::parquet::{ParquetReader, ParquetWriter}; /// Maximum level of SSTs. pub const MAX_LEVEL: u8 = 2; @@ -111,7 +113,7 @@ pub struct LevelMeta { } impl LevelMeta { - pub fn new_empty(level: Level) -> Self { + pub fn new(level: Level) -> Self { Self { level, files: HashMap::new(), @@ -132,6 +134,12 @@ impl LevelMeta { self.level } + /// Returns number of SST files in level. + #[inline] + pub fn file_num(&self) -> usize { + self.files.len() + } + pub fn files(&self) -> impl Iterator { self.files.values() } @@ -140,7 +148,7 @@ impl LevelMeta { fn new_level_meta_vec() -> LevelMetaVec { (0u8..MAX_LEVEL) .into_iter() - .map(LevelMeta::new_empty) + .map(LevelMeta::new) .collect::>() .try_into() .unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL @@ -243,7 +251,7 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug { async fn write_sst( &self, file_name: &str, - iter: BoxedBatchIterator, + source: Source, opts: &WriteOptions, ) -> Result; @@ -256,6 +264,33 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug { pub type AccessLayerRef = Arc; +/// Parquet writer data source. +pub enum Source { + /// Writes rows from memtable to parquet + Iter(BoxedBatchIterator), + /// Writes row from ChunkReaderImpl (maybe a set of SSTs) to parquet. + Reader(ChunkReaderImpl), +} + +impl Source { + async fn next_batch(&mut self) -> Result> { + match self { + Source::Iter(iter) => iter.next().transpose(), + Source::Reader(reader) => reader + .next_chunk() + .await + .map(|p| p.map(|chunk| Batch::new(chunk.columns))), + } + } + + fn projected_schema(&self) -> ProjectedSchemaRef { + match self { + Source::Iter(iter) => iter.schema(), + Source::Reader(reader) => reader.projected_schema().clone(), + } + } +} + /// Sst access layer based on local file system. #[derive(Debug)] pub struct FsAccessLayer { @@ -282,13 +317,13 @@ impl AccessLayer for FsAccessLayer { async fn write_sst( &self, file_name: &str, - iter: BoxedBatchIterator, + source: Source, opts: &WriteOptions, ) -> Result { // Now we only supports parquet format. We may allow caller to specific SST format in // WriteOptions in the future. let file_path = self.sst_file_path(file_name); - let writer = ParquetWriter::new(&file_path, Source::Iter(iter), self.object_store.clone()); + let writer = ParquetWriter::new(&file_path, source, self.object_store.clone()); writer.write_sst(opts).await } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 972894e40469..e106145c82ea 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -45,21 +45,18 @@ use parquet::file::properties::WriterProperties; use parquet::format::FileMetaData; use parquet::schema::types::SchemaDescriptor; use snafu::{OptionExt, ResultExt}; -use store_api::storage::ChunkReader; use table::predicate::Predicate; use tokio::io::BufReader; -use crate::chunk::ChunkReaderImpl; use crate::error::{ self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu, Result, WriteObjectSnafu, WriteParquetSnafu, }; -use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BatchReader}; use crate::schema::compat::ReadAdapter; use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef}; use crate::sst; -use crate::sst::SstInfo; +use crate::sst::{Source, SstInfo}; /// Parquet sst writer. pub struct ParquetWriter<'a> { file_path: &'a str, @@ -321,7 +318,6 @@ impl<'a> ParquetReader<'a> { // checks if converting time range unit into ts col unit will result into rounding error. if time_unit_lossy(&self.time_range, ts_col_unit) { let filter = RowFilter::new(vec![Box::new(PlainTimestampRowFilter::new( - ts_col_idx, self.time_range, projection, ))]); @@ -343,15 +339,9 @@ impl<'a> ParquetReader<'a> { .and_then(|s| s.convert_to(ts_col_unit)) .map(|t| t.value()), ) { - Box::new(FastTimestampRowFilter::new( - ts_col_idx, projection, lower, upper, - )) as _ + Box::new(FastTimestampRowFilter::new(projection, lower, upper)) as _ } else { - Box::new(PlainTimestampRowFilter::new( - ts_col_idx, - self.time_range, - projection, - )) as _ + Box::new(PlainTimestampRowFilter::new(self.time_range, projection)) as _ }; let filter = RowFilter::new(vec![row_filter]); Some(filter) @@ -372,21 +362,14 @@ fn time_unit_lossy(range: &TimestampRange, ts_col_unit: TimeUnit) -> bool { /// `FastTimestampRowFilter` is used to filter rows within given timestamp range when reading /// row groups from parquet files, while avoids fetching all columns from SSTs file. struct FastTimestampRowFilter { - timestamp_index: usize, lower_bound: i64, upper_bound: i64, projection: ProjectionMask, } impl FastTimestampRowFilter { - fn new( - ts_col_idx: usize, - projection: ProjectionMask, - lower_bound: i64, - upper_bound: i64, - ) -> Self { + fn new(projection: ProjectionMask, lower_bound: i64, upper_bound: i64) -> Self { Self { - timestamp_index: ts_col_idx, lower_bound, upper_bound, projection, @@ -401,7 +384,8 @@ impl ArrowPredicate for FastTimestampRowFilter { /// Selects the rows matching given time range. fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result { - let ts_col = batch.column(self.timestamp_index); + // the projection has only timestamp column, so we can safely take the first column in batch. + let ts_col = batch.column(0); macro_rules! downcast_and_compute { ($typ: ty) => { @@ -443,15 +427,13 @@ impl ArrowPredicate for FastTimestampRowFilter { /// [PlainTimestampRowFilter] iterates each element in timestamp column, build a [Timestamp] struct /// and checks if given time range contains the timestamp. struct PlainTimestampRowFilter { - timestamp_index: usize, time_range: TimestampRange, projection: ProjectionMask, } impl PlainTimestampRowFilter { - fn new(timestamp_index: usize, time_range: TimestampRange, projection: ProjectionMask) -> Self { + fn new(time_range: TimestampRange, projection: ProjectionMask) -> Self { Self { - timestamp_index, time_range, projection, } @@ -464,7 +446,8 @@ impl ArrowPredicate for PlainTimestampRowFilter { } fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result { - let ts_col = batch.column(self.timestamp_index); + // the projection has only timestamp column, so we can safely take the first column in batch. + let ts_col = batch.column(0); macro_rules! downcast_and_compute { ($array_ty: ty, $unit: ident) => {{ @@ -532,33 +515,6 @@ impl BatchReader for ChunkStream { } } -/// Parquet writer data source. -pub enum Source { - /// Writes rows from memtable to parquet - Iter(BoxedBatchIterator), - /// Writes row from ChunkReaderImpl (maybe a set of SSTs) to parquet. - Reader(ChunkReaderImpl), -} - -impl Source { - async fn next_batch(&mut self) -> Result> { - match self { - Source::Iter(iter) => iter.next().transpose(), - Source::Reader(reader) => reader - .next_chunk() - .await - .map(|p| p.map(|chunk| Batch::new(chunk.columns))), - } - } - - fn projected_schema(&self) -> ProjectedSchemaRef { - match self { - Source::Iter(iter) => iter.schema(), - Source::Reader(reader) => reader.projected_schema().clone(), - } - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 70ebdcf897bd..ca3d3fb57cfb 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -20,6 +20,7 @@ use object_store::backend::fs::Builder; use object_store::ObjectStore; use crate::background::JobPoolImpl; +use crate::compaction::noop::NoopCompactionScheduler; use crate::engine; use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy}; use crate::manifest::region::RegionManifest; @@ -51,7 +52,7 @@ pub async fn new_store_config( ..Default::default() }; let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); - + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); StoreConfig { log_store, sst_layer, @@ -59,5 +60,7 @@ pub async fn new_store_config( memtable_builder: Arc::new(DefaultMemtableBuilder::default()), flush_scheduler, flush_strategy: Arc::new(SizeBasedStrategy::default()), + compaction_scheduler, + engine_config: Default::default(), } } diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 4f333dc7c059..6f84b0c5f58b 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -243,7 +243,7 @@ impl Version { ); info!( - "After region compaction, region: {}, SST files: {:?}", + "After apply edit, region: {}, SST files: {:?}", self.metadata.id(), merged_ssts );