From f4a668a27de4557dbfac0b004189c37d068118d5 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." Date: Thu, 1 Aug 2024 10:00:06 -0400 Subject: [PATCH] fix(pageserver): skip existing layers for btm-gc-compaction (#8498) part of https://github.com/neondatabase/neon/issues/8002 Due to the limitation of the current layer map implementation, we cannot directly replace a layer. It's interpreted as an insert and a deletion, and there will be file exist error when renaming the newly-created layer to replace the old layer. We work around that by changing the end key of the image layer. A long-term fix would involve a refactor around the layer file naming. For delta layers, we simply skip layers with the same key range produced, though it is possible to add an extra key as an alternative solution. * The image layer range for the layers generated from gc-compaction will be Key::MIN..(Key..MAX-1), to avoid being recognized as an L0 delta layer. * Skip existing layers if it turns out that we need to generate a layer with the same persistent key in the same generation. Note that it is possible that the newly-generated layer has different content from the existing layer. For example, when the user drops a retain_lsn, the compaction could have combined or dropped some records, therefore creating a smaller layer than the existing one. We discard the "optimized" layer for now because we cannot deal with such rewrites within the same generation. --------- Signed-off-by: Alex Chi Z Co-authored-by: Christian Schwarz --- pageserver/src/tenant.rs | 47 ++- .../src/tenant/storage_layer/layer_desc.rs | 14 + pageserver/src/tenant/timeline/compaction.rs | 279 ++++++++++++++++-- .../src/tenant/timeline/layer_manager.rs | 14 +- 4 files changed, 320 insertions(+), 34 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b9257dfbe8c6..84c5095610d5 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -6963,7 +6963,11 @@ mod tests { vec![ // Image layer at GC horizon PersistentLayerKey { - key_range: Key::MIN..Key::MAX, + key_range: { + let mut key = Key::MAX; + key.field6 -= 1; + Key::MIN..key + }, lsn_range: Lsn(0x30)..Lsn(0x31), is_delta: false }, @@ -6982,6 +6986,15 @@ mod tests { ] ); + // increase GC horizon and compact again + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + guard.cutoffs.time = Lsn(0x40); + guard.cutoffs.space = Lsn(0x40); + } + tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + Ok(()) } @@ -7333,6 +7346,15 @@ mod tests { ); } + // increase GC horizon and compact again + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + guard.cutoffs.time = Lsn(0x40); + guard.cutoffs.space = Lsn(0x40); + } + tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + Ok(()) } @@ -7837,6 +7859,10 @@ mod tests { ]; let verify_result = || async { + let gc_horizon = { + let gc_info = tline.gc_info.read().unwrap(); + gc_info.cutoffs.time + }; for idx in 0..10 { assert_eq!( tline @@ -7847,7 +7873,7 @@ mod tests { ); assert_eq!( tline - .get(get_key(idx as u32), Lsn(0x30), &ctx) + .get(get_key(idx as u32), gc_horizon, &ctx) .await .unwrap(), &expected_result_at_gc_horizon[idx] @@ -7873,7 +7899,24 @@ mod tests { let cancel = CancellationToken::new(); tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + verify_result().await; + + // compact again + tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + verify_result().await; + + // increase GC horizon and compact again + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + guard.cutoffs.time = Lsn(0x38); + guard.cutoffs.space = Lsn(0x38); + } + tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result + // not increasing the GC horizon and compact again + tline.compact_with_gc(&cancel, &ctx).await.unwrap(); verify_result().await; Ok(()) diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs index bd765560e4e3..cbd18e650f62 100644 --- a/pageserver/src/tenant/storage_layer/layer_desc.rs +++ b/pageserver/src/tenant/storage_layer/layer_desc.rs @@ -41,6 +41,20 @@ pub struct PersistentLayerKey { pub is_delta: bool, } +impl std::fmt::Display for PersistentLayerKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}..{} {}..{} is_delta={}", + self.key_range.start, + self.key_range.end, + self.lsn_range.start, + self.lsn_range.end, + self.is_delta + ) + } +} + impl PersistentLayerDesc { pub fn key(&self) -> PersistentLayerKey { PersistentLayerKey { diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 4fe9bbafab37..61d662d25d66 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -4,7 +4,7 @@ //! //! The old legacy algorithm is implemented directly in `timeline.rs`. -use std::collections::BinaryHeap; +use std::collections::{BinaryHeap, HashSet}; use std::ops::{Deref, Range}; use std::sync::Arc; @@ -30,7 +30,9 @@ use crate::page_cache; use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD}; use crate::tenant::remote_timeline_client::WaitCompletionError; use crate::tenant::storage_layer::merge_iterator::MergeIterator; -use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState}; +use crate::tenant::storage_layer::{ + AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState, +}; use crate::tenant::timeline::ImageLayerCreationOutcome; use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter}; use crate::tenant::timeline::{Layer, ResidentLayer}; @@ -1368,7 +1370,7 @@ impl Timeline { pub(crate) async fn generate_key_retention( self: &Arc, key: Key, - history: &[(Key, Lsn, Value)], + full_history: &[(Key, Lsn, Value)], horizon: Lsn, retain_lsn_below_horizon: &[Lsn], delta_threshold_cnt: usize, @@ -1376,14 +1378,14 @@ impl Timeline { ) -> anyhow::Result { // Pre-checks for the invariants if cfg!(debug_assertions) { - for (log_key, _, _) in history { + for (log_key, _, _) in full_history { assert_eq!(log_key, &key, "mismatched key"); } - for i in 1..history.len() { - assert!(history[i - 1].1 <= history[i].1, "unordered LSN"); - if history[i - 1].1 == history[i].1 { + for i in 1..full_history.len() { + assert!(full_history[i - 1].1 <= full_history[i].1, "unordered LSN"); + if full_history[i - 1].1 == full_history[i].1 { assert!( - matches!(history[i - 1].2, Value::Image(_)), + matches!(full_history[i - 1].2, Value::Image(_)), "unordered delta/image, or duplicated delta" ); } @@ -1414,7 +1416,7 @@ impl Timeline { } lsn_split_points.push(horizon); let mut current_idx = 0; - for item @ (_, lsn, _) in history { + for item @ (_, lsn, _) in full_history { while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] { current_idx += 1; } @@ -1459,6 +1461,68 @@ impl Timeline { if let Some((key, lsn, img)) = base_img_from_ancestor { replay_history.push((key, lsn, Value::Image(img))); } + + /// Generate debug information for the replay history + fn generate_history_trace(replay_history: &[(Key, Lsn, Value)]) -> String { + use std::fmt::Write; + let mut output = String::new(); + if let Some((key, _, _)) = replay_history.first() { + write!(output, "key={} ", key).unwrap(); + let mut cnt = 0; + for (_, lsn, val) in replay_history { + if val.is_image() { + write!(output, "i@{} ", lsn).unwrap(); + } else if val.will_init() { + write!(output, "di@{} ", lsn).unwrap(); + } else { + write!(output, "d@{} ", lsn).unwrap(); + } + cnt += 1; + if cnt >= 128 { + write!(output, "... and more").unwrap(); + break; + } + } + } else { + write!(output, "").unwrap(); + } + output + } + + fn generate_debug_trace( + replay_history: Option<&[(Key, Lsn, Value)]>, + full_history: &[(Key, Lsn, Value)], + lsns: &[Lsn], + horizon: Lsn, + ) -> String { + use std::fmt::Write; + let mut output = String::new(); + if let Some(replay_history) = replay_history { + writeln!( + output, + "replay_history: {}", + generate_history_trace(replay_history) + ) + .unwrap(); + } else { + writeln!(output, "replay_history: ",).unwrap(); + } + writeln!( + output, + "full_history: {}", + generate_history_trace(full_history) + ) + .unwrap(); + writeln!( + output, + "when processing: [{}] horizon={}", + lsns.iter().map(|l| format!("{l}")).join(","), + horizon + ) + .unwrap(); + output + } + for (i, split_for_lsn) in split_history.into_iter().enumerate() { // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly. records_since_last_image += split_for_lsn.len(); @@ -1483,10 +1547,27 @@ impl Timeline { } } if let Some((_, _, val)) = replay_history.first() { - assert!(val.will_init(), "invalid history, no base image"); + if !val.will_init() { + return Err(anyhow::anyhow!("invalid history, no base image")).with_context( + || { + generate_debug_trace( + Some(&replay_history), + full_history, + retain_lsn_below_horizon, + horizon, + ) + }, + ); + } } if generate_image && records_since_last_image > 0 { records_since_last_image = 0; + let replay_history_for_debug = if cfg!(debug_assertions) { + Some(replay_history.clone()) + } else { + None + }; + let replay_history_for_debug_ref = replay_history_for_debug.as_deref(); let history = std::mem::take(&mut replay_history); let mut img = None; let mut records = Vec::with_capacity(history.len()); @@ -1494,14 +1575,30 @@ impl Timeline { img = Some((*lsn, val.clone())); for (_, lsn, val) in history.into_iter().skip(1) { let Value::WalRecord(rec) = val else { - panic!("invalid record") + return Err(anyhow::anyhow!( + "invalid record, first record is image, expect walrecords" + )) + .with_context(|| { + generate_debug_trace( + replay_history_for_debug_ref, + full_history, + retain_lsn_below_horizon, + horizon, + ) + }); }; records.push((lsn, rec)); } } else { for (_, lsn, val) in history.into_iter() { let Value::WalRecord(rec) = val else { - panic!("invalid record") + return Err(anyhow::anyhow!("invalid record, first record is walrecord, expect rest are walrecord")) + .with_context(|| generate_debug_trace( + replay_history_for_debug_ref, + full_history, + retain_lsn_below_horizon, + horizon, + )); }; records.push((lsn, rec)); } @@ -1513,12 +1610,11 @@ impl Timeline { replay_history.push((key, request_lsn, Value::Image(img.clone()))); retention.push(vec![(request_lsn, Value::Image(img))]); } else { - retention.push( - split_for_lsn - .iter() - .map(|(_, lsn, value)| (*lsn, value.clone())) - .collect(), - ); + let deltas = split_for_lsn + .iter() + .map(|(_, lsn, value)| (*lsn, value.clone())) + .collect_vec(); + retention.push(deltas); } } let mut result = Vec::with_capacity(retention.len()); @@ -1533,7 +1629,7 @@ impl Timeline { result.push((lsn_split_points[idx], KeyLogAtLsn(logs))); } } - unreachable!() + unreachable!("key retention is empty") } /// An experimental compaction building block that combines compaction with garbage collection. @@ -1544,11 +1640,26 @@ impl Timeline { /// and create delta layers with all deltas >= gc horizon. pub(crate) async fn compact_with_gc( self: &Arc, - _cancel: &CancellationToken, + cancel: &CancellationToken, ctx: &RequestContext, ) -> anyhow::Result<()> { use std::collections::BTreeSet; + // Block other compaction/GC tasks from running for now. GC-compaction could run along + // with legacy compaction tasks in the future. + + let _compaction_lock = tokio::select! { + guard = self.compaction_lock.lock() => guard, + // TODO: refactor to CompactionError to correctly pass cancelled error + _ = cancel.cancelled() => return Err(anyhow!("cancelled")), + }; + + let _gc = tokio::select! { + guard = self.gc_lock.lock() => guard, + // TODO: refactor to CompactionError to correctly pass cancelled error + _ = cancel.cancelled() => return Err(anyhow!("cancelled")), + }; + info!("running enhanced gc bottom-most compaction"); scopeguard::defer! { @@ -1644,6 +1755,13 @@ impl Timeline { let mut accumulated_values = Vec::new(); let mut last_key: Option = None; + enum FlushDeltaResult { + /// Create a new resident layer + CreateResidentLayer(ResidentLayer), + /// Keep an original delta layer + KeepLayer(PersistentLayerKey), + } + #[allow(clippy::too_many_arguments)] async fn flush_deltas( deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>, @@ -1654,7 +1772,7 @@ impl Timeline { lowest_retain_lsn: Lsn, ctx: &RequestContext, last_batch: bool, - ) -> anyhow::Result> { + ) -> anyhow::Result> { // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid // overlapping layers. // @@ -1677,28 +1795,78 @@ impl Timeline { if !need_split && !last_batch { return Ok(None); } - let deltas = std::mem::take(deltas); + let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas); if deltas.is_empty() { return Ok(None); } let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1; + let delta_key = PersistentLayerKey { + key_range: { + let key_start = deltas.first().unwrap().0; + let key_end = deltas.last().unwrap().0.next(); + key_start..key_end + }, + lsn_range: lowest_retain_lsn..end_lsn, + is_delta: true, + }; + { + // Hack: skip delta layer if we need to produce a layer of a same key-lsn. + // + // This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range. + // For example, consider the case where a single delta with range [0x10,0x50) exists. + // And we have branches at LSN 0x10, 0x20, 0x30. + // Then we delete branch @ 0x20. + // Bottom-most compaction may now delete the delta [0x20,0x30). + // And that wouldnt' change the shape of the layer. + // + // Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes. + // That's why it's safe to skip. + let guard = tline.layers.read().await; + + if guard.contains_key(&delta_key) { + let layer_generation = guard.get_from_key(&delta_key).metadata().generation; + drop(guard); + if layer_generation == tline.generation { + // TODO: depending on whether we design this compaction process to run along with + // other compactions, there could be layer map modifications after we drop the + // layer guard, and in case it creates duplicated layer key, we will still error + // in the end. + info!( + key=%delta_key, + ?layer_generation, + "discard delta layer due to duplicated layer in the same generation" + ); + return Ok(Some(FlushDeltaResult::KeepLayer(delta_key))); + } + } + } + let mut delta_layer_writer = DeltaLayerWriter::new( tline.conf, tline.timeline_id, tline.tenant_shard_id, - deltas.first().unwrap().0, + delta_key.key_range.start, lowest_retain_lsn..end_lsn, ctx, ) .await?; - let key_end = deltas.last().unwrap().0.next(); for (key, lsn, val) in deltas { delta_layer_writer.put_value(key, lsn, val, ctx).await?; } - let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?; - Ok(Some(delta_layer)) + let delta_layer = delta_layer_writer + .finish(delta_key.key_range.end, tline, ctx) + .await?; + Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer))) } + // Hack the key range to be min..(max-1). Otherwise, the image layer will be + // interpreted as an L0 delta layer. + let hack_image_layer_range = { + let mut end_key = Key::MAX; + end_key.field6 -= 1; + Key::MIN..end_key + }; + // Only create image layers when there is no ancestor branches. TODO: create covering image layer // when some condition meet. let mut image_layer_writer = if self.ancestor_timeline.is_none() { @@ -1707,7 +1875,7 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - &(Key::MIN..Key::MAX), // covers the full key range + &hack_image_layer_range, // covers the full key range lowest_retain_lsn, ctx, ) @@ -1737,6 +1905,42 @@ impl Timeline { let img = tline.get(key, tline.ancestor_lsn, ctx).await?; Ok(Some((key, tline.ancestor_lsn, img))) } + let image_layer_key = PersistentLayerKey { + key_range: hack_image_layer_range, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn), + is_delta: false, + }; + + // Like with delta layers, it can happen that we re-produce an already existing image layer. + // This could happen when a user triggers force compaction and image generation. In this case, + // it's always safe to rewrite the layer. + let discard_image_layer = { + let guard = self.layers.read().await; + if guard.contains_key(&image_layer_key) { + let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation; + drop(guard); + if layer_generation == self.generation { + // TODO: depending on whether we design this compaction process to run along with + // other compactions, there could be layer map modifications after we drop the + // layer guard, and in case it creates duplicated layer key, we will still error + // in the end. + info!( + key=%image_layer_key, + ?layer_generation, + "discard image layer due to duplicated layer key in the same generation", + ); + true + } else { + false + } + } else { + false + } + }; + + // Actually, we can decide not to write to the image layer at all at this point because + // the key and LSN range are determined. However, to keep things simple here, we still + // create this writer, and discard the writer in the end. let mut delta_values = Vec::new(); let delta_split_points = delta_split_points.into_iter().collect_vec(); @@ -1824,7 +2028,9 @@ impl Timeline { ); assert!(delta_values.is_empty(), "unprocessed keys"); - let image_layer = if let Some(writer) = image_layer_writer { + let image_layer = if discard_image_layer { + None + } else if let Some(writer) = image_layer_writer { Some(writer.finish(self, ctx).await?) } else { None @@ -1835,7 +2041,22 @@ impl Timeline { if image_layer.is_some() { 1 } else { 0 } ); let mut compact_to = Vec::new(); - compact_to.extend(delta_layers); + let mut keep_layers = HashSet::new(); + for action in delta_layers { + match action { + FlushDeltaResult::CreateResidentLayer(layer) => { + compact_to.push(layer); + } + FlushDeltaResult::KeepLayer(l) => { + keep_layers.insert(l); + } + } + } + if discard_image_layer { + keep_layers.insert(image_layer_key); + } + let mut layer_selection = layer_selection; + layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); compact_to.extend(image_layer); // Step 3: Place back to the layer map. { diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 1e4edd34ad53..1bc2acbd3492 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -35,6 +35,10 @@ impl LayerManager { self.layer_fmgr.get_from_desc(desc) } + pub(crate) fn get_from_key(&self, desc: &PersistentLayerKey) -> Layer { + self.layer_fmgr.get_from_key(desc) + } + /// Get an immutable reference to the layer map. /// /// We expect users only to be able to get an immutable layer map. If users want to make modifications, @@ -365,16 +369,20 @@ impl Default for LayerFileManager { } impl LayerFileManager { - fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T { + fn get_from_key(&self, key: &PersistentLayerKey) -> T { // The assumption for the `expect()` is that all code maintains the following invariant: // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor. self.0 - .get(&desc.key()) - .with_context(|| format!("get layer from desc: {}", desc.layer_name())) + .get(key) + .with_context(|| format!("get layer from key: {}", key)) .expect("not found") .clone() } + fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T { + self.get_from_key(&desc.key()) + } + fn contains_key(&self, key: &PersistentLayerKey) -> bool { self.0.contains_key(key) }