From a05169d839103bf9008227fc203307efe1540b68 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 26 Nov 2024 11:45:05 -0500 Subject: [PATCH 1/5] feat(pageserver): gc-compaction split job and partial scheduler Signed-off-by: Alex Chi Z --- pageserver/src/http/routes.rs | 10 +- pageserver/src/tenant.rs | 49 +++++-- pageserver/src/tenant/timeline.rs | 7 + pageserver/src/tenant/timeline/compaction.rs | 131 ++++++++++++++++++- test_runner/regress/test_compaction.py | 1 + 5 files changed, 187 insertions(+), 11 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index b3981b4a8e7d..84fad67cf778 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2036,15 +2036,23 @@ async fn timeline_compact_handler( parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")? .unwrap_or(false); + let sub_compaction = compact_request + .as_ref() + .map(|r| r.sub_compaction) + .unwrap_or(false); let options = CompactOptions { compact_range: compact_request .as_ref() .and_then(|r| r.compact_range.clone()), compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn), flags, + sub_compaction, }; - let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false); + let scheduled = compact_request + .as_ref() + .map(|r| r.scheduled) + .unwrap_or(false); async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 306ec9f5486e..4a9c44aefdbc 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -49,6 +49,7 @@ use timeline::import_pgdata; use timeline::offload::offload_timeline; use timeline::CompactFlags; use timeline::CompactOptions; +use timeline::CompactionError; use timeline::ShutdownMode; use tokio::io::BufReader; use tokio::sync::watch; @@ -2987,10 +2988,16 @@ impl Tenant { if has_pending_l0_compaction_task { Some(true) } else { - let has_pending_scheduled_compaction_task; + let mut has_pending_scheduled_compaction_task; let next_scheduled_compaction_task = { let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) { + if !tline_pending_tasks.is_empty() { + info!( + "{} tasks left in the compaction schedule queue", + tline_pending_tasks.len() + ); + } let next_task = tline_pending_tasks.pop_front(); has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty(); next_task @@ -3007,6 +3014,32 @@ impl Tenant { .contains(CompactFlags::EnhancedGcBottomMostCompaction) { warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options); + } else if next_scheduled_compaction_task.options.sub_compaction { + info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"); + let jobs = timeline + .gc_compaction_split_jobs(next_scheduled_compaction_task.options) + .await + .map_err(CompactionError::Other)?; + if jobs.is_empty() { + info!("no jobs to run, skipping scheduled compaction task"); + } else { + has_pending_scheduled_compaction_task = true; + let jobs_len = jobs.len(); + let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); + let tline_pending_tasks = guard.entry(*timeline_id).or_default(); + for (idx, job) in jobs.into_iter().enumerate() { + tline_pending_tasks.push_back(ScheduledCompactionTask { + options: job, + result_tx: if idx == jobs_len - 1 { + // The last compaction job sends the completion signal + next_scheduled_compaction_task.result_tx.take() + } else { + None + }, + }); + } + info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len); + } } else { let _ = timeline .compact_with_options( @@ -9244,7 +9277,7 @@ mod tests { CompactOptions { flags: dryrun_flags, compact_range: None, - compact_below_lsn: None, + ..Default::default() }, &ctx, ) @@ -9481,7 +9514,7 @@ mod tests { CompactOptions { flags: dryrun_flags, compact_range: None, - compact_below_lsn: None, + ..Default::default() }, &ctx, ) @@ -9973,7 +10006,7 @@ mod tests { CompactOptions { flags: EnumSet::new(), compact_range: Some((get_key(0)..get_key(2)).into()), - compact_below_lsn: None, + ..Default::default() }, &ctx, ) @@ -10020,7 +10053,7 @@ mod tests { CompactOptions { flags: EnumSet::new(), compact_range: Some((get_key(2)..get_key(4)).into()), - compact_below_lsn: None, + ..Default::default() }, &ctx, ) @@ -10072,7 +10105,7 @@ mod tests { CompactOptions { flags: EnumSet::new(), compact_range: Some((get_key(4)..get_key(9)).into()), - compact_below_lsn: None, + ..Default::default() }, &ctx, ) @@ -10123,7 +10156,7 @@ mod tests { CompactOptions { flags: EnumSet::new(), compact_range: Some((get_key(9)..get_key(10)).into()), - compact_below_lsn: None, + ..Default::default() }, &ctx, ) @@ -10179,7 +10212,7 @@ mod tests { CompactOptions { flags: EnumSet::new(), compact_range: Some((get_key(0)..get_key(10)).into()), - compact_below_lsn: None, + ..Default::default() }, &ctx, ) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fc69525bf4f7..11603741f552 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -785,6 +785,9 @@ pub(crate) struct CompactRequest { /// Whether the compaction job should be scheduled. #[serde(default)] pub scheduled: bool, + /// Whether the compaction job should be split across key ranges. + #[serde(default)] + pub sub_compaction: bool, } #[serde_with::serde_as] @@ -814,6 +817,9 @@ pub(crate) struct CompactOptions { /// If set, the compaction will only compact the LSN below this value. /// This option is only used by GC compaction. pub compact_below_lsn: Option, + /// Enable sub-compaction (split compaction job across key ranges). + /// This option is only used by GC compaction. + pub sub_compaction: bool, } impl std::fmt::Debug for Timeline { @@ -1629,6 +1635,7 @@ impl Timeline { flags, compact_range: None, compact_below_lsn: None, + sub_compaction: false, }, ctx, ) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 8ececa2bfb46..1d3e1aab4825 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use super::layer_manager::LayerManager; use super::{ - CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode, - RecordedDuration, Timeline, + CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder, + ImageLayerCreationMode, RecordedDuration, Timeline, }; use anyhow::{anyhow, bail, Context}; @@ -1751,6 +1751,103 @@ impl Timeline { Ok(()) } + /// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of + /// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much + /// ad-hoc information about gc compaction itself. + pub(crate) async fn gc_compaction_split_jobs( + self: &Arc, + options: CompactOptions, + ) -> anyhow::Result> { + if !options.sub_compaction { + return Ok(vec![options]); + } + let compact_range = options.compact_range.clone().unwrap_or(CompactRange { + start: Key::MIN, + end: Key::MAX, + }); + let compact_lsn = if let Some(compact_lsn) = options.compact_below_lsn { + compact_lsn + } else { + let gc_info = self.gc_info.read().unwrap(); + gc_info.cutoffs.select_min() // use the real gc cutoff + }; + let mut compact_jobs = Vec::new(); + // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning + // by estimating the amount of files read for a compaction job. We should also partition on LSN. + let partition = self.partitioning.lock().await; + let ((dense_ks, sparse_ks), _) = &*partition; + // Truncate the key range to be within user specified compaction range. + fn truncate_to( + source_start: &Key, + source_end: &Key, + target_start: &Key, + target_end: &Key, + ) -> Option<(Key, Key)> { + let start = source_start.max(target_start); + let end = source_end.min(target_end); + if start < end { + Some((*start, *end)) + } else { + None + } + } + let mut split_key_ranges = Vec::new(); + for partition in &dense_ks.parts { + for range in partition.ranges.iter() { + let Some((start, end)) = truncate_to( + &range.start, + &range.end, + &compact_range.start, + &compact_range.end, + ) else { + continue; + }; + split_key_ranges.push((start, end)); + } + } + for partition in &sparse_ks.parts { + for range in partition.0.ranges.iter() { + let Some((start, end)) = truncate_to( + &range.start, + &range.end, + &compact_range.start, + &compact_range.end, + ) else { + continue; + }; + split_key_ranges.push((start, end)); + } + } + let guard = self.layers.read().await; + let layer_map = guard.layer_map()?; + let mut current_start = None; + // Split compaction job to about 2GB each + const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 2GB, TODO: should be configuration in the future + let ranges_num = split_key_ranges.len(); + for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() { + if current_start.is_none() { + current_start = Some(start); + } + let start = current_start.unwrap(); + let res = layer_map.range_search(start..end, compact_lsn); + let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::(); + if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 { + let mut compact_options = options.clone(); + debug!( + "splitting compaction job: {}..{}, estimated_size={}", + start, end, total_size + ); + compact_options.compact_range = Some(CompactRange { start, end }); + compact_options.compact_below_lsn = Some(compact_lsn); + compact_options.sub_compaction = false; + compact_jobs.push(compact_options); + current_start = Some(end); + } + } + drop(guard); + Ok(compact_jobs) + } + /// An experimental compaction building block that combines compaction with garbage collection. /// /// The current implementation picks all delta + image layers that are below or intersecting with @@ -1773,6 +1870,36 @@ impl Timeline { options: CompactOptions, ctx: &RequestContext, ) -> anyhow::Result<()> { + if options.sub_compaction { + info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"); + let jobs = self.gc_compaction_split_jobs(options).await?; + let jobs_len = jobs.len(); + for (idx, job) in jobs.into_iter().enumerate() { + info!( + "running enhanced gc bottom-most compaction, sub-compaction {}/{}", + idx + 1, + jobs_len + ); + self.compact_with_gc_inner(cancel, job, ctx).await?; + } + if jobs_len == 0 { + info!("no jobs to run, skipping gc bottom-most compaction"); + } + return Ok(()); + } + self.compact_with_gc_inner(cancel, options, ctx).await + } + + async fn compact_with_gc_inner( + self: &Arc, + cancel: &CancellationToken, + options: CompactOptions, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + assert!( + !options.sub_compaction, + "sub-compaction should be handled by the outer function" + ); // Block other compaction/GC tasks from running for now. GC-compaction could run along // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc. // Note that we already acquired the compaction lock when the outer `compact` function gets called. diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index de6653eb3f4e..e92dc47f3980 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -159,6 +159,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): enhanced_gc_bottom_most_compaction=True, body={ "scheduled": True, + "sub_compaction": True, "compact_range": { "start": "000000000000000000000000000000000000", # skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this From 15f26feb84b73ac3a0ce2c3514f7cc57d0d00732 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 5 Dec 2024 15:44:20 -0500 Subject: [PATCH 2/5] improve split algorithm Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 56 +++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 1d3e1aab4825..7c8935200c14 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1792,31 +1792,25 @@ impl Timeline { } } let mut split_key_ranges = Vec::new(); - for partition in &dense_ks.parts { - for range in partition.ranges.iter() { - let Some((start, end)) = truncate_to( - &range.start, - &range.end, - &compact_range.start, - &compact_range.end, - ) else { - continue; - }; - split_key_ranges.push((start, end)); - } - } - for partition in &sparse_ks.parts { - for range in partition.0.ranges.iter() { - let Some((start, end)) = truncate_to( - &range.start, - &range.end, - &compact_range.start, - &compact_range.end, - ) else { - continue; - }; - split_key_ranges.push((start, end)); - } + let mut ranges = dense_ks + .parts + .iter() + .map(|partition| partition.ranges.iter()) + .chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter())) + .flatten() + .cloned() + .collect_vec(); + ranges.sort_by(|a, b| ((&a.start, &a.end)).cmp(&(&b.start, &b.end))); + for range in ranges.iter() { + let Some((start, end)) = truncate_to( + &range.start, + &range.end, + &compact_range.start, + &compact_range.end, + ) else { + continue; + }; + split_key_ranges.push((start, end)); } let guard = self.layers.read().await; let layer_map = guard.layer_map()?; @@ -1829,10 +1823,22 @@ impl Timeline { current_start = Some(start); } let start = current_start.unwrap(); + if end >= start { + // We have already processed this partition. + continue; + } let res = layer_map.range_search(start..end, compact_lsn); let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::(); if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 { let mut compact_options = options.clone(); + // Try to extend the compaction range so that we include at least one full layer file. + let extended_end = res + .found + .iter() + .map(|(layer, _)| layer.layer.key_range.end) + .min() + .expect("at least one layer in the resuult?"); + let end = extended_end.max(end); debug!( "splitting compaction job: {}..{}, estimated_size={}", start, end, total_size From 4d4686d3fa9779e21dc86ea15f256c1d0109b167 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 5 Dec 2024 15:49:02 -0500 Subject: [PATCH 3/5] resolve some comments Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 7c8935200c14..e4f3e0ee93be 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1774,7 +1774,9 @@ impl Timeline { let mut compact_jobs = Vec::new(); // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning // by estimating the amount of files read for a compaction job. We should also partition on LSN. - let partition = self.partitioning.lock().await; + let Ok(partition) = self.partitioning.try_lock() else { + bail!("failed to acquire partition lock"); + }; let ((dense_ks, sparse_ks), _) = &*partition; // Truncate the key range to be within user specified compaction range. fn truncate_to( @@ -1816,7 +1818,7 @@ impl Timeline { let layer_map = guard.layer_map()?; let mut current_start = None; // Split compaction job to about 2GB each - const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 2GB, TODO: should be configuration in the future + const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future let ranges_num = split_key_ranges.len(); for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() { if current_start.is_none() { @@ -1839,7 +1841,7 @@ impl Timeline { .min() .expect("at least one layer in the resuult?"); let end = extended_end.max(end); - debug!( + info!( "splitting compaction job: {}..{}, estimated_size={}", start, end, total_size ); From e8f2cdc9c93420870ba3aabc230f4cedc29cc4a6 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 6 Dec 2024 12:06:11 -0500 Subject: [PATCH 4/5] resolve comments Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index e4f3e0ee93be..fbd1160995c3 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1765,8 +1765,8 @@ impl Timeline { start: Key::MIN, end: Key::MAX, }); - let compact_lsn = if let Some(compact_lsn) = options.compact_below_lsn { - compact_lsn + let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn { + compact_below_lsn } else { let gc_info = self.gc_info.read().unwrap(); gc_info.cutoffs.select_min() // use the real gc cutoff @@ -1802,7 +1802,7 @@ impl Timeline { .flatten() .cloned() .collect_vec(); - ranges.sort_by(|a, b| ((&a.start, &a.end)).cmp(&(&b.start, &b.end))); + ranges.sort_by(|a, b| (&a.start, &a.end).cmp(&(&b.start, &b.end))); for range in ranges.iter() { let Some((start, end)) = truncate_to( &range.start, @@ -1829,15 +1829,15 @@ impl Timeline { // We have already processed this partition. continue; } - let res = layer_map.range_search(start..end, compact_lsn); + let res = layer_map.range_search(start..end, compact_below_lsn); let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::(); if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 { let mut compact_options = options.clone(); // Try to extend the compaction range so that we include at least one full layer file. let extended_end = res .found - .iter() - .map(|(layer, _)| layer.layer.key_range.end) + .keys() + .map(|layer| layer.layer.key_range.end) .min() .expect("at least one layer in the resuult?"); let end = extended_end.max(end); @@ -1846,7 +1846,7 @@ impl Timeline { start, end, total_size ); compact_options.compact_range = Some(CompactRange { start, end }); - compact_options.compact_below_lsn = Some(compact_lsn); + compact_options.compact_below_lsn = Some(compact_below_lsn); compact_options.sub_compaction = false; compact_jobs.push(compact_options); current_start = Some(end); From 4c86d0caedb02a9c13c357e3982865c4b694d4a6 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 6 Dec 2024 12:41:01 -0500 Subject: [PATCH 5/5] disable valid layer map check for now Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 35 +++++++++++--------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index fbd1160995c3..0471cc449ef9 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -29,7 +29,6 @@ use utils::id::TimelineId; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; use crate::statvfs::Statvfs; -use crate::tenant::checks::check_valid_layermap; use crate::tenant::remote_timeline_client::WaitCompletionError; use crate::tenant::storage_layer::batch_split_writer::{ BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter, @@ -1794,7 +1793,7 @@ impl Timeline { } } let mut split_key_ranges = Vec::new(); - let mut ranges = dense_ks + let ranges = dense_ks .parts .iter() .map(|partition| partition.ranges.iter()) @@ -1802,7 +1801,6 @@ impl Timeline { .flatten() .cloned() .collect_vec(); - ranges.sort_by(|a, b| (&a.start, &a.end).cmp(&(&b.start, &b.end))); for range in ranges.iter() { let Some((start, end)) = truncate_to( &range.start, @@ -1814,6 +1812,7 @@ impl Timeline { }; split_key_ranges.push((start, end)); } + split_key_ranges.sort(); let guard = self.layers.read().await; let layer_map = guard.layer_map()?; let mut current_start = None; @@ -1825,7 +1824,7 @@ impl Timeline { current_start = Some(start); } let start = current_start.unwrap(); - if end >= start { + if start >= end { // We have already processed this partition. continue; } @@ -1838,9 +1837,14 @@ impl Timeline { .found .keys() .map(|layer| layer.layer.key_range.end) - .min() - .expect("at least one layer in the resuult?"); - let end = extended_end.max(end); + .min(); + // It is possible that the search range does not contain any layer files when we reach the end of the loop. + // In this case, we simply use the specified key range end. + let end = if let Some(extended_end) = extended_end { + extended_end.max(end) + } else { + end + }; info!( "splitting compaction job: {}..{}, estimated_size={}", start, end, total_size @@ -2077,14 +2081,15 @@ impl Timeline { // Step 1: construct a k-merge iterator over all layers. // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point. - let layer_names = job_desc - .selected_layers - .iter() - .map(|layer| layer.layer_desc().layer_name()) - .collect_vec(); - if let Some(err) = check_valid_layermap(&layer_names) { - warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err); - } + // disable the check for now because we need to adjust the check for partial compactions, will enable later. + // let layer_names = job_desc + // .selected_layers + // .iter() + // .map(|layer| layer.layer_desc().layer_name()) + // .collect_vec(); + // if let Some(err) = check_valid_layermap(&layer_names) { + // warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err); + // } // The maximum LSN we are processing in this compaction loop let end_lsn = job_desc .selected_layers