From 83afea3edbc52447025c741cd11fbf95a3c7bdd8 Mon Sep 17 00:00:00 2001
From: "Alex Chi Z."
Date: Tue, 6 Aug 2024 10:07:48 +0800
Subject: [PATCH] feat(pageserver): support dry-run for gc-compaction, add statistics (#8557)

Add dry-run mode that does not produce any image layer + delta layer. I
will use this code to do some experiments and see how much space we can
reclaim for tenants on staging.

Part of https://github.com/neondatabase/neon/issues/8002

* Add dry-run mode that runs the full compaction process without
  updating the layer map. (We never call finish on the writers and the
  files will be removed before exiting the function).
* Add compaction statistics and print them at the end of compaction.

---------

Signed-off-by: Alex Chi Z
---
 pageserver/src/tenant.rs                      |  56 +++++--
 .../src/tenant/storage_layer/image_layer.rs   |   8 +
 pageserver/src/tenant/timeline.rs             |   1 +
 pageserver/src/tenant/timeline/compaction.rs  | 151 +++++++++++++++++-
 4 files changed, 204 insertions(+), 12 deletions(-)

diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 84c5095610d5..72d3aedd05f5 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -6899,7 +6899,10 @@ mod tests {
         }

         let cancel = CancellationToken::new();
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();

         for (idx, expected) in expected_result.iter().enumerate() {
             assert_eq!(
@@ -6993,7 +6996,10 @@ mod tests {
             guard.cutoffs.time = Lsn(0x40);
             guard.cutoffs.space = Lsn(0x40);
         }
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();

         Ok(())
     }
@@ -7327,7 +7333,10 @@ mod tests {
         }

         let cancel = CancellationToken::new();
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();

         for idx in 0..10 {
             assert_eq!(
@@ -7353,7 +7362,10 @@ mod tests {
             guard.cutoffs.time = Lsn(0x40);
             guard.cutoffs.space = Lsn(0x40);
         }
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();

         Ok(())
     }
@@ -7898,11 +7910,28 @@ mod tests {
         verify_result().await;

         let cancel = CancellationToken::new();
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        let mut dryrun_flags = EnumSet::new();
+        dryrun_flags.insert(CompactFlags::DryRun);
+
+        tline
+            .compact_with_gc(&cancel, dryrun_flags, &ctx)
+            .await
+            .unwrap();
+        // We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
+        // cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests.
+        verify_result().await;
+
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();
         verify_result().await;

         // compact again
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();
         verify_result().await;

         // increase GC horizon and compact again
@@ -7912,11 +7941,17 @@ mod tests {
             guard.cutoffs.time = Lsn(0x38);
             guard.cutoffs.space = Lsn(0x38);
         }
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();
         verify_result().await;
         // no wals between 0x30 and 0x38, so we should obtain the same result
         // not increasing the GC horizon and compact again
-        tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();
         verify_result().await;

         Ok(())
@@ -8097,7 +8132,10 @@ mod tests {
         verify_result().await;

         let cancel = CancellationToken::new();
-        branch_tline.compact_with_gc(&cancel, &ctx).await.unwrap();
+        branch_tline
+            .compact_with_gc(&cancel, EnumSet::new(), &ctx)
+            .await
+            .unwrap();

         verify_result().await;

diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index aa308ba3c148..f4f48aaf1645 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -753,6 +753,10 @@ struct ImageLayerWriterInner {
 }

 impl ImageLayerWriterInner {
+    fn size(&self) -> u64 {
+        self.tree.borrow_writer().size() + self.blob_writer.size()
+    }
+
     ///
     /// Start building a new image layer.
     ///
@@ -1044,6 +1048,10 @@ impl ImageLayerWriter {
             .finish(timeline, ctx, Some(end_key))
             .await
     }
+
+    pub(crate) fn size(&self) -> u64 {
+        self.inner.as_ref().unwrap().size()
+    }
 }

 impl Drop for ImageLayerWriter {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 8c80a54bdd76..5c268bf87500 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -704,6 +704,7 @@ pub(crate) enum CompactFlags {
     ForceRepartition,
     ForceImageLayerCreation,
     EnhancedGcBottomMostCompaction,
+    DryRun,
 }

 impl std::fmt::Debug for Timeline {
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 421f718ad60a..1ff029a31387 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -19,8 +19,10 @@ use bytes::Bytes;
 use enumset::EnumSet;
 use fail::fail_point;
 use itertools::Itertools;
+use pageserver_api::key::KEY_SIZE;
 use pageserver_api::keyspace::ShardedRange;
 use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
+use serde::Serialize;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, info_span, trace, warn, Instrument};
 use utils::id::TimelineId;
@@ -41,6 +43,7 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile};

 use crate::keyspace::KeySpace;
 use crate::repository::{Key, Value};
+use crate::walrecord::NeonWalRecord;

 use utils::lsn::Lsn;

@@ -73,6 +76,7 @@ impl KeyHistoryRetention {
         key: Key,
         delta_writer: &mut Vec<(Key, Lsn, Value)>,
         mut image_writer: Option<&mut ImageLayerWriter>,
+        stat: &mut CompactionStatistics,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         let mut first_batch = true;
@@ -82,6 +86,7 @@ impl KeyHistoryRetention {
                     let Value::Image(img) = &logs[0].1 else {
                         unreachable!()
                     };
+                    stat.produce_image_key(img);
                     if let Some(image_writer) = image_writer.as_mut() {
                         image_writer.put_image(key, img.clone(), ctx).await?;
                     } else {
@@ -89,24 +94,111 @@ impl KeyHistoryRetention {
                     }
                 } else {
                     for (lsn, val) in logs {
+                        stat.produce_key(&val);
                         delta_writer.push((key, lsn, val));
                     }
                 }
                 first_batch = false;
             } else {
                 for (lsn, val) in logs {
+                    stat.produce_key(&val);
                     delta_writer.push((key, lsn, val));
                 }
             }
         }
         let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
         for (lsn, val) in above_horizon_logs {
+            stat.produce_key(&val);
             delta_writer.push((key, lsn, val));
         }
         Ok(())
     }
 }

+#[derive(Debug, Serialize, Default)]
+struct CompactionStatisticsNumSize {
+    num: u64,
+    size: u64,
+}
+
+#[derive(Debug, Serialize, Default)]
+pub struct CompactionStatistics {
+    delta_layer_visited: CompactionStatisticsNumSize,
+    image_layer_visited: CompactionStatisticsNumSize,
+    delta_layer_produced: CompactionStatisticsNumSize,
+    image_layer_produced: CompactionStatisticsNumSize,
+    num_delta_layer_discarded: usize,
+    num_image_layer_discarded: usize,
+    num_unique_keys_visited: usize,
+    wal_keys_visited: CompactionStatisticsNumSize,
+    image_keys_visited: CompactionStatisticsNumSize,
+    wal_produced: CompactionStatisticsNumSize,
+    image_produced: CompactionStatisticsNumSize,
+}
+
+impl CompactionStatistics {
+    fn estimated_size_of_value(val: &Value) -> usize {
+        match val {
+            Value::Image(img) => img.len(),
+            Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
+            _ => std::mem::size_of::<NeonWalRecord>(),
+        }
+    }
+    fn estimated_size_of_key() -> usize {
+        KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
+    }
+    fn visit_delta_layer(&mut self, size: u64) {
+        self.delta_layer_visited.num += 1;
+        self.delta_layer_visited.size += size;
+    }
+    fn visit_image_layer(&mut self, size: u64) {
+        self.image_layer_visited.num += 1;
+        self.image_layer_visited.size += size;
+    }
+    fn on_unique_key_visited(&mut self) {
+        self.num_unique_keys_visited += 1;
+    }
+    fn visit_wal_key(&mut self, val: &Value) {
+        self.wal_keys_visited.num += 1;
+        self.wal_keys_visited.size +=
+            Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
+    }
+    fn visit_image_key(&mut self, val: &Value) {
+        self.image_keys_visited.num += 1;
+        self.image_keys_visited.size +=
+            Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
+    }
+    fn produce_key(&mut self, val: &Value) {
+        match val {
+            Value::Image(img) => self.produce_image_key(img),
+            Value::WalRecord(_) => self.produce_wal_key(val),
+        }
+    }
+    fn produce_wal_key(&mut self, val: &Value) {
+        self.wal_produced.num += 1;
+        self.wal_produced.size +=
+            Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
+    }
+    fn produce_image_key(&mut self, val: &Bytes) {
+        self.image_produced.num += 1;
+        self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
+    }
+    fn discard_delta_layer(&mut self) {
+        self.num_delta_layer_discarded += 1;
+    }
+    fn discard_image_layer(&mut self) {
+        self.num_image_layer_discarded += 1;
+    }
+    fn produce_delta_layer(&mut self, size: u64) {
+        self.delta_layer_produced.num += 1;
+        self.delta_layer_produced.size += size;
+    }
+    fn produce_image_layer(&mut self, size: u64) {
+        self.image_layer_produced.num += 1;
+        self.image_layer_produced.size += size;
+    }
+}
+
 impl Timeline {
     /// TODO: cancellation
     ///
@@ -118,12 +210,18 @@ impl Timeline {
         ctx: &RequestContext,
     ) -> Result<bool, CompactionError> {
         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
-            self.compact_with_gc(cancel, ctx)
+            self.compact_with_gc(cancel, flags, ctx)
                 .await
                 .map_err(CompactionError::Other)?;
             return Ok(false);
         }

+        if flags.contains(CompactFlags::DryRun) {
+            return Err(CompactionError::Other(anyhow!(
+                "dry-run mode is not supported for legacy compaction for now"
+            )));
+        }
+
         // High level strategy for compaction / image creation:
         //
         // 1. First, calculate the desired "partitioning" of the
@@ -1641,6 +1739,7 @@ impl Timeline {
     pub(crate) async fn compact_with_gc(
         self: &Arc<Self>,
         cancel: &CancellationToken,
+        flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         use std::collections::BTreeSet;
@@ -1664,12 +1763,16 @@ impl Timeline {
         )
         .await?;

-        info!("running enhanced gc bottom-most compaction");
+        let dry_run = flags.contains(CompactFlags::DryRun);
+
+        info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
         scopeguard::defer! {
             info!("done enhanced gc bottom-most compaction");
         };

+        let mut stat = CompactionStatistics::default();
+
         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
         // The layer selection has the following properties:
         // 1. If a layer is in the selection, all layers below it are in the selection.
@@ -1740,6 +1843,9 @@ impl Timeline {
                 let key_range = desc.get_key_range();
                 delta_split_points.insert(key_range.start);
                 delta_split_points.insert(key_range.end);
+                stat.visit_delta_layer(desc.file_size());
+            } else {
+                stat.visit_image_layer(desc.file_size());
             }
         }
         let mut delta_layers = Vec::new();
@@ -1775,6 +1881,8 @@ impl Timeline {
             tline: &Arc<Timeline>,
             lowest_retain_lsn: Lsn,
             ctx: &RequestContext,
+            stats: &mut CompactionStatistics,
+            dry_run: bool,
             last_batch: bool,
         ) -> anyhow::Result<Option<ResidentLayer>> {
             // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
@@ -1831,6 +1939,7 @@ impl Timeline {
                 let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
                 drop(guard);
                 if layer_generation == tline.generation {
+                    stats.discard_delta_layer();
                     // TODO: depending on whether we design this compaction process to run along with
                     // other compactions, there could be layer map modifications after we drop the
                     // layer guard, and in case it creates duplicated layer key, we will still error
@@ -1857,6 +1966,10 @@ impl Timeline {
             for (key, lsn, val) in deltas {
                 delta_layer_writer.put_value(key, lsn, val, ctx).await?;
             }
+            stats.produce_delta_layer(delta_layer_writer.size());
+            if dry_run {
+                return Ok(None);
+            }
             let delta_layer = delta_layer_writer
                 .finish(delta_key.key_range.end, tline, ctx)
                 .await?;
@@ -1951,6 +2064,13 @@ impl Timeline {
         let mut current_delta_split_point = 0;
         let mut delta_layers = Vec::new();
         while let Some((key, lsn, val)) = merge_iter.next().await? {
+            if cancel.is_cancelled() {
+                return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
+            }
+            match val {
+                Value::Image(_) => stat.visit_image_key(&val),
+                Value::WalRecord(_) => stat.visit_wal_key(&val),
+            }
             if last_key.is_none() || last_key.as_ref() == Some(&key) {
                 if last_key.is_none() {
                     last_key = Some(key);
@@ -1958,6 +2078,7 @@ impl Timeline {
                 accumulated_values.push((key, lsn, val));
             } else {
                 let last_key = last_key.as_mut().unwrap();
+                stat.on_unique_key_visited();
                 let retention = self
                     .generate_key_retention(
                         *last_key,
@@ -1974,6 +2095,7 @@ impl Timeline {
                         *last_key,
                         &mut delta_values,
                         image_layer_writer.as_mut(),
+                        &mut stat,
                         ctx,
                     )
                     .await?;
@@ -1986,6 +2108,8 @@ impl Timeline {
                         self,
                         lowest_retain_lsn,
                         ctx,
+                        &mut stat,
+                        dry_run,
                         false,
                     )
                     .await?,
@@ -1998,6 +2122,7 @@ impl Timeline {
         let last_key = last_key.expect("no keys produced during compaction");

         // TODO: move this part to the loop body
+        stat.on_unique_key_visited();
         let retention = self
             .generate_key_retention(
                 last_key,
@@ -2014,6 +2139,7 @@ impl Timeline {
                 last_key,
                 &mut delta_values,
                 image_layer_writer.as_mut(),
+                &mut stat,
                 ctx,
             )
             .await?;
@@ -2026,6 +2152,8 @@ impl Timeline {
                 self,
                 lowest_retain_lsn,
                 ctx,
+                &mut stat,
+                dry_run,
                 true,
             )
             .await?,
@@ -2033,12 +2161,28 @@ impl Timeline {

         assert!(delta_values.is_empty(), "unprocessed keys");
         let image_layer = if discard_image_layer {
+            stat.discard_image_layer();
             None
         } else if let Some(writer) = image_layer_writer {
-            Some(writer.finish(self, ctx).await?)
+            stat.produce_image_layer(writer.size());
+            if !dry_run {
+                Some(writer.finish(self, ctx).await?)
+            } else {
+                None
+            }
         } else {
             None
         };
+
+        info!(
+            "gc-compaction statistics: {}",
+            serde_json::to_string(&stat)?
+        );
+
+        if dry_run {
+            return Ok(());
+        }
+
         info!(
             "produced {} delta layers and {} image layers",
             delta_layers.len(),
@@ -2062,6 +2206,7 @@ impl Timeline {
         let mut layer_selection = layer_selection;
         layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
         compact_to.extend(image_layer);
+
         // Step 3: Place back to the layer map.
         {
             let mut guard = self.layers.write().await;
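
For readers skimming the diff, the unit tests above exercise the new flag roughly as in the following minimal sketch. This is an illustrative snippet, not part of the patch; `tline`, `cancel`, and `ctx` stand in for the timeline handle, cancellation token, and request context that the test harness in pageserver/src/tenant.rs already provides.

    // Assumed bindings from the test harness: `tline`, `cancel`, `ctx`.
    // A dry-run pass sizes the would-be layers and logs the statistics as JSON,
    // but never calls finish() on the writers, so the layer map stays unchanged.
    let mut flags = EnumSet::new();
    flags.insert(CompactFlags::DryRun);
    tline.compact_with_gc(&cancel, flags, &ctx).await?;

    // A follow-up pass with an empty flag set produces the layers for real.
    tline.compact_with_gc(&cancel, EnumSet::new(), &ctx).await?;

Sizing each writer before deciding whether to finish it is what lets one code path serve both modes: the statistics are collected either way, and only the final finish/upload step is skipped under dry-run.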