From 4b01a48446bac7151c0ee8c567fd63d1daafc0a5 Mon Sep 17 00:00:00 2001
From: Alex Chi Z
Date: Sun, 8 Dec 2024 15:27:35 -0500
Subject: [PATCH] a little refactor

Signed-off-by: Alex Chi Z
---
 pageserver/src/http/routes.rs                |   9 +-
 pageserver/src/tenant.rs                     |  21 ++-
 pageserver/src/tenant/timeline.rs            |  67 ++++++--
 pageserver/src/tenant/timeline/compaction.rs | 153 +++++++++++--------
 4 files changed, 165 insertions(+), 85 deletions(-)

diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 99ffda1aedffb..376bbd89cb9bb 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -2041,13 +2041,12 @@ async fn timeline_compact_handler(
         .map(|r| r.sub_compaction)
         .unwrap_or(false);
     let options = CompactOptions {
-        compact_range: compact_request
+        compact_key_range: compact_request
             .as_ref()
-            .and_then(|r| r.compact_range.clone()),
-        compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
-        compact_above_lsn: compact_request
+            .and_then(|r| r.compact_key_range.clone()),
+        compact_lsn_range: compact_request
             .as_ref()
-            .and_then(|r| r.compact_above_lsn.as_ref().map(|x| x.0)),
+            .and_then(|r| r.compact_lsn_range.clone()),
         flags,
         sub_compaction,
     };
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 55ad75d5b5849..baeba84c27d1b 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -44,6 +44,7 @@ use std::sync::atomic::AtomicBool;
 use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
+use timeline::compaction::GcCompactJob;
 use timeline::compaction::ScheduledCompactionTask;
 use timeline::import_pgdata;
 use timeline::offload::offload_timeline;
@@ -3017,7 +3018,9 @@ impl Tenant {
             } else if next_scheduled_compaction_task.options.sub_compaction {
                 info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
                 let jobs = timeline
-                    .gc_compaction_split_jobs(next_scheduled_compaction_task.options)
+                    .gc_compaction_split_jobs(GcCompactJob::from_compact_options(
+                        next_scheduled_compaction_task.options,
+                    ))
                     .await
                     .map_err(CompactionError::Other)?;
                 if jobs.is_empty() {
@@ -3029,7 +3032,21 @@ impl Tenant {
                     let tline_pending_tasks = guard.entry(*timeline_id).or_default();
                     for (idx, job) in jobs.into_iter().enumerate() {
                         tline_pending_tasks.push_back(ScheduledCompactionTask {
-                            options: job,
+                            // TODO: use a single `GcCompactJob` here instead of converting it back
+                            // to `CompactOptions`
+                            options: CompactOptions {
+                                flags: if job.dry_run {
+                                    let mut flags: EnumSet<CompactFlags> =
+                                        EnumSet::default();
+                                    flags |= CompactFlags::DryRun;
+                                    flags
+                                } else {
+                                    EnumSet::default()
+                                },
+                                sub_compaction: false,
+                                compact_key_range: Some(job.compact_key_range.into()),
+                                compact_lsn_range: Some(job.compact_lsn_range.into()),
+                            },
                             result_tx: if idx == jobs_len - 1 {
                                 // The last compaction job sends the completion signal
                                 next_scheduled_compaction_task.result_tx.take()
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 6ca9c0d9d12a8..b9940aa2e30e9 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -780,35 +780,71 @@ pub(crate) enum CompactFlags {
 #[serde_with::serde_as]
 #[derive(Debug, Clone, serde::Deserialize)]
 pub(crate) struct CompactRequest {
-    pub compact_range: Option<CompactRange>,
-    pub compact_below_lsn: Option<Lsn>,
-    pub compact_above_lsn: Option<Lsn>,
+    pub compact_key_range: Option<CompactKeyRange>,
+    pub compact_lsn_range: Option<CompactLsnRange>,
+    /// Whether the compaction job should be scheduled.
+    #[serde(default)]
+    pub scheduled: bool,
+    /// Whether the compaction job should be split across key ranges.
+    #[serde(default)]
+    pub sub_compaction: bool,
+}
+
+#[serde_with::serde_as]
+#[derive(Debug, Clone, serde::Deserialize)]
+pub(crate) struct CompactLsnRange {
+    pub start: Lsn,
+    pub end: Lsn,
+}
+
+#[serde_with::serde_as]
+#[derive(Debug, Clone, serde::Deserialize)]
+pub(crate) struct CompactKeyRange {
+    #[serde_as(as = "serde_with::DisplayFromStr")]
     pub start: Key,
     #[serde_as(as = "serde_with::DisplayFromStr")]
     pub end: Key,
 }
 
-impl From<Range<Key>> for CompactRange {
+impl From<Range<Lsn>> for CompactLsnRange {
+    fn from(range: Range<Lsn>) -> Self {
+        Self {
+            start: range.start,
+            end: range.end,
+        }
+    }
+}
+
+impl From<Range<Key>> for CompactKeyRange {
     fn from(range: Range<Key>) -> Self {
-        CompactRange {
+        Self {
             start: range.start,
             end: range.end,
         }
     }
 }
 
+impl From<CompactLsnRange> for Range<Lsn> {
+    fn from(range: CompactLsnRange) -> Self {
+        range.start..range.end
+    }
+}
+
+impl From<CompactKeyRange> for Range<Key> {
+    fn from(range: CompactKeyRange) -> Self {
+        range.start..range.end
+    }
+}
+
 #[derive(Debug, Clone, Default)]
 pub(crate) struct CompactOptions {
     pub flags: EnumSet<CompactFlags>,
     /// If set, the compaction will only compact the key range specified by this option.
-    /// This option is only used by GC compaction.
-    pub compact_range: Option<CompactRange>,
-    /// If set, the compaction will only compact the LSN below this value.
-    /// This option is only used by GC compaction.
-    pub compact_below_lsn: Option<Lsn>,
-    /// If set, the compaction will only compact the LSN above this value.
-    /// This option is only used by GC compaction.
-    pub compact_above_lsn: Option<Lsn>,
+    /// This option is only used by GC compaction. For the full explanation, see [`GcCompactJob`].
+    pub compact_key_range: Option<CompactKeyRange>,
+    /// If set, the compaction will only compact LSNs within this range.
+    /// This option is only used by GC compaction. For the full explanation, see [`GcCompactJob`].
+    pub compact_lsn_range: Option<CompactLsnRange>,
     /// Enable sub-compaction (split compaction job across key ranges).
     /// This option is only used by GC compaction.
     pub sub_compaction: bool,
@@ -1633,9 +1669,8 @@ impl Timeline {
                 cancel,
                 CompactOptions {
                     flags,
-                    compact_range: None,
-                    compact_below_lsn: None,
-                    compact_above_lsn: None,
+                    compact_key_range: None,
+                    compact_lsn_range: None,
                     sub_compaction: false,
                 },
                 ctx,
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 97adec050100e..2d5ee51c76752 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -10,8 +10,8 @@ use std::sync::Arc;
 
 use super::layer_manager::LayerManager;
 use super::{
-    CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
-    ImageLayerCreationMode, RecordedDuration, Timeline,
+    CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
+    RecordedDuration, Timeline,
 };
 
 use anyhow::{anyhow, bail, Context};
@@ -64,22 +64,64 @@ const COMPACTION_DELTA_THRESHOLD: usize = 5;
 
 /// A scheduled compaction task.
 pub struct ScheduledCompactionTask {
+    /// It's unfortunate that we need to store a compact options struct here because the only outer
+    /// API we can call here is `compact_with_options` which does a few setup calls before starting the
+    /// actual compaction job... We should refactor this to store `GcCompactJob` in the future.
     pub options: CompactOptions,
     pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
 }
 
+/// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
+/// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets
+/// called.
+#[derive(Debug, Clone)]
+pub(crate) struct GcCompactJob {
+    pub dry_run: bool,
+    /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range
+    /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary.
+    pub compact_key_range: Range<Key>,
+    /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be
+    /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range
+    /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`].
+    /// min_lsn will always be <= the lower bound specified here, and max_lsn will always be >= the upper bound
+    /// specified here.
+    pub compact_lsn_range: Range<Lsn>,
+}
+
+impl GcCompactJob {
+    pub fn from_compact_options(options: CompactOptions) -> Self {
+        GcCompactJob {
+            dry_run: options.flags.contains(CompactFlags::DryRun),
+            compact_key_range: options
+                .compact_key_range
+                .map(|x| x.into())
+                .unwrap_or(Key::MIN..Key::MAX),
+            compact_lsn_range: options
+                .compact_lsn_range
+                .map(|x| x.into())
+                .unwrap_or(Lsn::INVALID..Lsn::MAX),
+        }
+    }
+}
+
+/// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called
+/// and contains the exact layers we want to compact.
 pub struct GcCompactionJobDescription {
     /// All layers to read in the compaction job
     selected_layers: Vec<Layer>,
-    /// GC cutoff of the job
+    /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to
+    /// keep all deltas <= this LSN or generate an image == this LSN.
     gc_cutoff: Lsn,
-    /// LSNs to retain for the job
+    /// LSNs to retain for the job. The read path will use these LSNs, so we need to keep deltas <= each such
+    /// LSN or generate an image == that LSN.
     retain_lsns_below_horizon: Vec<Lsn>,
-    /// Maximum layer LSN processed in this compaction
+    /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data
+    /// >= this LSN will be kept and will not be rewritten.
     max_layer_lsn: Lsn,
-    /// Minimum layer LSN processed in this compaction
+    /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive.
+    /// All access below (strictly lower than `<`) this LSN will be routed through the normal read path instead of
+    /// k-merge within gc-compaction.
     min_layer_lsn: Lsn,
-    /// Only compact layers overlapping with this range
+    /// Only compact layers overlapping with this range.
     compaction_key_range: Range<Key>,
     /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap.
     /// This field is here solely for debugging. The field will not be read once the compaction
@@ -298,7 +340,7 @@ impl Timeline {
             )));
         }
 
-        if options.compact_range.is_some() {
+        if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() {
             // maybe useful in the future? could implement this at some point
             return Err(CompactionError::Other(anyhow!(
                 "compaction range is not supported for legacy compaction for now"
@@ -1753,22 +1795,16 @@ impl Timeline {
         Ok(())
     }
 
-    /// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of
-    /// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much
-    /// ad-hoc information about gc compaction itself.
+    /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the
+    /// estimated size of the compaction job. The function returns a list of compaction jobs that can be executed
+    /// separately. If the upper bound of the compact LSN range is not specified, we will use the latest gc_cutoff
+    /// as the upper bound, so that all jobs in the jobset act like a full compaction of the specified keyspace.
     pub(crate) async fn gc_compaction_split_jobs(
         self: &Arc<Self>,
-        options: CompactOptions,
-    ) -> anyhow::Result<Vec<CompactOptions>> {
-        if !options.sub_compaction {
-            return Ok(vec![options]);
-        }
-        let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
-            start: Key::MIN,
-            end: Key::MAX,
-        });
-        let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
-            compact_below_lsn
+        job: GcCompactJob,
+    ) -> anyhow::Result<Vec<GcCompactJob>> {
+        let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
+            job.compact_lsn_range.end
         } else {
             let gc_info = self.gc_info.read().unwrap();
             gc_info.cutoffs.select_min() // use the real gc cutoff
@@ -1808,8 +1844,8 @@ impl Timeline {
             let Some((start, end)) = truncate_to(
                 &range.start,
                 &range.end,
-                &compact_range.start,
-                &compact_range.end,
+                &job.compact_key_range.start,
+                &job.compact_key_range.end,
             ) else {
                 continue;
             };
@@ -1834,7 +1870,6 @@ impl Timeline {
             let res = layer_map.range_search(start..end, compact_below_lsn);
             let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
             if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
-                let mut compact_options = options.clone();
                 // Try to extend the compaction range so that we include at least one full layer file.
                 let extended_end = res
                     .found
@@ -1852,10 +1887,11 @@ impl Timeline {
                     "splitting compaction job: {}..{}, estimated_size={}",
                     start, end, total_size
                 );
-                compact_options.compact_range = Some(CompactRange { start, end });
-                compact_options.compact_below_lsn = Some(compact_below_lsn);
-                compact_options.sub_compaction = false;
-                compact_jobs.push(compact_options);
+                compact_jobs.push(GcCompactJob {
+                    dry_run: job.dry_run,
+                    compact_key_range: start..end,
+                    compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
+                });
                 current_start = Some(end);
             }
         }
@@ -1885,9 +1921,11 @@ impl Timeline {
         options: CompactOptions,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        if options.sub_compaction {
+        let sub_compaction = options.sub_compaction;
+        let job = GcCompactJob::from_compact_options(options);
+        if sub_compaction {
             info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
-            let jobs = self.gc_compaction_split_jobs(options).await?;
+            let jobs = self.gc_compaction_split_jobs(job).await?;
             let jobs_len = jobs.len();
             for (idx, job) in jobs.into_iter().enumerate() {
                 info!(
@@ -1902,19 +1940,15 @@ impl Timeline {
             }
             return Ok(());
         }
-        self.compact_with_gc_inner(cancel, options, ctx).await
+        self.compact_with_gc_inner(cancel, job, ctx).await
     }
 
     async fn compact_with_gc_inner(
         self: &Arc<Self>,
         cancel: &CancellationToken,
-        options: CompactOptions,
+        job: GcCompactJob,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        assert!(
-            !options.sub_compaction,
-            "sub-compaction should be handled by the outer function"
-        );
         // Block other compaction/GC tasks from running for now. GC-compaction could run along
         // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
         // Note that we already acquired the compaction lock when the outer `compact` function gets called.
             )
             .await?;
 
-        let flags = options.flags;
-        let compaction_key_range = options
-            .compact_range
-            .map(|range| range.start..range.end)
-            .unwrap_or_else(|| Key::MIN..Key::MAX);
+        let dry_run = job.dry_run;
+        let compact_key_range = job.compact_key_range;
+        let compact_lsn_range = job.compact_lsn_range;
 
-        let dry_run = flags.contains(CompactFlags::DryRun);
-
-        if compaction_key_range == (Key::MIN..Key::MAX) {
-            info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end);
-        } else {
-            info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
-        }
+        info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);
 
         scopeguard::defer! {
             info!("done enhanced gc bottom-most compaction");
@@ -1968,7 +1994,11 @@ impl Timeline {
         // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
         // each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
         // the real cutoff.
-        let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff);
+        let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX {
+            real_gc_cutoff
+        } else {
+            compact_lsn_range.end
+        };
         if gc_cutoff > real_gc_cutoff {
             warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
             gc_cutoff = real_gc_cutoff;
@@ -2000,16 +2030,16 @@ impl Timeline {
             let Some(min_layer_lsn) = layers
                 .iter_historic_layers()
                 .filter(|desc| {
-                    if let Some(compact_above_lsn) = options.compact_above_lsn {
-                        desc.get_lsn_range().end > compact_above_lsn // strictly larger than compact_above_lsn
+                    if compact_lsn_range.start == Lsn::INVALID {
+                        true // select all layers below if start == Lsn(0)
                     } else {
-                        true
+                        desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than the lower bound
                     }
                 })
                 .map(|desc| desc.get_lsn_range().start)
                 .min()
             else {
-                info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", options.compact_above_lsn.unwrap_or_default());
+                info!("no layers to compact with gc: no historic layers above the LSN range start, compact_lsn_range.start={}", compact_lsn_range.start);
                 return Ok(());
             };
             // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
             for desc in layers.iter_historic_layers() {
                 if desc.get_lsn_range().end <= max_layer_lsn
                     && desc.get_lsn_range().start >= min_layer_lsn
-                    && overlaps_with(&desc.get_key_range(), &compaction_key_range)
+                    && overlaps_with(&desc.get_key_range(), &compact_key_range)
                 {
                     // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range,
                     // even if it might contain extra keys
                     selected_layers.push(guard.get_from_desc(&desc));
                     // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine
                     // to overlap image layers)
-                    if desc.is_delta()
-                        && !fully_contains(&compaction_key_range, &desc.get_key_range())
+                    if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range())
                     {
                         rewrite_layers.push(desc);
                     }
                 }
             }
             if selected_layers.is_empty() {
-                info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compaction_key_range.start, compaction_key_range.end);
+                info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end);
                 return Ok(());
             }
             retain_lsns_below_horizon.sort();
@@ -2043,11 +2072,11 @@ impl Timeline {
                 retain_lsns_below_horizon,
                 min_layer_lsn,
                 max_layer_lsn,
-                compaction_key_range,
+                compaction_key_range: compact_key_range,
                 rewrite_layers,
             }
         };
-        let (has_data_below, lowest_retain_lsn) = if options.compact_above_lsn.is_some() {
+        let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID {
             // If we only compact above some LSN, we should get the history from the current branch below the specified LSN.
             // We use job_desc.min_layer_lsn as if it's the lowest branch point.
             (true, job_desc.min_layer_lsn)
@@ -2190,9 +2219,9 @@ impl Timeline {
     /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`).
     /// The two cases are compaction in ancestor branches and `compact_above_lsn=Some`.
    /// In those cases, we need to pull up data from below the LSN range we're compacting.
-    /// 
+    ///
     /// This function unifies the cases so that later code doesn't have to think about it.
-    /// 
+    ///
     /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
     /// is needed for reconstruction. This should be fixed in the future.
     ///
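
Note on the defaulting behavior introduced by `GcCompactJob::from_compact_options`
above (illustration only, not part of the patch): an omitted key range widens to
`Key::MIN..Key::MAX` and an omitted LSN range widens to `Lsn::INVALID..Lsn::MAX`,
which the rest of the code reads as "no lower bound" and "use the real gc_cutoff
as the upper bound". A minimal self-contained sketch of that rule, using stand-in
types (`KeyT`/`LsnT` and the struct names below are simplified placeholders, not
the pageserver's real `Key`/`Lsn`; 0 plays the role of `Lsn::INVALID`):

    use std::ops::Range;

    type KeyT = u128; // stand-in for the pageserver's Key
    type LsnT = u64; // stand-in for Lsn; 0 models Lsn::INVALID

    struct OptionsSketch {
        dry_run: bool,
        compact_key_range: Option<Range<KeyT>>,
        compact_lsn_range: Option<Range<LsnT>>,
    }

    struct JobSketch {
        dry_run: bool,
        compact_key_range: Range<KeyT>,
        compact_lsn_range: Range<LsnT>,
    }

    impl JobSketch {
        // Mirrors the defaulting in GcCompactJob::from_compact_options:
        // omitted ranges widen to the full rectangle, so a bare request
        // compacts the whole keyspace up to the real gc_cutoff.
        fn from_options(options: OptionsSketch) -> Self {
            JobSketch {
                dry_run: options.dry_run,
                compact_key_range: options.compact_key_range.unwrap_or(KeyT::MIN..KeyT::MAX),
                compact_lsn_range: options.compact_lsn_range.unwrap_or(0..LsnT::MAX),
            }
        }
    }

    fn main() {
        let job = JobSketch::from_options(OptionsSketch {
            dry_run: true,
            compact_key_range: None,
            compact_lsn_range: None,
        });
        // Both ranges default to the widest possible rectangle.
        assert_eq!(job.compact_key_range, KeyT::MIN..KeyT::MAX);
        assert_eq!(job.compact_lsn_range, 0..LsnT::MAX);
        assert!(job.dry_run);
    }

This is also why `gc_compaction_split_jobs` checks `job.compact_lsn_range.end != Lsn::MAX`
and `compact_with_gc_inner` checks `compact_lsn_range.start == Lsn::INVALID`: the
sentinel bounds mark "not specified" without needing an `Option`.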