From ed3b77b252776738983deabaf50ef22af4e05007 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 4 Apr 2024 10:54:56 +0000
Subject: [PATCH] switch implementation to heavier_once_cell

---
 Cargo.lock                     |  14 ----
 libs/utils/Cargo.toml          |   1 -
 libs/utils/src/lib.rs          |   2 -
 libs/utils/src/poison.rs       | 135 ---------------------------------
 pageserver/src/tenant.rs       |   4 +-
 pageserver/src/tenant/tasks.rs |   2 +-
 pageserver/src/walredo.rs      | 129 ++++++++++++-------------
 7 files changed, 51 insertions(+), 236 deletions(-)
 delete mode 100644 libs/utils/src/poison.rs

diff --git a/Cargo.lock b/Cargo.lock
index fbf42f8d48a21..7fef2ebf22ae3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6104,19 +6104,6 @@ dependencies = [
  "xattr",
 ]
 
-[[package]]
-name = "tokio-test"
-version = "0.4.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
-dependencies = [
- "async-stream",
- "bytes",
- "futures-core",
- "tokio",
- "tokio-stream",
-]
-
 [[package]]
 name = "tokio-tungstenite"
 version = "0.20.0"
@@ -6608,7 +6595,6 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "tokio-tar",
- "tokio-test",
  "tokio-util",
  "tracing",
  "tracing-error",
diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml
index b3b50461da2a0..c2d9d9d396774 100644
--- a/libs/utils/Cargo.toml
+++ b/libs/utils/Cargo.toml
@@ -70,7 +70,6 @@ criterion.workspace = true
 hex-literal.workspace = true
 camino-tempfile.workspace = true
 serde_assert.workspace = true
-tokio-test.workspace = true
 
 [[bench]]
 name = "benchmarks"
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index 336ee87570425..04ce0626c84a0 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -89,8 +89,6 @@ pub mod yielding_loop;
 
 pub mod zstd;
 
-pub mod poison;
-
 /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
 ///
 /// we have several cases:
diff --git a/libs/utils/src/poison.rs b/libs/utils/src/poison.rs
deleted file mode 100644
index 43b671d9900f1..0000000000000
--- a/libs/utils/src/poison.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-//! Protect a piece of state from reuse after it is left in an inconsistent state.
-//!
-//! # Example
-//!
-//! ```
-//! # tokio_test::block_on(async {
-//! use utils::poison::Poison;
-//! use std::time::Duration;
-//!
-//! struct State {
-//!     clean: bool,
-//! }
-//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true }));
-//!
-//! let mut mutex_guard = state.lock().await;
-//! let mut poison_guard = mutex_guard.check_and_arm()?;
-//! let state = poison_guard.data_mut();
-//! state.clean = false;
-//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail.
-//! tokio::time::sleep(Duration::from_secs(10)).await;
-//! state.clean = true;
-//! poison_guard.disarm();
-//! # Ok::<(), utils::poison::Error>(())
-//! # });
-//! ```
-
-use tracing::warn;
-
-pub struct Poison<T> {
-    what: &'static str,
-    state: State,
-    data: T,
-}
-
-#[derive(Clone, Copy)]
-enum State {
-    Clean,
-    Armed,
-    Poisoned { at: chrono::DateTime<chrono::Utc> },
-}
-
-impl<T> Poison<T> {
-    /// We log `what` at `warning!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed.
-    pub fn new(what: &'static str, data: T) -> Self {
-        Self {
-            what,
-            state: State::Clean,
-            data,
-        }
-    }
-
-    /// Check for poisoning and return a [`Guard`] that provides access to the wrapped state.
-    pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> {
-        match self.state {
-            State::Clean => {
-                self.state = State::Armed;
-                Ok(Guard(self))
-            }
-            State::Armed => unreachable!("transient state"),
-            State::Poisoned { at } => Err(Error::Poisoned {
-                what: self.what,
-                at,
-            }),
-        }
-    }
-
-    pub fn try_peek<F, R>(&self, f: F) -> Result<R, Error>
-    where
-        F: FnOnce(&T) -> R,
-    {
-        match self.state {
-            State::Clean => Ok(f(&self.data)),
-            State::Armed => unreachable!("transient state"),
-            State::Poisoned { at } => Err(Error::Poisoned {
-                what: self.what,
-                at,
-            }),
-        }
-    }
-}
-
-/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
-/// Once modifications are done, use [`Self::disarm`].
-/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned
-/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error.
-pub struct Guard<'a, T>(&'a mut Poison<T>);
-
-impl<'a, T> Guard<'a, T> {
-    pub fn data(&self) -> &T {
-        &self.0.data
-    }
-    pub fn data_mut(&mut self) -> &mut T {
-        &mut self.0.data
-    }
-
-    pub fn disarm(self) {
-        match self.0.state {
-            State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
-            State::Armed => {
-                self.0.state = State::Clean;
-            }
-            State::Poisoned { at } => {
-                unreachable!("we fail check_and_arm() if it's in that state: {at}")
-            }
-        }
-    }
-}
-
-impl<'a, T> Drop for Guard<'a, T> {
-    fn drop(&mut self) {
-        match self.0.state {
-            State::Clean => {
-                // set by disarm()
-            }
-            State::Armed => {
-                // still armed => poison it
-                let at = chrono::Utc::now();
-                self.0.state = State::Poisoned { at };
-                warn!(at=?at, "poisoning {}", self.0.what);
-            }
-            State::Poisoned { at } => {
-                unreachable!("we fail check_and_arm() if it's in that state: {at}")
-            }
-        }
-    }
-}
-
-#[derive(thiserror::Error, Debug)]
-pub enum Error {
-    #[error("poisoned at {at}: {what}")]
-    Poisoned {
-        what: &'static str,
-        at: chrono::DateTime<chrono::Utc>,
-    },
-}
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index d0a643aff65d4..62ea83905d53f 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -350,9 +350,9 @@ impl From for WalRedoManager {
 }
 
 impl WalRedoManager {
-    pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
+    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
         match self {
-            Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await,
+            Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
             #[cfg(test)]
             Self::Test(_) => {
                 // Not applicable to test redo manager
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index b28bd06089da2..e4f5f7513288f 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -208,7 +208,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
             // Perhaps we did no work and the walredo process has been idle for some time:
             // give it a chance to shut down to avoid leaving walredo process running indefinitely.
             if let Some(walredo_mgr) = &tenant.walredo_mgr {
-                walredo_mgr.maybe_quiesce(period * 10).await;
+                walredo_mgr.maybe_quiesce(period * 10);
             }
 
             // TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index f406ef3ab866b..c23bb6c1783be 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -41,7 +41,7 @@ use std::time::Duration;
 use std::time::Instant;
 use tracing::*;
 use utils::lsn::Lsn;
-use utils::poison::Poison;
+use utils::sync::heavier_once_cell;
 
 ///
 /// This is the real implementation that uses a Postgres process to
@@ -54,7 +54,19 @@ pub struct PostgresRedoManager {
     tenant_shard_id: TenantShardId,
     conf: &'static PageServerConf,
     last_redo_at: std::sync::Mutex<Option<Instant>>,
-    redo_process: tokio::sync::RwLock<Poison<Option<Arc<process::WalRedoProcess>>>>,
+    /// The current [`process::WalRedoProcess`] that is used by new redo requests.
+    /// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
+    /// requests don't use the [`heavier_once_cell::Guard`] to keep hold of
+    /// their process object; we use [`Arc::clone`] for that.
+    /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
+    /// had that behavior; it's probably unnecessary.
+    /// The only merit of it is that if one walredo process encounters an error,
+    /// it can take it out of rotation (using [`heavier_once_cell::Guard::take_and_deinit`])
+    /// and retry redo, thereby starting a new process, while other redo tasks might
+    /// still be using the old redo process. But those other tasks will most likely
+    /// encounter an error as well, and errors are an unexpected condition anyway.
+    /// So, probably we could get rid of the `Arc` in the future.
+    redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
 }
 
 ///
@@ -137,12 +149,7 @@ impl PostgresRedoManager {
                     chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
                 })
             },
-            pid: self
-                .redo_process
-                .read()
-                .await
-                .try_peek(|maybe_proc| maybe_proc.as_ref().map(|p| p.id()))
-                .unwrap(),
+            pid: self.redo_process.get().map(|p| p.id()),
         })
     }
 }
@@ -160,35 +167,21 @@ impl PostgresRedoManager {
             tenant_shard_id,
             conf,
             last_redo_at: std::sync::Mutex::default(),
-            redo_process: tokio::sync::RwLock::new(Poison::new("redo_process field", None)),
+            redo_process: heavier_once_cell::OnceCell::default(),
         }
     }
 
     /// This type doesn't have its own background task to check for idleness: we
     /// rely on our owner calling this function periodically in its own housekeeping
     /// loops.
-    pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
-        // awkward control flow because rustc isn't smart enough to detect that we don't
-        // hold the std::sync lock guard across the await point
-        let kill = if let Ok(g) = self.last_redo_at.try_lock() {
+    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
+        if let Ok(g) = self.last_redo_at.try_lock() {
             if let Some(last_redo_at) = *g {
                 if last_redo_at.elapsed() >= idle_timeout {
                     drop(g);
-                    true
-                } else {
-                    false
+                    drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
                 }
-            } else {
-                false
             }
-        } else {
-            false
-        };
-        if kill {
-            let mut lock_guard = self.redo_process.write().await;
-            let mut poison_guard = lock_guard.check_and_arm().unwrap();
-            *poison_guard.data_mut() = None;
-            poison_guard.disarm();
         }
     }
 
@@ -215,51 +208,31 @@ impl PostgresRedoManager {
         const MAX_RETRY_ATTEMPTS: u32 = 1;
         let mut n_attempts = 0u32;
         loop {
-            // launch the WAL redo process on first use
-            let proc: Arc<process::WalRedoProcess> = async move {
-                let lock_guard = self.redo_process.read().await;
-                if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() {
-                    // hot path
-                    return anyhow::Ok(proc);
-                }
-                // slow path
-                // "upgrade" to write lock to launch the process
-                drop(lock_guard);
-                let mut lock_guard = self.redo_process.write().await;
-                if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() {
-                    // we coalesced onto another task runnning this code
-                    return anyhow::Ok(proc);
-                }
-                // don't hold poison_guard, the launch code can bail
-                let start = Instant::now();
-                let proc = Arc::new(
-                    process::WalRedoProcess::launch(
-                        self.conf,
-                        self.tenant_shard_id,
-                        pg_version,
-                    )
-                    .context("launch walredo process")?,
-                );
-                let duration = start.elapsed();
-                WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
-                    .observe(duration.as_secs_f64());
-                info!(
-                    duration_ms = duration.as_millis(),
-                    pid = proc.id(),
-                    "launched walredo process"
-                );
-                //
-                let mut poison_guard = lock_guard.check_and_arm().unwrap();
-                let replaced = poison_guard.data_mut().replace(Arc::clone(&proc));
-                match replaced {
-                    None => (),
-                    Some(replaced) => {
-                        unreachable!("the check after acquiring the write lock should prevent htis from happening: {}", replaced.id());
+            let proc: Arc<process::WalRedoProcess> =
+                match self.redo_process.get_or_init_detached().await {
+                    Ok(guard) => Arc::clone(&guard),
+                    Err(permit) => {
+                        // the launch can fail, in which case we bail with `?` and the cell remains unset
+                        let start = Instant::now();
+                        let proc = Arc::new(
+                            process::WalRedoProcess::launch(
+                                self.conf,
+                                self.tenant_shard_id,
+                                pg_version,
+                            )
+                            .context("launch walredo process")?,
+                        );
+                        let duration = start.elapsed();
+                        WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
+                        info!(
+                            duration_ms = duration.as_millis(),
+                            pid = proc.id(),
+                            "launched walredo process"
+                        );
+                        self.redo_process.set(Arc::clone(&proc), permit);
+                        proc
                     }
-                }
-                poison_guard.disarm();
-                anyhow::Ok(proc)
-            }.await?;
+                };
 
             let started_at = std::time::Instant::now();
 
@@ -308,21 +281,15 @@ impl PostgresRedoManager {
             // Avoid concurrent callers hitting the same issue.
             // We can't prevent it from happening because we want to enable parallelism.
             {
-                let mut lock_guard = self.redo_process.write().await;
-                let mut poison_guard = lock_guard.check_and_arm().unwrap();
-                let maybe_proc_mut = poison_guard.data_mut();
-                match &*maybe_proc_mut {
-                    Some(current_field_value) => {
-                        if Arc::ptr_eq(current_field_value, &proc) {
+                match self.redo_process.get() {
+                    None => (),
+                    Some(guard) => {
+                        if Arc::ptr_eq(&proc, &*guard) {
                             // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
-                            *maybe_proc_mut = None;
+                            guard.take_and_deinit();
                         }
                     }
-                    None => {
-                        // Another thread was faster to observe the error, and already took the process out of rotation.
-                    }
                 }
-                poison_guard.disarm();
             }
             // NB: there may still be other concurrent threads using `proc`.
             // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
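
For reviewers who don't have the `utils::sync::heavier_once_cell` source at hand, the following is a minimal, self-contained sketch of the coalescing behavior the new `redo_process` field relies on. It is only an approximation built on a plain `tokio::sync::Mutex`, not how `heavier_once_cell::OnceCell` is actually implemented, and `CoalescingCell`, `FakeProcess`, and `spawn_fake_process` are hypothetical stand-ins for the cell, `process::WalRedoProcess`, and `WalRedoProcess::launch`.

// Sketch only; assumes tokio = { version = "1", features = ["full"] }.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex;

static NEXT_PID: AtomicU32 = AtomicU32::new(1);

/// Hypothetical stand-in for `process::WalRedoProcess`.
struct FakeProcess {
    pid: u32,
}

/// Hypothetical stand-in for the expensive `WalRedoProcess::launch`.
async fn spawn_fake_process() -> FakeProcess {
    tokio::time::sleep(Duration::from_millis(50)).await;
    FakeProcess {
        pid: NEXT_PID.fetch_add(1, Ordering::Relaxed),
    }
}

#[derive(Default)]
struct CoalescingCell {
    // Holding this async mutex across the spawn is what coalesces concurrent
    // initializers in this sketch; the real cell hands out an init permit instead.
    slot: Mutex<Option<Arc<FakeProcess>>>,
}

impl CoalescingCell {
    /// Hot path: clone the stored Arc. Slow path: spawn a process and store it.
    /// This corresponds to the `Ok(guard)` / `Err(permit)` match on
    /// `get_or_init_detached()` in the redo retry loop above.
    async fn get_or_spawn(&self) -> Arc<FakeProcess> {
        let mut slot = self.slot.lock().await;
        if let Some(proc) = slot.as_ref() {
            return Arc::clone(proc);
        }
        let proc = Arc::new(spawn_fake_process().await);
        *slot = Some(Arc::clone(&proc));
        proc
    }

    /// Roughly the role of `Guard::take_and_deinit`: take the process out of
    /// rotation so the next request spawns a fresh one. Outstanding Arc clones
    /// keep the old process alive until they are dropped.
    async fn take(&self) -> Option<Arc<FakeProcess>> {
        self.slot.lock().await.take()
    }
}

#[tokio::main]
async fn main() {
    let cell = CoalescingCell::default();

    // Two concurrent callers coalesce onto a single spawn.
    let (a, b) = tokio::join!(cell.get_or_spawn(), cell.get_or_spawn());
    assert!(Arc::ptr_eq(&a, &b));

    // Quiesce / error path: drop the stored process; the next caller spawns a new one.
    drop(cell.take().await);
    let c = cell.get_or_spawn().await;
    assert!(!Arc::ptr_eq(&a, &c));
    println!("old pid {}, new pid {}", a.pid, c.pid);
}

In this sketch, `get_or_spawn` plays the role of the `get_or_init_detached()` / `set(…, permit)` pair in the retry loop, and `take` plays the role of `Guard::take_and_deinit` in `maybe_quiesce` and in the error path: the process is removed from the cell, while callers still holding an `Arc` keep it alive until the last clone is dropped.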