Skip to content

Commit

Permalink
switch implementation to heavier_once_cell
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Apr 4, 2024
1 parent f25a381 commit ed3b77b
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 236 deletions.
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion libs/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
serde_assert.workspace = true
tokio-test.workspace = true

[[bench]]
name = "benchmarks"
Expand Down
2 changes: 0 additions & 2 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ pub mod yielding_loop;

pub mod zstd;

pub mod poison;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:
Expand Down
135 changes: 0 additions & 135 deletions libs/utils/src/poison.rs

This file was deleted.

4 changes: 2 additions & 2 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}

impl WalRedoManager {
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await,
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10).await;
walredo_mgr.maybe_quiesce(period * 10);
}

// TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
Expand Down
129 changes: 48 additions & 81 deletions pageserver/src/walredo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::lsn::Lsn;
use utils::poison::Poison;
use utils::sync::heavier_once_cell;

///
/// This is the real implementation that uses a Postgres process to
Expand All @@ -54,7 +54,19 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: tokio::sync::RwLock<Poison<Option<Arc<process::WalRedoProcess>>>>,
/// The current [`process::WalRedoProcess`] that is used by new redo requests.
/// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
requests don't use the [`heavier_once_cell::Guard`] to keep ahold of
their process object; we use [`Arc::clone`] for that.
/// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
/// had that behavior; it's probably unnecessary.
/// The only merit of it is that if one walredo process encounters an error,
it can take it out of rotation (= using [`heavier_once_cell::Guard::take_and_deinit`])
and retry redo, thereby starting the new process, while other redo tasks might
/// still be using the old redo process. But, those other tasks will most likely
/// encounter an error as well, and errors are an unexpected condition anyway.
/// So, probably we could get rid of the `Arc` in the future.
redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
}

///
Expand Down Expand Up @@ -137,12 +149,7 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
pid: self
.redo_process
.read()
.await
.try_peek(|maybe_proc| maybe_proc.as_ref().map(|p| p.id()))
.unwrap(),
pid: self.redo_process.get().map(|p| p.id()),
})
}
}
Expand All @@ -160,35 +167,21 @@ impl PostgresRedoManager {
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: tokio::sync::RwLock::new(Poison::new("redo_process field", None)),
redo_process: heavier_once_cell::OnceCell::default(),
}
}

/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
// awkward control flow because rustc isn't smart enough to detect that we don't
// hold the std::sync lock guard across the await point
let kill = if let Ok(g) = self.last_redo_at.try_lock() {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
if let Ok(g) = self.last_redo_at.try_lock() {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
true
} else {
false
drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
}
} else {
false
}
} else {
false
};
if kill {
let mut lock_guard = self.redo_process.write().await;
let mut poison_guard = lock_guard.check_and_arm().unwrap();
*poison_guard.data_mut() = None;
poison_guard.disarm();
}
}

Expand All @@ -215,51 +208,31 @@ impl PostgresRedoManager {
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
loop {
// launch the WAL redo process on first use
let proc: Arc<process::WalRedoProcess> = async move {
let lock_guard = self.redo_process.read().await;
if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() {
// hot path
return anyhow::Ok(proc);
}
// slow path
// "upgrade" to write lock to launch the process
drop(lock_guard);
let mut lock_guard = self.redo_process.write().await;
if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() {
// we coalesced onto another task running this code
return anyhow::Ok(proc);
}
// don't hold poison_guard, the launch code can bail
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
);
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"launched walredo process"
);
//
let mut poison_guard = lock_guard.check_and_arm().unwrap();
let replaced = poison_guard.data_mut().replace(Arc::clone(&proc));
match replaced {
None => (),
Some(replaced) => {
unreachable!("the check after acquiring the write lock should prevent htis from happening: {}", replaced.id());
let proc: Arc<process::WalRedoProcess> =
match self.redo_process.get_or_init_detached().await {
Ok(guard) => Arc::clone(&guard),
Err(permit) => {
// don't hold poison_guard, the launch code can bail
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
);
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"launched walredo process"
);
self.redo_process.set(Arc::clone(&proc), permit);
proc
}
}
poison_guard.disarm();
anyhow::Ok(proc)
}.await?;
};

let started_at = std::time::Instant::now();

Expand Down Expand Up @@ -308,21 +281,15 @@ impl PostgresRedoManager {
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
{
let mut lock_guard = self.redo_process.write().await;
let mut poison_guard = lock_guard.check_and_arm().unwrap();
let maybe_proc_mut = poison_guard.data_mut();
match &*maybe_proc_mut {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
match self.redo_process.get() {
None => (),
Some(guard) => {
if Arc::ptr_eq(&proc, &*guard) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*maybe_proc_mut = None;
guard.take_and_deinit();
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
poison_guard.disarm();
}
// NB: there may still be other concurrent threads using `proc`.
// The last one will send SIGKILL when the underlying Arc reaches refcount 0.
Expand Down

0 comments on commit ed3b77b

Please sign in to comment.