Skip to content

Commit

Permalink
fix(pageserver): fix gc-compaction racing with legacy gc
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed Dec 8, 2024
1 parent ec79087 commit 7529510
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2061,7 +2061,7 @@ async fn timeline_compact_handler(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let rx = tenant.schedule_compaction(timeline_id, options).await;
let rx = tenant.schedule_compaction(timeline_id, options).await.map_err(ApiError::InternalServerError)?;
if wait_until_scheduled_compaction_done {
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
rx.await.ok();
Expand Down
36 changes: 26 additions & 10 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3028,14 +3028,23 @@ impl Tenant {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(ScheduledCompactionTask {
options: job,
result_tx: if idx == jobs_len - 1 {
// The last compaction job sends the completion signal
next_scheduled_compaction_task.result_tx.take()
} else {
None
},
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
ScheduledCompactionTask {
options: job,
// The last job in the queue sends the signal and release the gc guard
result_tx: next_scheduled_compaction_task
.result_tx
.take(),
gc_block: next_scheduled_compaction_task
.gc_block
.take(),
}
} else {
ScheduledCompactionTask {
options: job,
result_tx: None,
gc_block: None,
}
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
Expand Down Expand Up @@ -3095,15 +3104,22 @@ impl Tenant {
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> tokio::sync::oneshot::Receiver<()> {
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
let gc_guard = match self.gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
bail!("cannot run gc-compaction because gc is blocked: {}", e);
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
gc_block: Some(gc_guard),
});
rx
Ok(rx)
}

// Call through to all timelines to freeze ephemeral layers if needed. Usually
Expand Down
12 changes: 6 additions & 6 deletions pageserver/src/tenant/gc_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use utils::id::TimelineId;

Expand All @@ -20,7 +20,7 @@ pub(crate) struct GcBlock {
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: tokio::sync::Mutex<()>,
blocking: Arc<tokio::sync::Mutex<()>>,
}

impl GcBlock {
Expand All @@ -30,7 +30,7 @@ impl GcBlock {
/// it's ending, or if not currently possible, a value describing the reasons why not.
///
/// Cancellation safe.
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
let reasons = {
let g = self.reasons.lock().unwrap();

Expand All @@ -44,7 +44,7 @@ impl GcBlock {
Err(reasons)
} else {
Ok(Guard {
_inner: self.blocking.lock().await,
_inner: self.blocking.clone().lock_owned().await,
})
}
}
Expand Down Expand Up @@ -170,8 +170,8 @@ impl GcBlock {
}
}

pub(super) struct Guard<'a> {
_inner: tokio::sync::MutexGuard<'a, ()>,
pub(crate) struct Guard {
_inner: tokio::sync::OwnedMutexGuard<()>,
}

#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/remote_timeline_client/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ pub(crate) struct GcBlocking {
pub(crate) enum GcBlockingReason {
Manual,
DetachAncestor,
GcCompaction,
}

impl GcBlocking {
Expand Down
7 changes: 5 additions & 2 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
Expand All @@ -63,9 +63,12 @@ use super::CompactionError;
const COMPACTION_DELTA_THRESHOLD: usize = 5;

/// A scheduled compaction task.
pub struct ScheduledCompactionTask {
pub(crate) struct ScheduledCompactionTask {
pub options: CompactOptions,
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
pub gc_block: Option<gc_block::Guard>,
}

pub struct GcCompactionJobDescription {
Expand Down
6 changes: 1 addition & 5 deletions test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ def test_pageserver_compaction_smoke(
assert vectored_average < 8


@pytest.mark.skip(
"This is being fixed and tracked in https://github.com/neondatabase/neon/issues/9114"
)
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
SMOKE_CONF = {
Expand Down Expand Up @@ -165,8 +162,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
# skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this
"end": "010000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
},
)
Expand Down

0 comments on commit 7529510

Please sign in to comment.