From 5ff4b991c74141505927a2498310b20c617a8786 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:23:24 -0500 Subject: [PATCH] feat(pageserver): gc-compaction split over LSN (#9900) ## Problem part of https://github.com/neondatabase/neon/issues/9114, stacked PR over https://github.com/neondatabase/neon/pull/9897, partially refactored to help with https://github.com/neondatabase/neon/issues/10031 ## Summary of changes * gc-compaction takes `above_lsn` parameter. We only compact the layers above this LSN, and all data below the LSN are treated as if they are on the ancestor branch. * refactored gc-compaction to take `GcCompactJob` that describes the rectangular range to be compacted. * Added unit test for this case. --------- Signed-off-by: Alex Chi Z Co-authored-by: Christian Schwarz --- pageserver/src/http/routes.rs | 13 +- pageserver/src/tenant.rs | 661 ++++++++++++++++++- pageserver/src/tenant/timeline.rs | 69 +- pageserver/src/tenant/timeline/compaction.rs | 233 ++++--- test_runner/regress/test_compaction.py | 4 +- 5 files changed, 878 insertions(+), 102 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 6e9ee976f41e..db7d29385641 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2081,13 +2081,20 @@ async fn timeline_compact_handler( .as_ref() .map(|r| r.sub_compaction) .unwrap_or(false); + let sub_compaction_max_job_size_mb = compact_request + .as_ref() + .and_then(|r| r.sub_compaction_max_job_size_mb); + let options = CompactOptions { - compact_range: compact_request + compact_key_range: compact_request + .as_ref() + .and_then(|r| r.compact_key_range.clone()), + compact_lsn_range: compact_request .as_ref() - .and_then(|r| r.compact_range.clone()), - compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn), + .and_then(|r| r.compact_lsn_range.clone()), flags, sub_compaction, + sub_compaction_max_job_size_mb, }; let scheduled = compact_request diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 92078e4b087c..99289d5f15f7 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -44,6 +44,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; +use timeline::compaction::GcCompactJob; use timeline::compaction::ScheduledCompactionTask; use timeline::import_pgdata; use timeline::offload::offload_timeline; @@ -3017,8 +3018,15 @@ impl Tenant { warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options); } else if next_scheduled_compaction_task.options.sub_compaction { info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"); - let jobs = timeline - .gc_compaction_split_jobs(next_scheduled_compaction_task.options) + let jobs: Vec = timeline + .gc_compaction_split_jobs( + GcCompactJob::from_compact_options( + next_scheduled_compaction_task.options.clone(), + ), + next_scheduled_compaction_task + .options + .sub_compaction_max_job_size_mb, + ) .await .map_err(CompactionError::Other)?; if jobs.is_empty() { @@ -3029,9 +3037,23 @@ impl Tenant { let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); let tline_pending_tasks = guard.entry(*timeline_id).or_default(); for (idx, job) in jobs.into_iter().enumerate() { + // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions` + // until we do 
further refactors to allow directly call `compact_with_gc`. + let mut flags: EnumSet = EnumSet::default(); + flags |= CompactFlags::EnhancedGcBottomMostCompaction; + if job.dry_run { + flags |= CompactFlags::DryRun; + } + let options = CompactOptions { + flags, + sub_compaction: false, + compact_key_range: Some(job.compact_key_range.into()), + compact_lsn_range: Some(job.compact_lsn_range.into()), + sub_compaction_max_job_size_mb: None, + }; tline_pending_tasks.push_back(if idx == jobs_len - 1 { ScheduledCompactionTask { - options: job, + options, // The last job in the queue sends the signal and releases the gc guard result_tx: next_scheduled_compaction_task .result_tx @@ -3042,7 +3064,7 @@ impl Tenant { } } else { ScheduledCompactionTask { - options: job, + options, result_tx: None, gc_block: None, } @@ -5742,6 +5764,8 @@ mod tests { #[cfg(feature = "testing")] use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn}; #[cfg(feature = "testing")] + use timeline::CompactLsnRange; + #[cfg(feature = "testing")] use timeline::GcInfo; static TEST_KEY: Lazy = @@ -9333,7 +9357,6 @@ mod tests { &cancel, CompactOptions { flags: dryrun_flags, - compact_range: None, ..Default::default() }, &ctx, @@ -9582,7 +9605,6 @@ mod tests { &cancel, CompactOptions { flags: dryrun_flags, - compact_range: None, ..Default::default() }, &ctx, @@ -9612,6 +9634,8 @@ mod tests { #[cfg(feature = "testing")] #[tokio::test] async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> { + use timeline::CompactLsnRange; + let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?; let (tenant, ctx) = harness.load().await; @@ -9804,6 +9828,22 @@ mod tests { verify_result().await; + // Piggyback a compaction with above_lsn. Ensure it works correctly when the specified LSN intersects with the layer files. + // Now we already have a single large delta layer, so the compaction min_layer_lsn should be the same as ancestor LSN (0x18). 
+ branch_tline + .compact_with_gc( + &cancel, + CompactOptions { + compact_lsn_range: Some(CompactLsnRange::above(Lsn(0x40))), + ..Default::default() + }, + &ctx, + ) + .await + .unwrap(); + + verify_result().await; + Ok(()) } @@ -10092,7 +10132,7 @@ mod tests { &cancel, CompactOptions { flags: EnumSet::new(), - compact_range: Some((get_key(0)..get_key(2)).into()), + compact_key_range: Some((get_key(0)..get_key(2)).into()), ..Default::default() }, &ctx, @@ -10139,7 +10179,7 @@ mod tests { &cancel, CompactOptions { flags: EnumSet::new(), - compact_range: Some((get_key(2)..get_key(4)).into()), + compact_key_range: Some((get_key(2)..get_key(4)).into()), ..Default::default() }, &ctx, @@ -10191,7 +10231,7 @@ mod tests { &cancel, CompactOptions { flags: EnumSet::new(), - compact_range: Some((get_key(4)..get_key(9)).into()), + compact_key_range: Some((get_key(4)..get_key(9)).into()), ..Default::default() }, &ctx, @@ -10242,7 +10282,7 @@ mod tests { &cancel, CompactOptions { flags: EnumSet::new(), - compact_range: Some((get_key(9)..get_key(10)).into()), + compact_key_range: Some((get_key(9)..get_key(10)).into()), ..Default::default() }, &ctx, @@ -10298,7 +10338,7 @@ mod tests { &cancel, CompactOptions { flags: EnumSet::new(), - compact_range: Some((get_key(0)..get_key(10)).into()), + compact_key_range: Some((get_key(0)..get_key(10)).into()), ..Default::default() }, &ctx, @@ -10327,7 +10367,6 @@ mod tests { }, ], ); - Ok(()) } @@ -10380,4 +10419,602 @@ mod tests { Ok(()) } + + #[cfg(feature = "testing")] + #[tokio::test] + async fn test_simple_bottom_most_compaction_above_lsn() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_simple_bottom_most_compaction_above_lsn").await?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. 
+ let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + )]; + let delta4 = vec![( + get_key(1), + Lsn(0x28), + Value::WalRecord(NeonWalRecord::wal_append("@0x28")), + )]; + let delta2 = vec![ + ( + get_key(1), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(1), + Lsn(0x38), + Value::WalRecord(NeonWalRecord::wal_append("@0x38")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ( + get_key(9), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![ + // delta1/2/4 only contain a single key but multiple updates + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x30)..Lsn(0x50), delta2), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x28)..Lsn(0x30), delta4), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x30)..Lsn(0x50), delta3), + ], // delta layers + vec![(Lsn(0x10), img_layer)], // image layers + Lsn(0x50), + ) + .await?; + { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x30)) + .wait() + .await; + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![ + (Lsn(0x10), tline.timeline_id, MaybeOffloaded::No), + (Lsn(0x20), tline.timeline_id, MaybeOffloaded::No), + ], + cutoffs: GcCutoffs { + time: Lsn(0x30), + space: Lsn(0x30), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30@0x38"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10@0x48"), + Bytes::from_static(b"value 9@0x10@0x48"), + ]; + + let expected_result_at_gc_horizon = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_20 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_10 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 
5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let verify_result = || async { + let gc_horizon = { + let gc_info = tline.gc_info.read().unwrap(); + gc_info.cutoffs.time + }; + for idx in 0..10 { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), gc_horizon, &ctx) + .await + .unwrap(), + &expected_result_at_gc_horizon[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x20), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_20[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x10), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_10[idx] + ); + } + }; + + verify_result().await; + + let cancel = CancellationToken::new(); + tline + .compact_with_gc( + &cancel, + CompactOptions { + compact_lsn_range: Some(CompactLsnRange::above(Lsn(0x28))), + ..Default::default() + }, + &ctx, + ) + .await + .unwrap(); + verify_result().await; + + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + // The original image layer, not compacted + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, + }, + // Delta layer below the specified above_lsn not compacted + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x20)..Lsn(0x28), + is_delta: true, + }, + // Delta layer compacted above the LSN + PersistentLayerKey { + key_range: get_key(1)..get_key(10), + lsn_range: Lsn(0x28)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + // compact again + tline + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) + .await + .unwrap(); + verify_result().await; + + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + // The compacted image layer (full key range) + PersistentLayerKey { + key_range: Key::MIN..Key::MAX, + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, + }, + // All other data in the delta layer + PersistentLayerKey { + key_range: get_key(1)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + Ok(()) + } + + #[cfg(feature = "testing")] + #[tokio::test] + async fn test_simple_bottom_most_compaction_rectangle() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_simple_bottom_most_compaction_rectangle").await?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. 
+ let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + )]; + let delta4 = vec![( + get_key(1), + Lsn(0x28), + Value::WalRecord(NeonWalRecord::wal_append("@0x28")), + )]; + let delta2 = vec![ + ( + get_key(1), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(1), + Lsn(0x38), + Value::WalRecord(NeonWalRecord::wal_append("@0x38")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ( + get_key(9), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![ + // delta1/2/4 only contain a single key but multiple updates + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x30)..Lsn(0x50), delta2), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x28)..Lsn(0x30), delta4), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x30)..Lsn(0x50), delta3), + ], // delta layers + vec![(Lsn(0x10), img_layer)], // image layers + Lsn(0x50), + ) + .await?; + { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x30)) + .wait() + .await; + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![ + (Lsn(0x10), tline.timeline_id, MaybeOffloaded::No), + (Lsn(0x20), tline.timeline_id, MaybeOffloaded::No), + ], + cutoffs: GcCutoffs { + time: Lsn(0x30), + space: Lsn(0x30), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30@0x38"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10@0x48"), + Bytes::from_static(b"value 9@0x10@0x48"), + ]; + + let expected_result_at_gc_horizon = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_20 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_10 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 
5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let verify_result = || async { + let gc_horizon = { + let gc_info = tline.gc_info.read().unwrap(); + gc_info.cutoffs.time + }; + for idx in 0..10 { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), gc_horizon, &ctx) + .await + .unwrap(), + &expected_result_at_gc_horizon[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x20), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_20[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x10), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_10[idx] + ); + } + }; + + verify_result().await; + + let cancel = CancellationToken::new(); + + tline + .compact_with_gc( + &cancel, + CompactOptions { + compact_key_range: Some((get_key(0)..get_key(2)).into()), + compact_lsn_range: Some((Lsn(0x20)..Lsn(0x28)).into()), + ..Default::default() + }, + &ctx, + ) + .await + .unwrap(); + verify_result().await; + + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + // The original image layer, not compacted + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, + }, + // According the selection logic, we select all layers with start key <= 0x28, so we would merge the layer 0x20-0x28 and + // the layer 0x28-0x30 into one. + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x20)..Lsn(0x30), + is_delta: true, + }, + // Above the upper bound and untouched + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x30)..Lsn(0x50), + is_delta: true, + }, + // This layer is untouched + PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x30)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + tline + .compact_with_gc( + &cancel, + CompactOptions { + compact_key_range: Some((get_key(3)..get_key(8)).into()), + compact_lsn_range: Some((Lsn(0x28)..Lsn(0x40)).into()), + ..Default::default() + }, + &ctx, + ) + .await + .unwrap(); + verify_result().await; + + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + // The original image layer, not compacted + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, + }, + // Not in the compaction key range, uncompacted + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x20)..Lsn(0x30), + is_delta: true, + }, + // Not in the compaction key range, uncompacted but need rewrite because the delta layer overlaps with the range + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x30)..Lsn(0x50), + is_delta: true, + }, + // Note that when we specify the LSN upper bound to be 0x40, the compaction algorithm will not try to cut the layer + // horizontally in half. Instead, it will include all LSNs that overlap with 0x40. So the real max_lsn of the compaction + // becomes 0x50. 
+ PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x30)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + // compact again + tline + .compact_with_gc( + &cancel, + CompactOptions { + compact_key_range: Some((get_key(0)..get_key(5)).into()), + compact_lsn_range: Some((Lsn(0x20)..Lsn(0x50)).into()), + ..Default::default() + }, + &ctx, + ) + .await + .unwrap(); + verify_result().await; + + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + // The original image layer, not compacted + PersistentLayerKey { + key_range: get_key(0)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, + }, + // The range gets compacted + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x20)..Lsn(0x50), + is_delta: true, + }, + // Not touched during this iteration of compaction + PersistentLayerKey { + key_range: get_key(8)..get_key(10), + lsn_range: Lsn(0x30)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + // final full compaction + tline + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) + .await + .unwrap(); + verify_result().await; + + let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await; + check_layer_map_key_eq( + all_layers, + vec![ + // The compacted image layer (full key range) + PersistentLayerKey { + key_range: Key::MIN..Key::MAX, + lsn_range: Lsn(0x10)..Lsn(0x11), + is_delta: false, + }, + // All other data in the delta layer + PersistentLayerKey { + key_range: get_key(1)..get_key(10), + lsn_range: Lsn(0x10)..Lsn(0x50), + is_delta: true, + }, + ], + ); + + Ok(()) + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8f1d5f6577a6..b5c707922668 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -780,46 +780,90 @@ pub(crate) enum CompactFlags { #[serde_with::serde_as] #[derive(Debug, Clone, serde::Deserialize)] pub(crate) struct CompactRequest { - pub compact_range: Option, - pub compact_below_lsn: Option, + pub compact_key_range: Option, + pub compact_lsn_range: Option, /// Whether the compaction job should be scheduled. #[serde(default)] pub scheduled: bool, /// Whether the compaction job should be split across key ranges. #[serde(default)] pub sub_compaction: bool, + /// Max job size for each subcompaction job. 
+ pub sub_compaction_max_job_size_mb: Option, } #[serde_with::serde_as] #[derive(Debug, Clone, serde::Deserialize)] -pub(crate) struct CompactRange { +pub(crate) struct CompactLsnRange { + pub start: Lsn, + pub end: Lsn, +} + +#[serde_with::serde_as] +#[derive(Debug, Clone, serde::Deserialize)] +pub(crate) struct CompactKeyRange { #[serde_as(as = "serde_with::DisplayFromStr")] pub start: Key, #[serde_as(as = "serde_with::DisplayFromStr")] pub end: Key, } -impl From> for CompactRange { +impl From> for CompactLsnRange { + fn from(range: Range) -> Self { + Self { + start: range.start, + end: range.end, + } + } +} + +impl From> for CompactKeyRange { fn from(range: Range) -> Self { - CompactRange { + Self { start: range.start, end: range.end, } } } +impl From for Range { + fn from(range: CompactLsnRange) -> Self { + range.start..range.end + } +} + +impl From for Range { + fn from(range: CompactKeyRange) -> Self { + range.start..range.end + } +} + +impl CompactLsnRange { + #[cfg(test)] + #[cfg(feature = "testing")] + pub fn above(lsn: Lsn) -> Self { + Self { + start: lsn, + end: Lsn::MAX, + } + } +} + #[derive(Debug, Clone, Default)] pub(crate) struct CompactOptions { pub flags: EnumSet, /// If set, the compaction will only compact the key range specified by this option. - /// This option is only used by GC compaction. - pub compact_range: Option, - /// If set, the compaction will only compact the LSN below this value. - /// This option is only used by GC compaction. - pub compact_below_lsn: Option, + /// This option is only used by GC compaction. For the full explanation, see [`compaction::GcCompactJob`]. + pub compact_key_range: Option, + /// If set, the compaction will only compact the LSN within this value. + /// This option is only used by GC compaction. For the full explanation, see [`compaction::GcCompactJob`]. + pub compact_lsn_range: Option, /// Enable sub-compaction (split compaction job across key ranges). /// This option is only used by GC compaction. pub sub_compaction: bool, + /// Set job size for the GC compaction. + /// This option is only used by GC compaction. + pub sub_compaction_max_job_size_mb: Option, } impl std::fmt::Debug for Timeline { @@ -1641,9 +1685,10 @@ impl Timeline { cancel, CompactOptions { flags, - compact_range: None, - compact_below_lsn: None, + compact_key_range: None, + compact_lsn_range: None, sub_compaction: false, + sub_compaction_max_job_size_mb: None, }, ctx, ) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index fa924d23b01c..701247194ba4 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use super::layer_manager::LayerManager; use super::{ - CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder, - ImageLayerCreationMode, RecordedDuration, Timeline, + CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode, + RecordedDuration, Timeline, }; use anyhow::{anyhow, bail, Context}; @@ -64,6 +64,9 @@ const COMPACTION_DELTA_THRESHOLD: usize = 5; /// A scheduled compaction task. pub(crate) struct ScheduledCompactionTask { + /// It's unfortunate that we need to store a compact options struct here because the only outer + /// API we can call here is `compact_with_options` which does a few setup calls before starting the + /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future. 
pub options: CompactOptions, /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender. pub result_tx: Option>, @@ -71,16 +74,57 @@ pub(crate) struct ScheduledCompactionTask { pub gc_block: Option, } +/// A job description for the gc-compaction job. This structure describes the rectangle range that the job will +/// process. The exact layers that need to be compacted/rewritten will be generated when `compact_with_gc` gets +/// called. +#[derive(Debug, Clone)] +pub(crate) struct GcCompactJob { + pub dry_run: bool, + /// The key range to be compacted. The compaction algorithm will only regenerate key-value pairs within this range + /// [left inclusive, right exclusive), and other pairs will be rewritten into new files if necessary. + pub compact_key_range: Range, + /// The LSN range to be compacted. The compaction algorithm will use this range to determine the layers to be + /// selected for the compaction, and it does not guarantee the generated layers will have exactly the same LSN range + /// as specified here. The true range being compacted is `min_lsn/max_lsn` in [`GcCompactionJobDescription`]. + /// min_lsn will always <= the lower bound specified here, and max_lsn will always >= the upper bound specified here. + pub compact_lsn_range: Range, +} + +impl GcCompactJob { + pub fn from_compact_options(options: CompactOptions) -> Self { + GcCompactJob { + dry_run: options.flags.contains(CompactFlags::DryRun), + compact_key_range: options + .compact_key_range + .map(|x| x.into()) + .unwrap_or(Key::MIN..Key::MAX), + compact_lsn_range: options + .compact_lsn_range + .map(|x| x.into()) + .unwrap_or(Lsn::INVALID..Lsn::MAX), + } + } +} + +/// A job description for the gc-compaction job. This structure is generated when `compact_with_gc` is called +/// and contains the exact layers we want to compact. pub struct GcCompactionJobDescription { /// All layers to read in the compaction job selected_layers: Vec, - /// GC cutoff of the job + /// GC cutoff of the job. This is the lowest LSN that will be accessed by the read/GC path and we need to + /// keep all deltas <= this LSN or generate an image == this LSN. gc_cutoff: Lsn, - /// LSNs to retain for the job + /// LSNs to retain for the job. Read path will use this LSN so we need to keep deltas <= this LSN or + /// generate an image == this LSN. retain_lsns_below_horizon: Vec, - /// Maximum layer LSN processed in this compaction + /// Maximum layer LSN processed in this compaction, that is max(end_lsn of layers). Exclusive. All data + /// \>= this LSN will be kept and will not be rewritten. max_layer_lsn: Lsn, - /// Only compact layers overlapping with this range + /// Minimum layer LSN processed in this compaction, that is min(start_lsn of layers). Inclusive. + /// All access below (strict lower than `<`) this LSN will be routed through the normal read path instead of + /// k-merge within gc-compaction. + min_layer_lsn: Lsn, + /// Only compact layers overlapping with this range. compaction_key_range: Range, /// When partial compaction is enabled, these layers need to be rewritten to ensure no overlap. /// This field is here solely for debugging. The field will not be read once the compaction @@ -299,7 +343,7 @@ impl Timeline { ))); } - if options.compact_range.is_some() { + if options.compact_key_range.is_some() || options.compact_lsn_range.is_some() { // maybe useful in the future? 
could implement this at some point return Err(CompactionError::Other(anyhow!( "compaction range is not supported for legacy compaction for now" @@ -1754,25 +1798,26 @@ impl Timeline { Ok(()) } - /// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of - /// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much - /// ad-hoc information about gc compaction itself. + /// Split a gc-compaction job into multiple compaction jobs. The split is based on the key range and the estimated size of the compaction job. + /// The function returns a list of compaction jobs that can be executed separately. If the upper bound of the compact LSN + /// range is not specified, we will use the latest gc_cutoff as the upper bound, so that all jobs in the jobset acts + /// like a full compaction of the specified keyspace. pub(crate) async fn gc_compaction_split_jobs( self: &Arc, - options: CompactOptions, - ) -> anyhow::Result> { - if !options.sub_compaction { - return Ok(vec![options]); - } - let compact_range = options.compact_range.clone().unwrap_or(CompactRange { - start: Key::MIN, - end: Key::MAX, - }); - let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn { - compact_below_lsn + job: GcCompactJob, + sub_compaction_max_job_size_mb: Option, + ) -> anyhow::Result> { + let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX { + job.compact_lsn_range.end } else { *self.get_latest_gc_cutoff_lsn() // use the real gc cutoff }; + + // Split compaction job to about 4GB each + const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; + let sub_compaction_max_job_size_mb = + sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB); + let mut compact_jobs = Vec::new(); // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning // by estimating the amount of files read for a compaction job. We should also partition on LSN. @@ -1808,8 +1853,8 @@ impl Timeline { let Some((start, end)) = truncate_to( &range.start, &range.end, - &compact_range.start, - &compact_range.end, + &job.compact_key_range.start, + &job.compact_key_range.end, ) else { continue; }; @@ -1819,8 +1864,6 @@ impl Timeline { let guard = self.layers.read().await; let layer_map = guard.layer_map()?; let mut current_start = None; - // Split compaction job to about 2GB each - const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future let ranges_num = split_key_ranges.len(); for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() { if current_start.is_none() { @@ -1833,8 +1876,7 @@ impl Timeline { } let res = layer_map.range_search(start..end, compact_below_lsn); let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::(); - if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 { - let mut compact_options = options.clone(); + if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 { // Try to extend the compaction range so that we include at least one full layer file. 
let extended_end = res .found @@ -1852,10 +1894,11 @@ impl Timeline { "splitting compaction job: {}..{}, estimated_size={}", start, end, total_size ); - compact_options.compact_range = Some(CompactRange { start, end }); - compact_options.compact_below_lsn = Some(compact_below_lsn); - compact_options.sub_compaction = false; - compact_jobs.push(compact_options); + compact_jobs.push(GcCompactJob { + dry_run: job.dry_run, + compact_key_range: start..end, + compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn, + }); current_start = Some(end); } } @@ -1877,7 +1920,7 @@ impl Timeline { /// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not /// part of the range. /// - /// If `options.compact_below_lsn` is provided, the compaction will only compact layers below or intersect with + /// If `options.compact_lsn_range.end` is provided, the compaction will only compact layers below or intersect with /// the LSN. Otherwise, it will use the gc cutoff by default. pub(crate) async fn compact_with_gc( self: &Arc, @@ -1885,9 +1928,13 @@ impl Timeline { options: CompactOptions, ctx: &RequestContext, ) -> anyhow::Result<()> { - if options.sub_compaction { + let sub_compaction = options.sub_compaction; + let job = GcCompactJob::from_compact_options(options.clone()); + if sub_compaction { info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"); - let jobs = self.gc_compaction_split_jobs(options).await?; + let jobs = self + .gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb) + .await?; let jobs_len = jobs.len(); for (idx, job) in jobs.into_iter().enumerate() { info!( @@ -1902,19 +1949,15 @@ impl Timeline { } return Ok(()); } - self.compact_with_gc_inner(cancel, options, ctx).await + self.compact_with_gc_inner(cancel, job, ctx).await } async fn compact_with_gc_inner( self: &Arc, cancel: &CancellationToken, - options: CompactOptions, + job: GcCompactJob, ctx: &RequestContext, ) -> anyhow::Result<()> { - assert!( - !options.sub_compaction, - "sub-compaction should be handled by the outer function" - ); // Block other compaction/GC tasks from running for now. GC-compaction could run along // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc. // Note that we already acquired the compaction lock when the outer `compact` function gets called. @@ -1934,19 +1977,11 @@ impl Timeline { ) .await?; - let flags = options.flags; - let compaction_key_range = options - .compact_range - .map(|range| range.start..range.end) - .unwrap_or_else(|| Key::MIN..Key::MAX); + let dry_run = job.dry_run; + let compact_key_range = job.compact_key_range; + let compact_lsn_range = job.compact_lsn_range; - let dry_run = flags.contains(CompactFlags::DryRun); - - if compaction_key_range == (Key::MIN..Key::MAX) { - info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end); - } else { - info!("running enhanced gc bottom-most compaction, dry_run={dry_run}"); - } + info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end); scopeguard::defer! { info!("done enhanced gc bottom-most compaction"); @@ -1970,11 +2005,15 @@ impl Timeline { // to get the truth data. 
let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn(); // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for - // each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use + // each of the retain_lsn. Therefore, if the user-provided `compact_lsn_range.end` is larger than the real gc cutoff, we will use // the real cutoff. - let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff); + let mut gc_cutoff = if compact_lsn_range.end == Lsn::MAX { + real_gc_cutoff + } else { + compact_lsn_range.end + }; if gc_cutoff > real_gc_cutoff { - warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff); + warn!("provided compact_lsn_range.end={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff); gc_cutoff = real_gc_cutoff; } gc_cutoff @@ -1991,7 +2030,7 @@ impl Timeline { } let mut selected_layers: Vec = Vec::new(); drop(gc_info); - // Pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers. + // Firstly, pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers. let Some(max_layer_lsn) = layers .iter_historic_layers() .filter(|desc| desc.get_lsn_range().start <= gc_cutoff) @@ -2001,27 +2040,45 @@ impl Timeline { info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff); return Ok(()); }; + // Next, if the user specifies compact_lsn_range.start, we need to filter some layers out. All the layers (strictly) below + // the min_layer_lsn computed as below will be filtered out and the data will be accessed using the normal read path, as if + // it is a branch. + let Some(min_layer_lsn) = layers + .iter_historic_layers() + .filter(|desc| { + if compact_lsn_range.start == Lsn::INVALID { + true // select all layers below if start == Lsn(0) + } else { + desc.get_lsn_range().end > compact_lsn_range.start // strictly larger than compact_above_lsn + } + }) + .map(|desc| desc.get_lsn_range().start) + .min() + else { + info!("no layers to compact with gc: no historic layers above compact_above_lsn, compact_above_lsn={}", compact_lsn_range.end); + return Ok(()); + }; // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key // layers to compact. 
let mut rewrite_layers = Vec::new(); for desc in layers.iter_historic_layers() { if desc.get_lsn_range().end <= max_layer_lsn - && overlaps_with(&desc.get_key_range(), &compaction_key_range) + && desc.get_lsn_range().start >= min_layer_lsn + && overlaps_with(&desc.get_key_range(), &compact_key_range) { // If the layer overlaps with the compaction key range, we need to read it to obtain all keys within the range, // even if it might contain extra keys selected_layers.push(guard.get_from_desc(&desc)); // If the layer is not fully contained within the key range, we need to rewrite it if it's a delta layer (it's fine // to overlap image layers) - if desc.is_delta() - && !fully_contains(&compaction_key_range, &desc.get_key_range()) + if desc.is_delta() && !fully_contains(&compact_key_range, &desc.get_key_range()) { rewrite_layers.push(desc); } } } if selected_layers.is_empty() { - info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compaction_key_range.start, compaction_key_range.end); + info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compact_key_range.start, compact_key_range.end); return Ok(()); } retain_lsns_below_horizon.sort(); @@ -2029,13 +2086,20 @@ impl Timeline { selected_layers, gc_cutoff, retain_lsns_below_horizon, + min_layer_lsn, max_layer_lsn, - compaction_key_range, + compaction_key_range: compact_key_range, rewrite_layers, } }; - let lowest_retain_lsn = if self.ancestor_timeline.is_some() { - Lsn(self.ancestor_lsn.0 + 1) + let (has_data_below, lowest_retain_lsn) = if compact_lsn_range.start != Lsn::INVALID { + // If we only compact above some LSN, we should get the history from the current branch below the specified LSN. + // We use job_desc.min_layer_lsn as if it's the lowest branch point. + (true, job_desc.min_layer_lsn) + } else if self.ancestor_timeline.is_some() { + // In theory, we can also use min_layer_lsn here, but using ancestor LSN makes sure the delta layers cover the + // LSN ranges all the way to the ancestor timeline. 
+ (true, self.ancestor_lsn) } else { let res = job_desc .retain_lsns_below_horizon @@ -2053,17 +2117,19 @@ impl Timeline { .unwrap_or(job_desc.gc_cutoff) ); } - res + (false, res) }; info!( - "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}", + "picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}", job_desc.selected_layers.len(), job_desc.rewrite_layers.len(), job_desc.max_layer_lsn, + job_desc.min_layer_lsn, job_desc.gc_cutoff, lowest_retain_lsn, job_desc.compaction_key_range.start, - job_desc.compaction_key_range.end + job_desc.compaction_key_range.end, + has_data_below, ); for layer in &job_desc.selected_layers { @@ -2107,10 +2173,22 @@ impl Timeline { let mut delta_layers = Vec::new(); let mut image_layers = Vec::new(); let mut downloaded_layers = Vec::new(); + let mut total_downloaded_size = 0; + let mut total_layer_size = 0; for layer in &job_desc.selected_layers { + if layer.needs_download().await?.is_some() { + total_downloaded_size += layer.layer_desc().file_size; + } + total_layer_size += layer.layer_desc().file_size; let resident_layer = layer.download_and_keep_resident().await?; downloaded_layers.push(resident_layer); } + info!( + "finish downloading layers, downloaded={}, total={}, ratio={:.2}", + total_downloaded_size, + total_layer_size, + total_downloaded_size as f64 / total_layer_size as f64 + ); for resident_layer in &downloaded_layers { if resident_layer.layer_desc().is_delta() { let layer = resident_layer.get_as_delta(ctx).await?; @@ -2133,7 +2211,7 @@ impl Timeline { // Only create image layers when there is no ancestor branches. TODO: create covering image layer // when some condition meet. - let mut image_layer_writer = if self.ancestor_timeline.is_none() { + let mut image_layer_writer = if !has_data_below { Some( SplitImageLayerWriter::new( self.conf, @@ -2166,7 +2244,11 @@ impl Timeline { } let mut delta_layer_rewriters = HashMap::, RewritingLayers>::new(); - /// Returns None if there is no ancestor branch. Throw an error when the key is not found. + /// When compacting not at a bottom range (=`[0,X)`) of the root branch, we "have data below" (`has_data_below=true`). + /// The two cases are compaction in ancestor branches and when `compact_lsn_range.start` is set. + /// In those cases, we need to pull up data from below the LSN range we're compaction. + /// + /// This function unifies the cases so that later code doesn't have to think about it. /// /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image /// is needed for reconstruction. This should be fixed in the future. @@ -2174,17 +2256,19 @@ impl Timeline { /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor /// images. async fn get_ancestor_image( - tline: &Arc, + this_tline: &Arc, key: Key, ctx: &RequestContext, + has_data_below: bool, + history_lsn_point: Lsn, ) -> anyhow::Result> { - if tline.ancestor_timeline.is_none() { + if !has_data_below { return Ok(None); }; // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing // as much existing code as possible. 
- let img = tline.get(key, tline.ancestor_lsn, ctx).await?; - Ok(Some((key, tline.ancestor_lsn, img))) + let img = this_tline.get(key, history_lsn_point, ctx).await?; + Ok(Some((key, history_lsn_point, img))) } // Actually, we can decide not to write to the image layer at all at this point because @@ -2268,7 +2352,8 @@ impl Timeline { job_desc.gc_cutoff, &job_desc.retain_lsns_below_horizon, COMPACTION_DELTA_THRESHOLD, - get_ancestor_image(self, *last_key, ctx).await?, + get_ancestor_image(self, *last_key, ctx, has_data_below, lowest_retain_lsn) + .await?, ) .await?; retention @@ -2297,7 +2382,7 @@ impl Timeline { job_desc.gc_cutoff, &job_desc.retain_lsns_below_horizon, COMPACTION_DELTA_THRESHOLD, - get_ancestor_image(self, last_key, ctx).await?, + get_ancestor_image(self, last_key, ctx, has_data_below, lowest_retain_lsn).await?, ) .await?; retention diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 810a9723e0e4..88873c63c24c 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -153,6 +153,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): if i % 10 == 0: log.info(f"Running churn round {i}/{churn_rounds} ...") + if (i - 1) % 10 == 0: # Run gc-compaction every 10 rounds to ensure the test doesn't take too long time. ps_http.timeline_compact( tenant_id, @@ -161,10 +162,11 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): body={ "scheduled": True, "sub_compaction": True, - "compact_range": { + "compact_key_range": { "start": "000000000000000000000000000000000000", "end": "030000000000000000000000000000000000", }, + "sub_compaction_max_job_size_mb": 16, }, )