Skip to content

Commit

Permalink
fix(pageserver): fix gc-compaction racing with legacy gc
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed Dec 8, 2024
1 parent ec79087 commit 196c0cf
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 22 deletions.
36 changes: 26 additions & 10 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3028,14 +3028,23 @@ impl Tenant {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(ScheduledCompactionTask {
options: job,
result_tx: if idx == jobs_len - 1 {
// The last compaction job sends the completion signal
next_scheduled_compaction_task.result_tx.take()
} else {
None
},
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
ScheduledCompactionTask {
options: job,
// The last job in the queue sends the signal and release the gc guard
result_tx: next_scheduled_compaction_task
.result_tx
.take(),
gc_block: next_scheduled_compaction_task
.gc_block
.take(),
}
} else {
ScheduledCompactionTask {
options: job,
result_tx: None,
gc_block: None,
}
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
Expand Down Expand Up @@ -3095,15 +3104,22 @@ impl Tenant {
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> tokio::sync::oneshot::Receiver<()> {
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
let gc_guard = match self.gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
bail!("cannot run gc-compaction because gc is blocked: {}", e);
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
gc_block: Some(gc_guard),
});
rx
Ok(rx)
}

// Call through to all timelines to freeze ephemeral layers if needed. Usually
Expand Down
12 changes: 6 additions & 6 deletions pageserver/src/tenant/gc_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use utils::id::TimelineId;

Expand All @@ -20,7 +20,7 @@ pub(crate) struct GcBlock {
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: tokio::sync::Mutex<()>,
blocking: Arc<tokio::sync::Mutex<()>>,
}

impl GcBlock {
Expand All @@ -30,7 +30,7 @@ impl GcBlock {
/// it's ending, or if not currently possible, a value describing the reasons why not.
///
/// Cancellation safe.
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
let reasons = {
let g = self.reasons.lock().unwrap();

Expand All @@ -44,7 +44,7 @@ impl GcBlock {
Err(reasons)
} else {
Ok(Guard {
_inner: self.blocking.lock().await,
_inner: self.blocking.clone().lock_owned().await,
})
}
}
Expand Down Expand Up @@ -170,8 +170,8 @@ impl GcBlock {
}
}

pub(super) struct Guard<'a> {
_inner: tokio::sync::MutexGuard<'a, ()>,
pub(super) struct Guard {
_inner: tokio::sync::OwnedMutexGuard<()>,
}

#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/remote_timeline_client/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ pub(crate) struct GcBlocking {
pub(crate) enum GcBlockingReason {
Manual,
DetachAncestor,
GcCompaction,
}

impl GcBlocking {
Expand Down
5 changes: 4 additions & 1 deletion pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
Expand All @@ -65,7 +65,10 @@ const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// A scheduled compaction task.
pub struct ScheduledCompactionTask {
pub options: CompactOptions,
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
pub gc_block: Option<gc_block::Guard>,
}

pub struct GcCompactionJobDescription {
Expand Down
6 changes: 1 addition & 5 deletions test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ def test_pageserver_compaction_smoke(
assert vectored_average < 8


@pytest.mark.skip(
"This is being fixed and tracked in https://github.com/neondatabase/neon/issues/9114"
)
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
SMOKE_CONF = {
Expand Down Expand Up @@ -165,8 +162,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
# skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this
"end": "010000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
},
)
Expand Down

0 comments on commit 196c0cf

Please sign in to comment.