diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 0f11bbc50790..75d25d0a6ac7 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2061,7 +2061,7 @@ async fn timeline_compact_handler( let tenant = state .tenant_manager .get_attached_tenant_shard(tenant_shard_id)?; - let rx = tenant.schedule_compaction(timeline_id, options).await; + let rx = tenant.schedule_compaction(timeline_id, options).await.map_err(ApiError::InternalServerError)?; if wait_until_scheduled_compaction_done { // It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction. rx.await.ok(); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 4a9c44aefdbc..e71a56ed402a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3028,14 +3028,23 @@ impl Tenant { let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); let tline_pending_tasks = guard.entry(*timeline_id).or_default(); for (idx, job) in jobs.into_iter().enumerate() { - tline_pending_tasks.push_back(ScheduledCompactionTask { - options: job, - result_tx: if idx == jobs_len - 1 { - // The last compaction job sends the completion signal - next_scheduled_compaction_task.result_tx.take() - } else { - None - }, + tline_pending_tasks.push_back(if idx == jobs_len - 1 { + ScheduledCompactionTask { + options: job, + // The last job in the queue sends the signal and releases the gc guard + result_tx: next_scheduled_compaction_task + .result_tx + .take(), + gc_block: next_scheduled_compaction_task + .gc_block + .take(), + } + } else { + ScheduledCompactionTask { + options: job, + result_tx: None, + gc_block: None, + } }); } info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len); @@ -3095,15 +3104,22 @@ impl Tenant { &self, timeline_id: TimelineId, options: CompactOptions, - ) -> tokio::sync::oneshot::Receiver<()> { + ) -> anyhow::Result> { + let gc_guard = match self.gc_block.start().await { + Ok(guard) => guard, + Err(e) => { + bail!("cannot run gc-compaction because gc is blocked: {}", e); + } + }; let (tx, rx) = tokio::sync::oneshot::channel(); let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); let tline_pending_tasks = guard.entry(timeline_id).or_default(); tline_pending_tasks.push_back(ScheduledCompactionTask { options, result_tx: Some(tx), + gc_block: Some(gc_guard), }); - rx + Ok(rx) } // Call through to all timelines to freeze ephemeral layers if needed. Usually @@ -8150,6 +8166,12 @@ mod tests { ) .await?; { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x30)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); guard.cutoffs.time = Lsn(0x30); @@ -8252,6 +8274,12 @@ mod tests { // increase GC horizon and compact again { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x40)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); guard.cutoffs.time = Lsn(0x40); @@ -8632,6 +8660,12 @@ mod tests { .await? }; { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x30)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); *guard = GcInfo { @@ -8713,6 +8747,12 @@ mod tests { // increase GC horizon and compact again { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x40)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); guard.cutoffs.time = Lsn(0x40); @@ -9160,6 +9200,12 @@ mod tests { ) .await?; { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x30)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); *guard = GcInfo { @@ -9302,6 +9348,12 @@ mod tests { // increase GC horizon and compact again { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x38)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); guard.cutoffs.time = Lsn(0x38); @@ -9397,6 +9449,12 @@ mod tests { ) .await?; { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x30)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); *guard = GcInfo { @@ -9641,6 +9699,12 @@ mod tests { branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10))); { + parent_tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x10)) + .wait() + .await; // Update GC info let mut guard = parent_tline.gc_info.write().unwrap(); *guard = GcInfo { @@ -9655,6 +9719,12 @@ mod tests { } { + branch_tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x50)) + .wait() + .await; // Update GC info let mut guard = branch_tline.gc_info.write().unwrap(); *guard = GcInfo { @@ -9984,6 +10054,12 @@ mod tests { .await?; { + tline + .latest_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x30)) + .wait() + .await; // Update GC info let mut guard = tline.gc_info.write().unwrap(); *guard = GcInfo { diff --git a/pageserver/src/tenant/gc_block.rs b/pageserver/src/tenant/gc_block.rs index 373779ddb882..af73acb2be86 100644 --- a/pageserver/src/tenant/gc_block.rs +++ b/pageserver/src/tenant/gc_block.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use utils::id::TimelineId; @@ -20,7 +20,7 @@ pub(crate) struct GcBlock { /// Do not add any more features taking and forbidding taking this lock. It should be /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`] /// synchronizes with gc attempts by locking and unlocking this mutex. - blocking: tokio::sync::Mutex<()>, + blocking: Arc>, } impl GcBlock { @@ -30,7 +30,7 @@ impl GcBlock { /// it's ending, or if not currently possible, a value describing the reasons why not. /// /// Cancellation safe. - pub(super) async fn start(&self) -> Result, BlockingReasons> { + pub(super) async fn start(&self) -> Result { let reasons = { let g = self.reasons.lock().unwrap(); @@ -44,7 +44,7 @@ impl GcBlock { Err(reasons) } else { Ok(Guard { - _inner: self.blocking.lock().await, + _inner: self.blocking.clone().lock_owned().await, }) } } @@ -170,8 +170,8 @@ impl GcBlock { } } -pub(super) struct Guard<'a> { - _inner: tokio::sync::MutexGuard<'a, ()>, +pub(crate) struct Guard { + _inner: tokio::sync::OwnedMutexGuard<()>, } #[derive(Debug)] diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index a18e157d37b4..fa924d23b01c 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -41,7 +41,7 @@ use crate::tenant::storage_layer::{ use crate::tenant::timeline::ImageLayerCreationOutcome; use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter}; use crate::tenant::timeline::{Layer, ResidentLayer}; -use crate::tenant::{DeltaLayer, MaybeOffloaded}; +use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded}; use crate::virtual_file::{MaybeFatalIo, VirtualFile}; use pageserver_api::config::tenant_conf_defaults::{ DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD, @@ -63,9 +63,12 @@ use super::CompactionError; const COMPACTION_DELTA_THRESHOLD: usize = 5; /// A scheduled compaction task. -pub struct ScheduledCompactionTask { +pub(crate) struct ScheduledCompactionTask { pub options: CompactOptions, + /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender. pub result_tx: Option>, + /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard. + pub gc_block: Option, } pub struct GcCompactionJobDescription { @@ -1768,8 +1771,7 @@ impl Timeline { let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn { compact_below_lsn } else { - let gc_info = self.gc_info.read().unwrap(); - gc_info.cutoffs.select_min() // use the real gc cutoff + *self.get_latest_gc_cutoff_lsn() // use the real gc cutoff }; let mut compact_jobs = Vec::new(); // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning @@ -1962,7 +1964,11 @@ impl Timeline { let gc_info = self.gc_info.read().unwrap(); let mut retain_lsns_below_horizon = Vec::new(); let gc_cutoff = { - let real_gc_cutoff = gc_info.cutoffs.select_min(); + // Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff. + // Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of + // cleaning everything that theoritically it could. In the future, it should use `self.gc_info` + // to get the truth data. + let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn(); // The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for // each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use // the real cutoff. diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 881503046ce3..810a9723e0e4 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -121,9 +121,6 @@ def test_pageserver_compaction_smoke( assert vectored_average < 8 -@pytest.mark.skip( - "This is being fixed and tracked in https://github.com/neondatabase/neon/issues/9114" -) @skip_in_debug_build("only run with release build") def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): SMOKE_CONF = { @@ -156,20 +153,20 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): if i % 10 == 0: log.info(f"Running churn round {i}/{churn_rounds} ...") - ps_http.timeline_compact( - tenant_id, - timeline_id, - enhanced_gc_bottom_most_compaction=True, - body={ - "scheduled": True, - "sub_compaction": True, - "compact_range": { - "start": "000000000000000000000000000000000000", - # skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this - "end": "010000000000000000000000000000000000", + # Run gc-compaction every 10 rounds to ensure the test doesn't take too long time. + ps_http.timeline_compact( + tenant_id, + timeline_id, + enhanced_gc_bottom_most_compaction=True, + body={ + "scheduled": True, + "sub_compaction": True, + "compact_range": { + "start": "000000000000000000000000000000000000", + "end": "030000000000000000000000000000000000", + }, }, - }, - ) + ) workload.churn_rows(row_count, env.pageserver.id) @@ -181,6 +178,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): log.info("Validating at workload end ...") workload.validate(env.pageserver.id) + # Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction. + ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True) + ps_http.timeline_gc(tenant_id, timeline_id, None) + # Stripe sizes in number of pages. TINY_STRIPES = 16