Skip to content

Commit

Permalink
feat: compaction integration (#997)
Browse files Browse the repository at this point in the history
* feat: trigger compaction on flush

* chore: rebase develop

* feat: add config item max_file_in_level0 and remove compaction_after_flush

* fix: cr comments

* chore: add unit test to cover Timestamp::new_inclusive

* fix: workaround to fix future is not Sync

* fix: future is not sync

* fix: some cr comments
  • Loading branch information
v0y4g3r authored Feb 15, 2023
1 parent e2904b9 commit 75b8afe
Show file tree
Hide file tree
Showing 30 changed files with 515 additions and 196 deletions.
4 changes: 4 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ metasrv_addrs = ['127.0.0.1:3002']
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false

[compaction]
max_inflight_task = 4
max_file_in_level0 = 16
3 changes: 3 additions & 0 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ mod tests {
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableType;
Expand Down Expand Up @@ -485,12 +486,14 @@ mod tests {
.build()
.unwrap();
let object_store = ObjectStore::new(accessor);
let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = Arc::new(MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
noop_compaction_scheduler,
),
object_store,
));
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;

use datanode::datanode::ObjectStoreConfig;
use datanode::datanode::{CompactionConfig, ObjectStoreConfig};
use servers::Mode;

use super::*;
Expand Down Expand Up @@ -181,6 +181,14 @@ mod tests {
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
};

assert_eq!(
CompactionConfig {
max_inflight_task: 4,
max_file_in_level0: 16,
},
options.compaction
);
}

#[test]
Expand Down
27 changes: 26 additions & 1 deletion src/common/time/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl TimestampRange {
pub fn new_inclusive(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
// check for emptiness
if let (Some(start_ts), Some(end_ts)) = (start, end) {
if start_ts >= end_ts {
if start_ts > end_ts {
return Self::empty();
}
}
Expand Down Expand Up @@ -462,4 +462,29 @@ mod tests {

assert!(!full.intersects(&empty));
}

#[test]
fn test_new_inclusive() {
    // start < end: a regular non-empty range; inclusive semantics mean
    // both endpoints are members.
    let normal = TimestampRange::new_inclusive(
        Some(Timestamp::new_millisecond(1)),
        Some(Timestamp::new_millisecond(3)),
    );
    assert!(!normal.is_empty());
    assert!(normal.contains(&Timestamp::new_millisecond(1)));
    assert!(normal.contains(&Timestamp::new_millisecond(3)));

    // start == end: a single-point range, kept (not collapsed to empty).
    let point = TimestampRange::new_inclusive(
        Some(Timestamp::new_millisecond(1)),
        Some(Timestamp::new_millisecond(1)),
    );
    assert!(!point.is_empty());
    assert_eq!(1, point.start.unwrap().value());
    assert!(point.contains(&Timestamp::new_millisecond(1)));

    // start > end: degenerates to the empty range.
    let reversed = TimestampRange::new_inclusive(
        Some(Timestamp::new_millisecond(2)),
        Some(Timestamp::new_millisecond(1)),
    );
    assert!(reversed.is_empty());
}
}
38 changes: 38 additions & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use common_telemetry::info;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::Mode;
use storage::compaction::CompactionSchedulerConfig;
use storage::config::EngineConfig as StorageEngineConfig;

use crate::error::Result;
use crate::instance::{Instance, InstanceRef};
Expand Down Expand Up @@ -104,6 +106,40 @@ impl Default for WalConfig {
}
}

/// Options for table compaction.
///
/// Deserialized from the `[compaction]` section of the datanode config file
/// (see `config/datanode.example.toml`); falls back to [`Default`] when the
/// section is absent (the enclosing options use `#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CompactionConfig {
    /// Max task number that can concurrently run.
    pub max_inflight_task: usize,
    /// Max files in level 0 to trigger compaction.
    pub max_file_in_level0: usize,
}

impl Default for CompactionConfig {
    // Defaults used when no `[compaction]` section is configured:
    // up to 4 concurrent compaction tasks, compact once level 0 holds 8 files.
    fn default() -> Self {
        Self {
            max_inflight_task: 4,
            max_file_in_level0: 8,
        }
    }
}

// Derive the compaction scheduler's settings from the datanode options:
// only the inflight-task cap is relevant to the scheduler.
impl From<&DatanodeOptions> for CompactionSchedulerConfig {
    fn from(opts: &DatanodeOptions) -> Self {
        let max_inflight_task = opts.compaction.max_inflight_task;
        Self { max_inflight_task }
    }
}

// Map the datanode compaction options onto the storage engine config.
// Note the field-name difference: `max_file_in_level0` (options) feeds
// `max_files_in_l0` (storage engine).
impl From<&DatanodeOptions> for StorageEngineConfig {
    fn from(opts: &DatanodeOptions) -> Self {
        let max_files_in_l0 = opts.compaction.max_file_in_level0;
        Self { max_files_in_l0 }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {
Expand All @@ -117,6 +153,7 @@ pub struct DatanodeOptions {
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
pub compaction: CompactionConfig,
pub mode: Mode,
}

Expand All @@ -133,6 +170,7 @@ impl Default for DatanodeOptions {
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
compaction: CompactionConfig::default(),
mode: Mode::Standalone,
}
}
Expand Down
16 changes: 15 additions & 1 deletion src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use snafu::prelude::*;
use storage::compaction::{
CompactionSchedulerConfig, CompactionSchedulerRef, LocalCompactionScheduler, SimplePicker,
};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use table::Table;
Expand Down Expand Up @@ -92,12 +96,15 @@ impl Instance {
}
};

let compaction_scheduler = create_compaction_scheduler(opts);

let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
StorageEngineConfig::from(opts),
logstore.clone(),
object_store.clone(),
compaction_scheduler,
),
object_store,
));
Expand Down Expand Up @@ -204,6 +211,13 @@ impl Instance {
}
}

/// Builds a shared local compaction scheduler from the datanode options,
/// using the default [`SimplePicker`] to select compaction candidates.
fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> CompactionSchedulerRef<S> {
    Arc::new(LocalCompactionScheduler::new(
        CompactionSchedulerConfig::from(opts),
        SimplePicker::default(),
    ))
}

pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let object_store = match store_config {
ObjectStoreConfig::File { .. } => new_fs_object_store(store_config).await,
Expand Down
3 changes: 3 additions & 0 deletions src/datanode/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use mito::config::EngineConfig as TableEngineConfig;
use query::QueryEngineFactory;
use servers::Mode;
use snafu::ResultExt;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
Expand All @@ -46,12 +47,14 @@ impl Instance {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_log_store(&opts.wal).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
logstore.clone(),
object_store.clone(),
compaction_scheduler,
),
object_store,
));
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod tests {
use query::parser::{QueryLanguageParser, QueryStatement};
use query::QueryEngineFactory;
use sql::statements::statement::Statement;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::TableReference;
Expand Down Expand Up @@ -209,7 +210,7 @@ mod tests {
let store_dir = dir.path().to_string_lossy();
let accessor = Builder::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);

let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let sql = r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
Expand All @@ -221,6 +222,7 @@ mod tests {
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
),
object_store,
));
Expand Down
4 changes: 3 additions & 1 deletion src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ mod tests {
Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
};
use log_store::NoopLogStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::region::RegionImpl;
use storage::EngineImpl;
Expand Down Expand Up @@ -643,13 +644,14 @@ mod tests {

let (dir, object_store) =
test_util::new_test_object_store("test_insert_with_column_default_constraint").await;

let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
),
object_store,
);
Expand Down
4 changes: 3 additions & 1 deletion src/mito/src/table/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datatypes::vectors::VectorRef;
use log_store::NoopLogStore;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
Expand Down Expand Up @@ -127,11 +128,12 @@ pub struct TestEngineComponents {

pub async fn setup_test_engine_and_table() -> TestEngineComponents {
let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await;

let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let storage_engine = EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
);
let table_engine = MitoEngine::new(
EngineConfig::default(),
Expand Down
3 changes: 3 additions & 0 deletions src/script/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ mod tests {
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use mito::engine::MitoEngine;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use tempdir::TempDir;
Expand All @@ -135,12 +136,14 @@ mod tests {
};

let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap();
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let mock_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(log_store),
object_store.clone(),
compaction_scheduler,
),
object_store,
));
Expand Down
2 changes: 1 addition & 1 deletion src/script/src/python/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ pub(crate) mod greptime_builtin {
Ok(obj) => match py_vec_obj_to_array(&obj, vm, 1){
Ok(v) => if v.len()==1{
Ok(v)
}else{
} else {
Err(vm.new_runtime_error(format!("Expect return's length to be at most one, found to be length of {}.", v.len())))
},
Err(err) => Err(vm
Expand Down
13 changes: 13 additions & 0 deletions src/storage/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,22 @@
// limitations under the License.

mod dedup_deque;
pub mod noop;
mod picker;
mod rate_limit;
mod scheduler;
mod strategy;
mod task;
mod writer;

use std::sync::Arc;

pub use picker::{Picker, PickerContext, SimplePicker};
pub use scheduler::{
CompactionRequest, CompactionRequestImpl, CompactionScheduler, CompactionSchedulerConfig,
LocalCompactionScheduler,
};
pub use task::{CompactionTask, CompactionTaskImpl};

pub type CompactionSchedulerRef<S> =
Arc<dyn CompactionScheduler<CompactionRequestImpl<S>> + Send + Sync>;
Loading

0 comments on commit 75b8afe

Please sign in to comment.