From e7146c4baf309d175c3d3dbea8037b03425e0264 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 19 Nov 2024 16:16:45 -0500 Subject: [PATCH 01/12] feat(pageserver): support schedule gc-compaction Signed-off-by: Alex Chi Z --- pageserver/src/http/routes.rs | 44 ++++-- pageserver/src/tenant.rs | 153 ++++++++++++++++--- pageserver/src/tenant/timeline.rs | 32 +++- pageserver/src/tenant/timeline/compaction.rs | 51 ++++--- test_runner/regress/test_compaction.py | 22 ++- 5 files changed, 239 insertions(+), 63 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ceb1c3b012f5..1ef43d37b0bd 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -87,7 +87,7 @@ use crate::tenant::timeline::offload::offload_timeline; use crate::tenant::timeline::offload::OffloadError; use crate::tenant::timeline::CompactFlags; use crate::tenant::timeline::CompactOptions; -use crate::tenant::timeline::CompactRange; +use crate::tenant::timeline::CompactRequest; use crate::tenant::timeline::CompactionError; use crate::tenant::timeline::Timeline; use crate::tenant::GetTimelineError; @@ -1971,7 +1971,7 @@ async fn timeline_compact_handler( let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let compact_range = json_request_maybe::>(&mut request).await?; + let compact_request = json_request_maybe::>(&mut request).await?; let state = get_state(&request); @@ -1996,22 +1996,44 @@ async fn timeline_compact_handler( let wait_until_uploaded = parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); + let wait_until_scheduled_compaction_done = + parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")? + .unwrap_or(false); + let options = CompactOptions { - compact_range, + compact_range: compact_request + .as_ref() + .and_then(|r| r.compact_range.clone()), + compact_below_lsn: compact_request + .as_ref() + .and_then(|r| r.compact_below_lsn.as_ref().map(|x| x.0)), flags, }; + let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false); + async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; - timeline - .compact_with_options(&cancel, options, &ctx) - .await - .map_err(|e| ApiError::InternalServerError(e.into()))?; - if wait_until_uploaded { - timeline.remote_client.wait_completion().await - // XXX map to correct ApiError for the cases where it's due to shutdown - .context("wait completion").map_err(ApiError::InternalServerError)?; + if scheduled { + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + let rx = tenant.schedule_compaction(timeline_id, options).await; + if wait_until_scheduled_compaction_done { + // It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction. 
+ rx.await.ok(); + } + } else { + timeline + .compact_with_options(&cancel, options, &ctx) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + if wait_until_uploaded { + timeline.remote_client.wait_completion().await + // XXX map to correct ApiError for the cases where it's due to shutdown + .context("wait completion").map_err(ApiError::InternalServerError)?; + } } json_response(StatusCode::OK, ()) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0214ee68fa08..900f2436cbee 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -37,15 +37,19 @@ use remote_timeline_client::manifest::{ }; use remote_timeline_client::UploadQueueNotReadyError; use std::collections::BTreeMap; +use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; +use timeline::compaction::ScheduledCompactionTask; use timeline::import_pgdata; use timeline::offload::offload_timeline; use timeline::ShutdownMode; +use timeline::CompactFlags; +use timeline::CompactOptions; use tokio::io::BufReader; use tokio::sync::watch; use tokio::task::JoinSet; @@ -339,6 +343,11 @@ pub struct Tenant { /// Overhead of mutex is acceptable because compaction is done with a multi-second period. compaction_circuit_breaker: std::sync::Mutex, + /// Scheduled compaction tasks. Currently, this can only be populated by triggering + /// a manual gc-compaction from the manual compaction API. + scheduled_compaction_tasks: + std::sync::Mutex>>, + /// If the tenant is in Activating state, notify this to encourage it /// to proceed to Active as soon as possible, rather than waiting for lazy /// background warmup. @@ -2954,26 +2963,63 @@ impl Tenant { for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload { let pending_task_left = if *can_compact { - Some( - timeline - .compact(cancel, EnumSet::empty(), ctx) - .instrument(info_span!("compact_timeline", %timeline_id)) - .await - .inspect_err(|e| match e { - timeline::CompactionError::ShuttingDown => (), - timeline::CompactionError::Offload(_) => { - // Failures to offload timelines do not trip the circuit breaker, because - // they do not do lots of writes the way compaction itself does: it is cheap - // to retry, and it would be bad to stop all compaction because of an issue with offloading. - } - timeline::CompactionError::Other(e) => { - self.compaction_circuit_breaker - .lock() - .unwrap() - .fail(&CIRCUIT_BREAKERS_BROKEN, e); + let has_pending_l0_compaction_task = timeline + .compact(cancel, EnumSet::empty(), ctx) + .instrument(info_span!("compact_timeline", %timeline_id)) + .await + .inspect_err(|e| match e { + timeline::CompactionError::ShuttingDown => (), + timeline::CompactionError::Offload(_) => { + // Failures to offload timelines do not trip the circuit breaker, because + // they do not do lots of writes the way compaction itself does: it is cheap + // to retry, and it would be bad to stop all compaction because of an issue with offloading. 
+ } + timeline::CompactionError::Other(e) => { + self.compaction_circuit_breaker + .lock() + .unwrap() + .fail(&CIRCUIT_BREAKERS_BROKEN, e); + } + })?; + if !has_pending_l0_compaction_task { + let next_scheduled_compaction_task = { + let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); + if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) { + tline_pending_tasks.pop_front() + } else { + None + } + }; + if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task + { + if !next_scheduled_compaction_task + .options + .flags + .contains(CompactFlags::EnhancedGcBottomMostCompaction) + { + warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options); + Some(true) + } else { + let _ = timeline + .compact_with_options( + cancel, + next_scheduled_compaction_task.options, + ctx, + ) + .instrument(info_span!("scheduled_compact_timeline", %timeline_id)) + .await?; + if let Some(rx) = next_scheduled_compaction_task.result_rx.take() { + // TODO: we can send compaction statistics in the future + rx.send(()).ok(); } - })?, - ) + Some(true) + } + } else { + None + } + } else { + Some(true) + } } else { None }; @@ -2993,6 +3039,22 @@ impl Tenant { Ok(has_pending_task) } + /// Schedule a compaction task for a timeline. + pub(crate) async fn schedule_compaction( + &self, + timeline_id: TimelineId, + options: CompactOptions, + ) -> tokio::sync::oneshot::Receiver<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); + let tline_pending_tasks = guard.entry(timeline_id).or_insert_with(VecDeque::new); + tline_pending_tasks.push_back(ScheduledCompactionTask { + options, + result_rx: Some(tx), + }); + rx + } + // Call through to all timelines to freeze ephemeral layers if needed. Usually // this happens during ingest: this background housekeeping is for freezing layers // that are open but haven't been written to for some time. @@ -3993,6 +4055,7 @@ impl Tenant { // use an extremely long backoff. 
Some(Duration::from_secs(3600 * 24)), )), + scheduled_compaction_tasks: Mutex::new(Default::default()), activate_now_sem: tokio::sync::Semaphore::new(0), attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()), cancel: CancellationToken::default(), @@ -9149,6 +9212,7 @@ mod tests { CompactOptions { flags: dryrun_flags, compact_range: None, + compact_below_lsn: None, }, &ctx, ) @@ -9385,6 +9449,7 @@ mod tests { CompactOptions { flags: dryrun_flags, compact_range: None, + compact_below_lsn: None, }, &ctx, ) @@ -9871,7 +9936,15 @@ mod tests { // Do a partial compaction on key range 0..2 tline - .partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx) + .compact_with_gc( + &cancel, + CompactOptions { + flags: EnumSet::new(), + compact_range: Some((get_key(0)..get_key(2)).into()), + compact_below_lsn: None, + }, + &ctx, + ) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; @@ -9910,7 +9983,15 @@ mod tests { // Do a partial compaction on key range 2..4 tline - .partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx) + .compact_with_gc( + &cancel, + CompactOptions { + flags: EnumSet::new(), + compact_range: Some((get_key(2)..get_key(4)).into()), + compact_below_lsn: None, + }, + &ctx, + ) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; @@ -9954,7 +10035,15 @@ mod tests { // Do a partial compaction on key range 4..9 tline - .partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx) + .compact_with_gc( + &cancel, + CompactOptions { + flags: EnumSet::new(), + compact_range: Some((get_key(4)..get_key(9)).into()), + compact_below_lsn: None, + }, + &ctx, + ) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; @@ -9997,7 +10086,15 @@ mod tests { // Do a partial compaction on key range 9..10 tline - .partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx) + .compact_with_gc( + &cancel, + CompactOptions { + flags: EnumSet::new(), + compact_range: Some((get_key(9)..get_key(10)).into()), + compact_below_lsn: None, + }, + &ctx, + ) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; @@ -10045,7 +10142,15 @@ mod tests { // Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones. tline - .partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx) + .compact_with_gc( + &cancel, + CompactOptions { + flags: EnumSet::new(), + compact_range: Some((get_key(0)..get_key(10)).into()), + compact_below_lsn: None, + }, + &ctx, + ) .await .unwrap(); let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c1ff0f426d5c..17710ee84a68 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -767,7 +767,7 @@ pub enum GetLogicalSizePriority { Background, } -#[derive(enumset::EnumSetType)] +#[derive(Debug, enumset::EnumSetType)] pub(crate) enum CompactFlags { ForceRepartition, ForceImageLayerCreation, @@ -776,6 +776,19 @@ pub(crate) enum CompactFlags { DryRun, } +#[serde_with::serde_as] +#[derive(Debug, Clone, serde::Deserialize)] +pub(crate) struct CompactRequest { + pub compact_range: Option, + pub compact_below_lsn: Option, + /// Whether the compaction job should be scheduled. 
+ pub scheduled: bool, +} + +#[serde_with::serde_as] +#[derive(Debug, Clone, serde::Deserialize)] +pub(crate) struct LsnDisplay(#[serde_as(as = "serde_with::DisplayFromStr")] pub Lsn); + #[serde_with::serde_as] #[derive(Debug, Clone, serde::Deserialize)] pub(crate) struct CompactRange { @@ -785,10 +798,24 @@ pub(crate) struct CompactRange { pub end: Key, } -#[derive(Clone, Default)] +impl From> for CompactRange { + fn from(range: Range) -> Self { + CompactRange { + start: range.start, + end: range.end, + } + } +} + +#[derive(Debug, Clone, Default)] pub(crate) struct CompactOptions { pub flags: EnumSet, + /// If set, the compaction will only compact the key range specified by this option. + /// This option is only used by GC compaction. pub compact_range: Option, + /// If set, the compaction will only compact the LSN below this value. + /// This option is only used by GC compaction. + pub compact_below_lsn: Option, } impl std::fmt::Debug for Timeline { @@ -1635,6 +1662,7 @@ impl Timeline { CompactOptions { flags, compact_range: None, + compact_below_lsn: None, }, ctx, ) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index ecd68ba55ec4..757ce5e3b485 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -16,7 +16,6 @@ use super::{ use anyhow::{anyhow, bail, Context}; use bytes::Bytes; -use enumset::EnumSet; use fail::fail_point; use itertools::Itertools; use pageserver_api::key::KEY_SIZE; @@ -64,6 +63,12 @@ use super::CompactionError; /// Maximum number of deltas before generating an image layer in bottom-most compaction. const COMPACTION_DELTA_THRESHOLD: usize = 5; +/// A scheduled compaction task. +pub struct ScheduledCompactionTask { + pub options: CompactOptions, + pub result_rx: Option>, +} + pub struct GcCompactionJobDescription { /// All layers to read in the compaction job selected_layers: Vec, @@ -1746,24 +1751,6 @@ impl Timeline { Ok(()) } - pub(crate) async fn compact_with_gc( - self: &Arc, - cancel: &CancellationToken, - options: CompactOptions, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - self.partial_compact_with_gc( - options - .compact_range - .map(|range| range.start..range.end) - .unwrap_or_else(|| Key::MIN..Key::MAX), - cancel, - options.flags, - ctx, - ) - .await - } - /// An experimental compaction building block that combines compaction with garbage collection. /// /// The current implementation picks all delta + image layers that are below or intersecting with @@ -1771,17 +1758,19 @@ impl Timeline { /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon, /// and create delta layers with all deltas >= gc horizon. /// - /// If `key_range` is provided, it will only compact the keys within the range, aka partial compaction. + /// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction. /// Partial compaction will read and process all layers overlapping with the key range, even if it might /// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained /// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not /// part of the range. 
- pub(crate) async fn partial_compact_with_gc( + /// + /// If `options.compact_below_lsn` is provided, the compaction will only compact layers below or intersect with + /// the LSN. Otherwise, it will use the gc cutoff by default. + pub(crate) async fn compact_with_gc( self: &Arc, - compaction_key_range: Range, cancel: &CancellationToken, - flags: EnumSet, + options: CompactOptions, ctx: &RequestContext, ) -> anyhow::Result<()> { // Block other compaction/GC tasks from running for now. GC-compaction could run along @@ -1803,6 +1792,12 @@ impl Timeline { ) .await?; + let flags = options.flags; + let compaction_key_range = options + .compact_range + .map(|range| range.start..range.end) + .unwrap_or_else(|| Key::MIN..Key::MAX); + let dry_run = flags.contains(CompactFlags::DryRun); if compaction_key_range == (Key::MIN..Key::MAX) { @@ -1826,7 +1821,15 @@ impl Timeline { let layers = guard.layer_map()?; let gc_info = self.gc_info.read().unwrap(); let mut retain_lsns_below_horizon = Vec::new(); - let gc_cutoff = gc_info.cutoffs.select_min(); + let real_gc_cutoff = gc_info.cutoffs.select_min(); + // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for + // each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use + // the real cutoff. + let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff); + if gc_cutoff > real_gc_cutoff { + warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff); + gc_cutoff = real_gc_cutoff; + } for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns { if lsn < &gc_cutoff { retain_lsns_below_horizon.push(*lsn); diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 79fd25630452..d4935d56ee30 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -129,6 +129,20 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): log.info("Writing initial data ...") workload.write_rows(row_count, env.pageserver.id) + # schedule a gc-compaction in advance, it will be triggered along with L0 compaction + ps_http.timeline_compact( + tenant_id, + timeline_id, + enhanced_gc_bottom_most_compaction=True, + body={ + "scheduled": True, + "compact_range": { + "start": "000000000000000000000000000000000000", + "end": "030000000000000000000000000000000000", + }, + }, + ) + for i in range(1, churn_rounds + 1): if i % 10 == 0: log.info(f"Running churn round {i}/{churn_rounds} ...") @@ -142,11 +156,15 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): timeline_id, enhanced_gc_bottom_most_compaction=True, body={ - "start": "000000000000000000000000000000000000", - "end": "030000000000000000000000000000000000", + "compact_range": { + "start": "000000000000000000000000000000000000", + "end": "030000000000000000000000000000000000", + } }, ) + env.pageserver.assert_log_contains("scheduled_compaction") + log.info("Validating at workload end ...") workload.validate(env.pageserver.id) From 8a8b05dbf120c7b050e9282516cc8bb7f0f2a827 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 19 Nov 2024 17:18:01 -0500 Subject: [PATCH 02/12] fix Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 8 ++++---- pageserver/src/tenant/timeline.rs | 1 + pageserver/src/tenant/timeline/compaction.rs | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git 
a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 900f2436cbee..bfa3f922c082 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3008,9 +3008,9 @@ impl Tenant { ) .instrument(info_span!("scheduled_compact_timeline", %timeline_id)) .await?; - if let Some(rx) = next_scheduled_compaction_task.result_rx.take() { + if let Some(tx) = next_scheduled_compaction_task.result_tx.take() { // TODO: we can send compaction statistics in the future - rx.send(()).ok(); + tx.send(()).ok(); } Some(true) } @@ -3047,10 +3047,10 @@ impl Tenant { ) -> tokio::sync::oneshot::Receiver<()> { let (tx, rx) = tokio::sync::oneshot::channel(); let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); - let tline_pending_tasks = guard.entry(timeline_id).or_insert_with(VecDeque::new); + let tline_pending_tasks = guard.entry(timeline_id).or_default(); tline_pending_tasks.push_back(ScheduledCompactionTask { options, - result_rx: Some(tx), + result_tx: Some(tx), }); rx } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 17710ee84a68..b6530ec3a9c8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -782,6 +782,7 @@ pub(crate) struct CompactRequest { pub compact_range: Option, pub compact_below_lsn: Option, /// Whether the compaction job should be scheduled. + #[serde(default)] pub scheduled: bool, } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 757ce5e3b485..a90819ff1a29 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -66,7 +66,7 @@ const COMPACTION_DELTA_THRESHOLD: usize = 5; /// A scheduled compaction task. pub struct ScheduledCompactionTask { pub options: CompactOptions, - pub result_rx: Option>, + pub result_tx: Option>, } pub struct GcCompactionJobDescription { From da19fd1a45773326ab2b2248b9aff167dc9ac83b Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 20 Nov 2024 16:06:52 -0500 Subject: [PATCH 03/12] fix tests Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 20 +++++++++------- test_runner/regress/test_compaction.py | 33 ++++++++++---------------- 2 files changed, 24 insertions(+), 29 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index bfa3f922c082..ba9d1f174952 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2962,6 +2962,9 @@ impl Tenant { for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload { + // pending_task_left == None: cannot compact, maybe still pending tasks + // pending_task_left == Some(true): compaction task left + // pending_task_left == Some(false): no compaction task left let pending_task_left = if *can_compact { let has_pending_l0_compaction_task = timeline .compact(cancel, EnumSet::empty(), ctx) @@ -2981,12 +2984,18 @@ impl Tenant { .fail(&CIRCUIT_BREAKERS_BROKEN, e); } })?; - if !has_pending_l0_compaction_task { + if has_pending_l0_compaction_task { + Some(true) + } else { + let has_pending_scheduled_compaction_task; let next_scheduled_compaction_task = { let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) { - tline_pending_tasks.pop_front() + let next_task = tline_pending_tasks.pop_front(); + has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty(); + next_task } else { + has_pending_scheduled_compaction_task = false; None } }; @@ -2998,7 +3007,6 @@ impl Tenant { 
.contains(CompactFlags::EnhancedGcBottomMostCompaction) { warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options); - Some(true) } else { let _ = timeline .compact_with_options( @@ -3012,13 +3020,9 @@ impl Tenant { // TODO: we can send compaction statistics in the future tx.send(()).ok(); } - Some(true) } - } else { - None } - } else { - Some(true) + Some(has_pending_scheduled_compaction_task) } } else { None diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index d4935d56ee30..56bec14c188a 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -113,13 +113,19 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF) + SMOKE_CONF = { + # Run both gc and gc-compaction. + "gc_period": "5s", + "compaction_period": "5s", + } + + env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF) tenant_id = env.initial_tenant timeline_id = env.initial_timeline row_count = 1000 - churn_rounds = 10 + churn_rounds = 20 ps_http = env.pageserver.http_client() @@ -129,40 +135,25 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): log.info("Writing initial data ...") workload.write_rows(row_count, env.pageserver.id) - # schedule a gc-compaction in advance, it will be triggered along with L0 compaction - ps_http.timeline_compact( - tenant_id, - timeline_id, - enhanced_gc_bottom_most_compaction=True, - body={ - "scheduled": True, - "compact_range": { - "start": "000000000000000000000000000000000000", - "end": "030000000000000000000000000000000000", - }, - }, - ) - for i in range(1, churn_rounds + 1): if i % 10 == 0: log.info(f"Running churn round {i}/{churn_rounds} ...") - workload.churn_rows(row_count, env.pageserver.id) - # Force L0 compaction to ensure the number of layers is within bounds, so that gc-compaction can run. 
- ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True) - assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1 ps_http.timeline_compact( tenant_id, timeline_id, enhanced_gc_bottom_most_compaction=True, body={ + "scheduled": True, "compact_range": { "start": "000000000000000000000000000000000000", "end": "030000000000000000000000000000000000", - } + }, }, ) + workload.churn_rows(row_count, env.pageserver.id) + env.pageserver.assert_log_contains("scheduled_compaction") log.info("Validating at workload end ...") From 8ae68eace9a0c6cd8e3b253b5e3e59cb900b9317 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 20 Nov 2024 16:30:13 -0500 Subject: [PATCH 04/12] fix Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 4 ++-- test_runner/regress/test_compaction.py | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index a90819ff1a29..af687667d1b9 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1849,7 +1849,7 @@ impl Timeline { .map(|desc| desc.get_lsn_range().end) .max() else { - info!("no layers to compact with gc"); + info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff); return Ok(()); }; // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key @@ -1872,7 +1872,7 @@ impl Timeline { } } if selected_layers.is_empty() { - info!("no layers to compact with gc"); + info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compaction_key_range.start, compaction_key_range.end); return Ok(()); } retain_lsns_below_horizon.sort(); diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 56bec14c188a..a43fe9a50421 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -14,7 +14,8 @@ from fixtures.utils import skip_in_debug_build, wait_until from fixtures.workload import Workload -AGGRESIVE_COMPACTION_TENANT_CONF = { + +AGGRESSIVE_COMPACTION_TENANT_CONF = { # Disable gc and compaction. The test runs compaction manually. "gc_period": "0s", "compaction_period": "0s", @@ -23,6 +24,7 @@ # Compact small layers "compaction_target_size": 1024**2, "image_creation_threshold": 2, + "lsn_lease_length": "0s", } @@ -43,7 +45,7 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei page_cache_size=10; wal_receiver_protocol='{wal_receiver_protocol}' """ - env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF) + env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF) tenant_id = env.initial_tenant timeline_id = env.initial_timeline @@ -117,6 +119,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): # Run both gc and gc-compaction. 
"gc_period": "5s", "compaction_period": "5s", + # No PiTR interval and small GC horizon + "pitr_interval": "0s", + "gc_horizon": f"{1024 ** 2}", + "lsn_lease_length": "0s", } env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF) @@ -154,7 +160,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): workload.churn_rows(row_count, env.pageserver.id) - env.pageserver.assert_log_contains("scheduled_compaction") + # ensure gc_compaction is scheduled + env.pageserver.assert_log_contains("scheduled_compact_timeline") + # and it's actually run instead of skipped (i.e., no layers to compact) + env.pageserver.assert_log_contains("gc-compaction statistics") log.info("Validating at workload end ...") workload.validate(env.pageserver.id) From 86f1c503e78e976774ccbabf7e398644d284524b Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 20 Nov 2024 17:03:46 -0500 Subject: [PATCH 05/12] fix test_compaction_smoke Signed-off-by: Alex Chi Z --- test_runner/regress/test_compaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index a43fe9a50421..c96687e4dd4c 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -24,7 +24,7 @@ # Compact small layers "compaction_target_size": 1024**2, "image_creation_threshold": 2, - "lsn_lease_length": "0s", + # "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later } From 6a8e825faf554ed742d0b448a9ad65a9385ee6c5 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 20 Nov 2024 22:57:22 -0500 Subject: [PATCH 06/12] fix assertion Signed-off-by: Alex Chi Z --- test_runner/regress/test_compaction.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index c96687e4dd4c..46c83ff24903 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -160,10 +160,8 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): workload.churn_rows(row_count, env.pageserver.id) - # ensure gc_compaction is scheduled - env.pageserver.assert_log_contains("scheduled_compact_timeline") - # and it's actually run instead of skipped (i.e., no layers to compact) - env.pageserver.assert_log_contains("gc-compaction statistics") + # ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked) + env.pageserver.assert_log_contains("scheduled_compact_timeline.*picked .* layers for compaction") log.info("Validating at workload end ...") workload.validate(env.pageserver.id) From 7bc2b34676a2ea005661f5d07a48ebd66e6c40cc Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 21 Nov 2024 11:03:11 -0500 Subject: [PATCH 07/12] ensure compaction gets scheduled for the workload Signed-off-by: Alex Chi Z --- test_runner/regress/test_compaction.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 46c83ff24903..af2841c1199d 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -113,7 +113,7 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei assert non_vectored_average < 8 assert vectored_average < 8 - +@skip_in_debug_build("only run with release build") def test_pageserver_gc_compaction_smoke(neon_env_builder: 
NeonEnvBuilder): SMOKE_CONF = { # Run both gc and gc-compaction. @@ -130,8 +130,8 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): tenant_id = env.initial_tenant timeline_id = env.initial_timeline - row_count = 1000 - churn_rounds = 20 + row_count = 10000 + churn_rounds = 50 ps_http = env.pageserver.http_client() From b85c2e1c3ed090e9811de50157c4d7819d8fb494 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 21 Nov 2024 12:43:14 -0500 Subject: [PATCH 08/12] fix fmt Signed-off-by: Alex Chi Z --- test_runner/regress/test_compaction.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index af2841c1199d..f011a847383a 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -14,7 +14,6 @@ from fixtures.utils import skip_in_debug_build, wait_until from fixtures.workload import Workload - AGGRESSIVE_COMPACTION_TENANT_CONF = { # Disable gc and compaction. The test runs compaction manually. "gc_period": "0s", @@ -113,6 +112,7 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei assert non_vectored_average < 8 assert vectored_average < 8 + @skip_in_debug_build("only run with release build") def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): SMOKE_CONF = { @@ -161,7 +161,9 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): workload.churn_rows(row_count, env.pageserver.id) # ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked) - env.pageserver.assert_log_contains("scheduled_compact_timeline.*picked .* layers for compaction") + env.pageserver.assert_log_contains( + "scheduled_compact_timeline.*picked .* layers for compaction" + ) log.info("Validating at workload end ...") workload.validate(env.pageserver.id) From b8741faa949f9b84aab70006edf45b2ebabc7cac Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 21 Nov 2024 12:52:48 -0500 Subject: [PATCH 09/12] fix range Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 2 +- test_runner/regress/test_compaction.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ba9d1f174952..cfba32c5e17a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -47,9 +47,9 @@ use storage_broker::BrokerClientChannel; use timeline::compaction::ScheduledCompactionTask; use timeline::import_pgdata; use timeline::offload::offload_timeline; -use timeline::ShutdownMode; use timeline::CompactFlags; use timeline::CompactOptions; +use timeline::ShutdownMode; use tokio::io::BufReader; use tokio::sync::watch; use tokio::task::JoinSet; diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index f011a847383a..7ca698b49046 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -153,7 +153,8 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): "scheduled": True, "compact_range": { "start": "000000000000000000000000000000000000", - "end": "030000000000000000000000000000000000", + # skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this + "end": "010000000000000000000000000000000000", }, }, ) From 70f1c62f92274187c118f62a6eb9b08213111f21 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." 
<4198311+skyzh@users.noreply.github.com> Date: Thu, 5 Dec 2024 13:15:39 -0500 Subject: [PATCH 10/12] Update pageserver/src/tenant/timeline/compaction.rs Co-authored-by: Christian Schwarz --- pageserver/src/tenant/timeline/compaction.rs | 21 +++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index af687667d1b9..8ececa2bfb46 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1821,15 +1821,18 @@ impl Timeline { let layers = guard.layer_map()?; let gc_info = self.gc_info.read().unwrap(); let mut retain_lsns_below_horizon = Vec::new(); - let real_gc_cutoff = gc_info.cutoffs.select_min(); - // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for - // each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use - // the real cutoff. - let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff); - if gc_cutoff > real_gc_cutoff { - warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff); - gc_cutoff = real_gc_cutoff; - } + let gc_cutoff = { + let real_gc_cutoff = gc_info.cutoffs.select_min(); + // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for + // each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use + // the real cutoff. + let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff); + if gc_cutoff > real_gc_cutoff { + warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff); + gc_cutoff = real_gc_cutoff; + } + gc_cutoff + }; for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns { if lsn < &gc_cutoff { retain_lsns_below_horizon.push(*lsn); From 71ec0cba173750db00d37cabd84e0612416c4804 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 5 Dec 2024 13:24:39 -0500 Subject: [PATCH 11/12] rm LsnDisplay Signed-off-by: Alex Chi Z --- pageserver/src/http/routes.rs | 4 +--- pageserver/src/tenant/timeline.rs | 6 +----- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 1ef43d37b0bd..c44a95bf8f17 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2004,9 +2004,7 @@ async fn timeline_compact_handler( compact_range: compact_request .as_ref() .and_then(|r| r.compact_range.clone()), - compact_below_lsn: compact_request - .as_ref() - .and_then(|r| r.compact_below_lsn.as_ref().map(|x| x.0)), + compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn), flags, }; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b6530ec3a9c8..8fb03ec2be5e 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -780,16 +780,12 @@ pub(crate) enum CompactFlags { #[derive(Debug, Clone, serde::Deserialize)] pub(crate) struct CompactRequest { pub compact_range: Option, - pub compact_below_lsn: Option, + pub compact_below_lsn: Option, /// Whether the compaction job should be scheduled. 
#[serde(default)] pub scheduled: bool, } -#[serde_with::serde_as] -#[derive(Debug, Clone, serde::Deserialize)] -pub(crate) struct LsnDisplay(#[serde_as(as = "serde_with::DisplayFromStr")] pub Lsn); - #[serde_with::serde_as] #[derive(Debug, Clone, serde::Deserialize)] pub(crate) struct CompactRange { From 7a0274cb48ecb6f2c8ec9708959fad100f1e773b Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 5 Dec 2024 13:31:38 -0500 Subject: [PATCH 12/12] add cancel api Signed-off-by: Alex Chi Z --- pageserver/src/http/routes.rs | 24 ++++++++++++++++++++++++ pageserver/src/tenant.rs | 14 ++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c44a95bf8f17..59da0ddbd711 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1962,6 +1962,26 @@ async fn timeline_gc_handler( json_response(StatusCode::OK, gc_result) } +// Cancel scheduled compaction tasks +async fn timeline_cancel_compact_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + let state = get_state(&request); + async { + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + tenant.cancel_scheduled_compaction(timeline_id); + json_response(StatusCode::OK, ()) + } + .instrument(info_span!("timeline_cancel_compact", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) + .await +} + // Run compaction immediately on given timeline. async fn timeline_compact_handler( mut request: Request, @@ -3305,6 +3325,10 @@ pub fn make_router( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact", |r| api_handler(r, timeline_compact_handler), ) + .delete( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact", + |r| api_handler(r, timeline_cancel_compact_handler), + ) .put( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload", |r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cfba32c5e17a..d75fd235879e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3043,6 +3043,20 @@ impl Tenant { Ok(has_pending_task) } + /// Cancel scheduled compaction tasks + pub(crate) fn cancel_scheduled_compaction( + &self, + timeline_id: TimelineId, + ) -> Vec { + let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); + if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) { + let current_tline_pending_tasks = std::mem::take(tline_pending_tasks); + current_tline_pending_tasks.into_iter().collect() + } else { + Vec::new() + } + } + /// Schedule a compaction task for a timeline. pub(crate) async fn schedule_compaction( &self,
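
Taken together, the series adds a per-timeline queue of gc-compaction jobs that the background compaction loop drains once the regular L0 compaction pass reports no pending work. Below is a minimal sketch of driving the new API from the Python test runner. The `timeline_compact` call, request body shape, and log assertion are taken directly from `test_pageserver_gc_compaction_smoke` in this series; the surrounding fixture setup mirrors that test, and anything outside it (such as timing of the log assertion) should be treated as an assumption, not part of the patch.

```python
# Sketch only: mirrors the request shape used by test_pageserver_gc_compaction_smoke
# in this series; fixture plumbing and timing outside that test are assumptions.
from fixtures.neon_fixtures import NeonEnvBuilder


def schedule_gc_compaction_sketch(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    ps_http = env.pageserver.http_client()
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    # Enqueue a gc-compaction job instead of running it inline. The background
    # compaction loop picks it up on a later iteration, after the L0 compaction
    # pass for this timeline has no work left.
    ps_http.timeline_compact(
        tenant_id,
        timeline_id,
        enhanced_gc_bottom_most_compaction=True,
        body={
            "scheduled": True,
            # Optional partial-compaction key range, same values as the smoke test
            # (the end key skips the SLRU range, as noted in patch 09).
            "compact_range": {
                "start": "000000000000000000000000000000000000",
                "end": "010000000000000000000000000000000000",
            },
            # "compact_below_lsn" may also be supplied; the server clamps it to the
            # real gc cutoff and logs a warning if the provided value is higher.
        },
    )

    # The scheduled job runs inside the `scheduled_compact_timeline` span, so the
    # same log assertion as the smoke test confirms it actually ran. In practice
    # the caller must give the compaction loop time to fire (the smoke test does
    # this by churning rows between rounds) before asserting.
    env.pageserver.assert_log_contains(
        "scheduled_compact_timeline.*picked .* layers for compaction"
    )
```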
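
The final patch ("add cancel api") also registers a DELETE handler on the same `/compact` path that drops a timeline's queued jobs via `Tenant::cancel_scheduled_compaction`. No Python helper for it appears in this series, so the sketch below talks to the management API directly; only the HTTP method and path come from the route registration above, while the base URL, port, and use of plain `requests` are assumptions.

```python
import requests

# Assumed management-API address; the DELETE route itself is added by the
# "add cancel api" patch and clears the timeline's queued scheduled-compaction tasks.
PAGESERVER_HTTP = "http://127.0.0.1:9898"


def cancel_scheduled_compaction_sketch(tenant_shard_id: str, timeline_id: str) -> None:
    url = (
        f"{PAGESERVER_HTTP}/v1/tenant/{tenant_shard_id}"
        f"/timeline/{timeline_id}/compact"
    )
    resp = requests.delete(url)
    resp.raise_for_status()  # the handler responds 200 with an empty JSON body on success
```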