Skip to content

Commit

Permalink
fix(pageserver): fix gc-compaction racing with legacy gc (#10052)
Browse files Browse the repository at this point in the history
## Problem

close #10049, close
#10030, close
#8861

part of #9114

The legacy gc process calls `get_latest_gc_cutoff`, which uses a Rcu
different than the gc_info struct. In the gc_compaction_smoke test case,
the "latest" cutoff could be lower than the gc_info struct, causing
gc-compaction to collect data that could be accessed by
`latest_gc_cutoff`. Technically speaking, there's nothing wrong with
gc-compaction using gc_info without considering latest_gc_cutoff,
because gc_info is the source of truth. But anyways, let's fix it.

## Summary of changes

* gc-compaction uses `latest_gc_cutoff` instead of gc_info to determine
the gc horizon.
* if a gc-compaction is scheduled via tenant compaction iteration, it
will take the gc_block lock to avoid racing with functionalities like
detach ancestor (if it's triggered via manual compaction API without
scheduling, then it won't take the lock)

---------

Signed-off-by: Alex Chi Z <[email protected]>
Co-authored-by: Arpad Müller <[email protected]>
  • Loading branch information
skyzh and arpad-m authored Dec 9, 2024
1 parent 92273b6 commit 4c4cb80
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2061,7 +2061,7 @@ async fn timeline_compact_handler(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let rx = tenant.schedule_compaction(timeline_id, options).await;
let rx = tenant.schedule_compaction(timeline_id, options).await.map_err(ApiError::InternalServerError)?;
if wait_until_scheduled_compaction_done {
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
rx.await.ok();
Expand Down
96 changes: 86 additions & 10 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3028,14 +3028,23 @@ impl Tenant {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(ScheduledCompactionTask {
options: job,
result_tx: if idx == jobs_len - 1 {
// The last compaction job sends the completion signal
next_scheduled_compaction_task.result_tx.take()
} else {
None
},
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
ScheduledCompactionTask {
options: job,
// The last job in the queue sends the signal and releases the gc guard
result_tx: next_scheduled_compaction_task
.result_tx
.take(),
gc_block: next_scheduled_compaction_task
.gc_block
.take(),
}
} else {
ScheduledCompactionTask {
options: job,
result_tx: None,
gc_block: None,
}
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
Expand Down Expand Up @@ -3095,15 +3104,22 @@ impl Tenant {
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> tokio::sync::oneshot::Receiver<()> {
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
let gc_guard = match self.gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
bail!("cannot run gc-compaction because gc is blocked: {}", e);
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
gc_block: Some(gc_guard),
});
rx
Ok(rx)
}

// Call through to all timelines to freeze ephemeral layers if needed. Usually
Expand Down Expand Up @@ -8150,6 +8166,12 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x30);
Expand Down Expand Up @@ -8252,6 +8274,12 @@ mod tests {

// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
Expand Down Expand Up @@ -8632,6 +8660,12 @@ mod tests {
.await?
};
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
Expand Down Expand Up @@ -8713,6 +8747,12 @@ mod tests {

// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
Expand Down Expand Up @@ -9160,6 +9200,12 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
Expand Down Expand Up @@ -9302,6 +9348,12 @@ mod tests {

// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x38))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x38);
Expand Down Expand Up @@ -9397,6 +9449,12 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
Expand Down Expand Up @@ -9641,6 +9699,12 @@ mod tests {
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));

{
parent_tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x10))
.wait()
.await;
// Update GC info
let mut guard = parent_tline.gc_info.write().unwrap();
*guard = GcInfo {
Expand All @@ -9655,6 +9719,12 @@ mod tests {
}

{
branch_tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x50))
.wait()
.await;
// Update GC info
let mut guard = branch_tline.gc_info.write().unwrap();
*guard = GcInfo {
Expand Down Expand Up @@ -9984,6 +10054,12 @@ mod tests {
.await?;

{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
Expand Down
12 changes: 6 additions & 6 deletions pageserver/src/tenant/gc_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use utils::id::TimelineId;

Expand All @@ -20,7 +20,7 @@ pub(crate) struct GcBlock {
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: tokio::sync::Mutex<()>,
blocking: Arc<tokio::sync::Mutex<()>>,
}

impl GcBlock {
Expand All @@ -30,7 +30,7 @@ impl GcBlock {
/// it's ending, or if not currently possible, a value describing the reasons why not.
///
/// Cancellation safe.
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
let reasons = {
let g = self.reasons.lock().unwrap();

Expand All @@ -44,7 +44,7 @@ impl GcBlock {
Err(reasons)
} else {
Ok(Guard {
_inner: self.blocking.lock().await,
_inner: self.blocking.clone().lock_owned().await,
})
}
}
Expand Down Expand Up @@ -170,8 +170,8 @@ impl GcBlock {
}
}

pub(super) struct Guard<'a> {
_inner: tokio::sync::MutexGuard<'a, ()>,
pub(crate) struct Guard {
_inner: tokio::sync::OwnedMutexGuard<()>,
}

#[derive(Debug)]
Expand Down
16 changes: 11 additions & 5 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
Expand All @@ -63,9 +63,12 @@ use super::CompactionError;
const COMPACTION_DELTA_THRESHOLD: usize = 5;

/// A scheduled compaction task.
pub struct ScheduledCompactionTask {
pub(crate) struct ScheduledCompactionTask {
pub options: CompactOptions,
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
pub gc_block: Option<gc_block::Guard>,
}

pub struct GcCompactionJobDescription {
Expand Down Expand Up @@ -1768,8 +1771,7 @@ impl Timeline {
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
compact_below_lsn
} else {
let gc_info = self.gc_info.read().unwrap();
gc_info.cutoffs.select_min() // use the real gc cutoff
*self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
};
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
Expand Down Expand Up @@ -1962,7 +1964,11 @@ impl Timeline {
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = {
let real_gc_cutoff = gc_info.cutoffs.select_min();
// Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
// Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
// cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
// to get the truth data.
let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
// The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
// each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
// the real cutoff.
Expand Down
33 changes: 17 additions & 16 deletions test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ def test_pageserver_compaction_smoke(
assert vectored_average < 8


@pytest.mark.skip(
"This is being fixed and tracked in https://github.com/neondatabase/neon/issues/9114"
)
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
SMOKE_CONF = {
Expand Down Expand Up @@ -156,20 +153,20 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")

ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
# skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this
"end": "010000000000000000000000000000000000",
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
},
},
)
)

workload.churn_rows(row_count, env.pageserver.id)

Expand All @@ -181,6 +178,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)

# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_gc(tenant_id, timeline_id, None)


# Stripe sizes in number of pages.
TINY_STRIPES = 16
Expand Down

1 comment on commit 4c4cb80

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7201 tests run: 6876 passed, 0 failed, 325 skipped (full report)


Flaky tests (2)

Postgres 16

Postgres 14

Code coverage* (full report)

  • functions: 31.4% (8331 of 26528 functions)
  • lines: 47.7% (65609 of 137438 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
4c4cb80 at 2024-12-09T22:09:35.947Z :recycle:

Please sign in to comment.