diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1066d165cd38d..56ec2bb27fbae 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4591,6 +4591,7 @@ mod tests { use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings}; + use pageserver_compaction::helpers::overlaps_with; use rand::{thread_rng, Rng}; use storage_layer::PersistentLayerKey; use tests::storage_layer::ValuesReconstructState; @@ -7104,23 +7105,7 @@ mod tests { } // Check if old layers are removed / new layers have the expected LSN - let mut all_layers = tline.inspect_historic_layers().await.unwrap(); - all_layers.sort_by(|k1, k2| { - ( - k1.is_delta, - k1.key_range.start, - k1.key_range.end, - k1.lsn_range.start, - k1.lsn_range.end, - ) - .cmp(&( - k2.is_delta, - k2.key_range.start, - k2.key_range.end, - k2.lsn_range.start, - k2.lsn_range.end, - )) - }); + let all_layers = inspect_and_sort(&tline, None).await; assert_eq!( all_layers, vec![ @@ -8657,4 +8642,247 @@ mod tests { Ok(()) } + + async fn inspect_and_sort( + tline: &Arc, + filter: Option>, + ) -> Vec { + let mut all_layers = tline.inspect_historic_layers().await.unwrap(); + if let Some(filter) = filter { + all_layers.retain(|layer| overlaps_with(&layer.key_range, &filter)); + } + all_layers.sort_by(|k1, k2| { + ( + k1.is_delta, + k1.key_range.start, + k1.key_range.end, + k1.lsn_range.start, + k1.lsn_range.end, + ) + .cmp(&( + k2.is_delta, + k2.key_range.start, + k2.key_range.end, + k2.lsn_range.start, + k2.lsn_range.end, + )) + }); + all_layers + } + + #[tokio::test] + async fn test_simple_partial_bottom_most_compaction() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_simple_partial_bottom_most_compaction").await?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. + let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + // img layer at 0x10 + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![ + ( + get_key(1), + Lsn(0x20), + Value::Image(Bytes::from("value 1@0x20")), + ), + ( + get_key(2), + Lsn(0x30), + Value::Image(Bytes::from("value 2@0x30")), + ), + ( + get_key(3), + Lsn(0x40), + Value::Image(Bytes::from("value 3@0x40")), + ), + ]; + let delta2 = vec![ + ( + get_key(5), + Lsn(0x20), + Value::Image(Bytes::from("value 5@0x20")), + ), + ( + get_key(6), + Lsn(0x20), + Value::Image(Bytes::from("value 6@0x20")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x48), + Value::Image(Bytes::from("value 8@0x48")), + ), + ( + get_key(9), + Lsn(0x48), + Value::Image(Bytes::from("value 9@0x48")), + ), + ]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![ + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3), + ], // delta layers + vec![(Lsn(0x10), img_layer)], // image layers + Lsn(0x50), + ) + .await?; + + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No)], + cutoffs: GcCutoffs { + time: Lsn(0x30), + space: Lsn(0x30), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + let cancel = CancellationToken::new(); + + // Do a partial compaction on key range 0..4, we should generate a image layer; no other layers + // can be removed because they might be used for other key ranges. + tline + .partial_compact_with_gc(Some(get_key(0)..get_key(4)), &cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + assert_eq!( + all_layers, + vec![ + PersistentLayerKey { + key_range: get_key(0)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false + }, + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false + }, + PersistentLayerKey { + key_range: get_key(1)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x48), + is_delta: true + }, + PersistentLayerKey { + key_range: get_key(5)..get_key(7), + lsn_range: Lsn(0x20)..Lsn(0x48), + is_delta: true + }, + PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x48)..Lsn(0x50), + is_delta: true + } + ] + ); + + // Do a partial compaction on key range 4..10 + tline + .partial_compact_with_gc(Some(get_key(4)..get_key(10)), &cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + assert_eq!( + all_layers, + vec![ + PersistentLayerKey { + key_range: get_key(0)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false + }, + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false + }, // if GC kicks in, this layer will be removed + PersistentLayerKey { + key_range: get_key(4)..get_key(10), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false + }, + PersistentLayerKey { + key_range: get_key(1)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x48), + is_delta: true + }, + PersistentLayerKey { + key_range: get_key(5)..get_key(7), + lsn_range: Lsn(0x20)..Lsn(0x48), + is_delta: true + }, + PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x48)..Lsn(0x50), + is_delta: true + } + ] + ); + + // Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones. + tline + .partial_compact_with_gc(Some(get_key(0)..get_key(10)), &cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + assert_eq!( + all_layers, + vec![ + PersistentLayerKey { + key_range: get_key(0)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false + }, + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false + }, + PersistentLayerKey { + key_range: get_key(4)..get_key(10), + lsn_range: Lsn(0x20)..Lsn(0x21), + is_delta: false + }, + PersistentLayerKey { + key_range: get_key(1)..get_key(4), + lsn_range: Lsn(0x20)..Lsn(0x48), + is_delta: true + }, + PersistentLayerKey { + key_range: get_key(5)..get_key(7), + lsn_range: Lsn(0x20)..Lsn(0x48), + is_delta: true + }, + PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x48)..Lsn(0x50), + is_delta: true + } + ] + ); + + Ok(()) + } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 5588363330d04..314a2db2c3790 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1731,20 +1731,32 @@ impl Timeline { Ok(()) } + pub(crate) async fn compact_with_gc( + self: &Arc, + cancel: &CancellationToken, + flags: EnumSet, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + self.partial_compact_with_gc(None, cancel, flags, ctx).await + } + /// An experimental compaction building block that combines compaction with garbage collection. /// /// The current implementation picks all delta + image layers that are below or intersecting with /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon, /// and create delta layers with all deltas >= gc horizon. - pub(crate) async fn compact_with_gc( + /// + /// If `key_range`, it will only compact the keys within the range, aka partial compaction. This functionality + /// is not complete yet, and if it is set, only image layers will be generated. + /// + pub(crate) async fn partial_compact_with_gc( self: &Arc, + compaction_key_range: Option>, cancel: &CancellationToken, flags: EnumSet, ctx: &RequestContext, ) -> anyhow::Result<()> { - use std::collections::BTreeSet; - // Block other compaction/GC tasks from running for now. GC-compaction could run along // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc. // Note that we already acquired the compaction lock when the outer `compact` function gets called. @@ -1765,8 +1777,13 @@ impl Timeline { .await?; let dry_run = flags.contains(CompactFlags::DryRun); + let partial_compaction = compaction_key_range.is_some(); - info!("running enhanced gc bottom-most compaction, dry_run={dry_run}"); + if let Some(ref compaction_key_range) = compaction_key_range { + info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end); + } else { + info!("running enhanced gc bottom-most compaction, dry_run={dry_run}"); + } scopeguard::defer! { info!("done enhanced gc bottom-most compaction"); @@ -1778,7 +1795,7 @@ impl Timeline { // The layer selection has the following properties: // 1. If a layer is in the selection, all layers below it are in the selection. // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection. - let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = { + let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = if !partial_compaction { let guard = self.layers.read().await; let layers = guard.layer_map()?; let gc_info = self.gc_info.read().unwrap(); @@ -1794,7 +1811,7 @@ impl Timeline { retain_lsns_below_horizon.push(*lsn); } } - let mut selected_layers = Vec::new(); + let mut selected_layers: Vec = Vec::new(); drop(gc_info); // Pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers. let Some(max_layer_lsn) = layers @@ -1808,8 +1825,13 @@ impl Timeline { }; // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key // layers to compact. + let Some(compaction_key_range) = compaction_key_range.as_ref() else { + unreachable!() + }; for desc in layers.iter_historic_layers() { - if desc.get_lsn_range().end <= max_layer_lsn { + if desc.get_lsn_range().end <= max_layer_lsn + && overlaps_with(&desc.key_range, compaction_key_range) + { selected_layers.push(guard.get_from_desc(&desc)); } } @@ -1819,8 +1841,47 @@ impl Timeline { } retain_lsns_below_horizon.sort(); (selected_layers, gc_cutoff, retain_lsns_below_horizon) + } else { + // In case of partial compaction, we currently only support generating image layers, and therefore, + // we pick all layers that are below the lowest retain_lsn and does not intersect with any of the layers. + let guard = self.layers.read().await; + let layers = guard.layer_map()?; + let gc_info = self.gc_info.read().unwrap(); + let mut min_lsn = gc_info.cutoffs.select_min(); + for (lsn, _) in &gc_info.retain_lsns { + if lsn < &min_lsn { + min_lsn = *lsn; + } + } + for lsn in gc_info.leases.keys() { + if lsn < &min_lsn { + min_lsn = *lsn; + } + } + let mut selected_layers = Vec::new(); + drop(gc_info); + // |-------| |-------| |-------| + // | Delta | | Delta | | Delta | -- min_lsn could be intersecting with the layers + // |-------| |-------| |-------| <- we want to pick all the layers below min_lsn, so that + // | Delta | | Delta | | Delta | ...we can remove them after compaction + // |-------| |-------| |-------| + // Pick all the layers intersect or below the min_lsn, get the largest LSN in the selected layers. + for desc in layers.iter_historic_layers() { + if desc.get_lsn_range().end <= min_lsn { + selected_layers.push(guard.get_from_desc(&desc)); + } + } + if selected_layers.is_empty() { + info!("no layers to compact with gc"); + return Ok(()); + } + (selected_layers, min_lsn, Vec::new()) }; let lowest_retain_lsn = if self.ancestor_timeline.is_some() { + if partial_compaction { + warn!("partial compaction cannot run on child branches (for now)"); + return Ok(()); + } Lsn(self.ancestor_lsn.0 + 1) } else { let res = retain_lsns_below_horizon @@ -1848,23 +1909,18 @@ impl Timeline { self.check_compaction_space(&layer_selection).await?; - // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs. - // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point. - let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?) + // Generate statistics for the compaction for layer in &layer_selection { let desc = layer.layer_desc(); if desc.is_delta() { - // ignore single-key layer files - if desc.key_range.start.next() != desc.key_range.end { - let lsn_range = &desc.lsn_range; - lsn_split_point.insert(lsn_range.start); - lsn_split_point.insert(lsn_range.end); - } stat.visit_delta_layer(desc.file_size()); } else { stat.visit_image_layer(desc.file_size()); } } + + // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs. + // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point. let layer_names: Vec = layer_selection .iter() .map(|layer| layer.layer_desc().layer_name()) @@ -1915,7 +1971,10 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - Key::MIN, + compaction_key_range + .as_ref() + .map(|x| x.start) + .unwrap_or(Key::MIN), lowest_retain_lsn, self.get_compaction_target_size(), ctx, @@ -1976,28 +2035,35 @@ impl Timeline { } else { let last_key = last_key.as_mut().unwrap(); stat.on_unique_key_visited(); - let retention = self - .generate_key_retention( - *last_key, - &accumulated_values, - gc_cutoff, - &retain_lsns_below_horizon, - COMPACTION_DELTA_THRESHOLD, - get_ancestor_image(self, *last_key, ctx).await?, - ) - .await?; - // Put the image into the image layer. Currently we have a single big layer for the compaction. - retention - .pipe_to( - *last_key, - self, - &mut delta_layer_writer, - image_layer_writer.as_mut(), - &mut stat, - dry_run, - ctx, - ) - .await?; + let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range { + !compaction_key_range.contains(last_key) + } else { + false + }; + if !skip_adding_key { + let retention = self + .generate_key_retention( + *last_key, + &accumulated_values, + gc_cutoff, + &retain_lsns_below_horizon, + COMPACTION_DELTA_THRESHOLD, + get_ancestor_image(self, *last_key, ctx).await?, + ) + .await?; + // Put the image into the image layer. Currently we have a single big layer for the compaction. + retention + .pipe_to( + *last_key, + self, + &mut delta_layer_writer, + image_layer_writer.as_mut(), + &mut stat, + dry_run, + ctx, + ) + .await?; + } accumulated_values.clear(); *last_key = key; accumulated_values.push((key, lsn, val)); @@ -2007,28 +2073,36 @@ impl Timeline { let last_key = last_key.expect("no keys produced during compaction"); // TODO: move this part to the loop body stat.on_unique_key_visited(); - let retention = self - .generate_key_retention( - last_key, - &accumulated_values, - gc_cutoff, - &retain_lsns_below_horizon, - COMPACTION_DELTA_THRESHOLD, - get_ancestor_image(self, last_key, ctx).await?, - ) - .await?; - // Put the image into the image layer. Currently we have a single big layer for the compaction. - retention - .pipe_to( - last_key, - self, - &mut delta_layer_writer, - image_layer_writer.as_mut(), - &mut stat, - dry_run, - ctx, - ) - .await?; + + let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range { + !compaction_key_range.contains(&last_key) + } else { + false + }; + if !skip_adding_key { + let retention = self + .generate_key_retention( + last_key, + &accumulated_values, + gc_cutoff, + &retain_lsns_below_horizon, + COMPACTION_DELTA_THRESHOLD, + get_ancestor_image(self, last_key, ctx).await?, + ) + .await?; + // Put the image into the image layer. Currently we have a single big layer for the compaction. + retention + .pipe_to( + last_key, + self, + &mut delta_layer_writer, + image_layer_writer.as_mut(), + &mut stat, + dry_run, + ctx, + ) + .await?; + } let discard = |key: &PersistentLayerKey| { let key = key.clone(); @@ -2037,8 +2111,12 @@ impl Timeline { let produced_image_layers = if let Some(writer) = image_layer_writer { if !dry_run { + let end_key = compaction_key_range + .as_ref() + .map(|x| x.end) + .unwrap_or(Key::MAX); writer - .finish_with_discard_fn(self, ctx, Key::MAX, discard) + .finish_with_discard_fn(self, ctx, end_key, discard) .await? } else { let (layers, _) = writer.take()?; @@ -2059,6 +2137,10 @@ impl Timeline { Vec::new() }; + if partial_compaction && !produced_delta_layers.is_empty() { + bail!("partial compaction cannot produce delta layers (for now)"); + } + let mut compact_to = Vec::new(); let mut keep_layers = HashSet::new(); let produced_delta_layers_len = produced_delta_layers.len(); @@ -2090,6 +2172,16 @@ impl Timeline { let mut layer_selection = layer_selection; layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); + if let Some(ref compaction_key_range) = compaction_key_range { + // If a delta layer contains keys that are not compacted (overlapping with the key + // range), we should keep the layer. + layer_selection.retain(|x| { + let key_range = &x.layer_desc().key_range; + key_range.start >= compaction_key_range.start + && key_range.end <= compaction_key_range.end + }); + } + info!( "gc-compaction statistics: {}", serde_json::to_string(&stat)?