From d93b505101f0e4a3ca5233ffe8119aae9e6df049 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 Apr 2024 09:58:24 +0000 Subject: [PATCH 1/5] fix(walredo spawn): coalescing via std::sync::RwLock stalls other executors Before this PR, we were using a std::sync::RwLock to coalesce multiple callers on one walredo spawning. One thread would win the write lock and the others would queue up either at the read() or write() lock call. In a scenario where a compute initiates multiple getpage requests from different Postgres backends (= different page_service conns), and we don't have a walredo process around, all of these page_service handler tasks enter the spawning code path: one of them does the spawning, and the others stall the executor because they make a blocking read()/write() lock call. Changes ------- This PR fixes that scenario by switching to a tokio::sync::RwLock. I retain the poisoning behavior of the std::sync::RwLock by adding a custom `Poison` wrapper type, which I originally developed for async walredo PR #6548. Performance ----------- - [ ] Benchmark this implementation (bench_walredo should suffice) - [ ] Compare with heavier_once_cell Risks ----- As "usual", replacing a std::sync primitive with a tokio::sync one risks exposing concurrency that was previously implicitly limited to the number of executor threads. This would be the first such replacement in walredo. The risk is that we get descheduled while the reconstruct data is already there, which could pile up reconstruct data. In practice, I think the risk is low: once we get scheduled again, we'll likely have a walredo process ready, and there is no further await point until walredo is complete and the reconstruct data has been dropped. This will change with async walredo PR #6548, and I'm well aware of it in that PR. Alternatives ------------ An implementation using heavier_once_cell could also work; TBD: benchmark.
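To make the stall concrete, here is a minimal, self-contained sketch of the failure mode (illustrative only, not code from this PR; `spawn_process_blocking`, the task count, and the durations are made up). With only two worker threads, the blocking read()/write() calls can occupy every executor thread while the winner spawns, starving unrelated tasks on the same runtime:

```rust
use std::sync::{Arc, RwLock};
use std::time::Duration;

// Hypothetical stand-in for walredo process spawning: slow, blocking work
// performed while the write lock is held.
fn spawn_process_blocking() -> u32 {
    std::thread::sleep(Duration::from_millis(500));
    42 // pretend this is the child pid
}

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let proc: Arc<RwLock<Option<u32>>> = Arc::new(RwLock::new(None));
    let mut handles = Vec::new();
    for i in 0..8 {
        let proc = Arc::clone(&proc);
        // Each task models one page_service handler that needs the process.
        handles.push(tokio::spawn(async move {
            // BAD: std::sync lock calls block the executor thread while the
            // winning task holds the write lock for the entire spawn.
            if proc.read().unwrap().is_none() {
                let mut slot = proc.write().unwrap();
                if slot.is_none() {
                    *slot = Some(spawn_process_blocking());
                }
            }
            println!("task {i} sees pid {:?}", *proc.read().unwrap());
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}
```

With tokio::sync::RwLock, as introduced below, the queued tasks .await the lock and yield their executor thread instead; only the spawn itself remains blocking, which this PR leaves as-is.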
--- Cargo.lock | 14 +++ Cargo.toml | 1 + libs/utils/Cargo.toml | 1 + libs/utils/src/lib.rs | 2 + libs/utils/src/poison.rs | 135 ++++++++++++++++++++++++++++ pageserver/benches/bench_walredo.rs | 34 +++---- pageserver/src/http/routes.rs | 2 +- pageserver/src/tenant.rs | 16 ++-- pageserver/src/tenant/tasks.rs | 2 +- pageserver/src/walredo.rs | 128 ++++++++++++++++---------- 10 files changed, 264 insertions(+), 71 deletions(-) create mode 100644 libs/utils/src/poison.rs diff --git a/Cargo.lock b/Cargo.lock index 7fef2ebf22ae..fbf42f8d48a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6104,6 +6104,19 @@ dependencies = [ "xattr", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.20.0" @@ -6595,6 +6608,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tar", + "tokio-test", "tokio-util", "tracing", "tracing-error", diff --git a/Cargo.toml b/Cargo.toml index 9f24176c65f5..2d49581f9f4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,6 +170,7 @@ tokio-postgres-rustls = "0.11.0" tokio-rustls = "0.25" tokio-stream = "0.1" tokio-tar = "0.3" +tokio-test = "0.4.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.7" toml_edit = "0.19" diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index c2d9d9d39677..b3b50461da2a 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -70,6 +70,7 @@ criterion.workspace = true hex-literal.workspace = true camino-tempfile.workspace = true serde_assert.workspace = true +tokio-test.workspace = true [[bench]] name = "benchmarks" diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 04ce0626c84a..336ee8757042 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -89,6 +89,8 @@ pub mod yielding_loop; pub mod zstd; +pub mod poison; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: diff --git a/libs/utils/src/poison.rs b/libs/utils/src/poison.rs new file mode 100644 index 000000000000..43b671d9900f --- /dev/null +++ b/libs/utils/src/poison.rs @@ -0,0 +1,135 @@ +//! Protect a piece of state from reuse after it is left in an inconsistent state. +//! +//! # Example +//! +//! ``` +//! # tokio_test::block_on(async { +//! use utils::poison::Poison; +//! use std::time::Duration; +//! +//! struct State { +//! clean: bool, +//! } +//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true })); +//! +//! let mut mutex_guard = state.lock().await; +//! let mut poison_guard = mutex_guard.check_and_arm()?; +//! let state = poison_guard.data_mut(); +//! state.clean = false; +//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail. +//! tokio::time::sleep(Duration::from_secs(10)).await; +//! state.clean = true; +//! poison_guard.disarm(); +//! # Ok::<(), utils::poison::Error>(()) +//! # }); +//! ``` + +use tracing::warn; + +pub struct Poison<T> { + what: &'static str, + state: State, + data: T, +} + +#[derive(Clone, Copy)] +enum State { + Clean, + Armed, + Poisoned { at: chrono::DateTime<chrono::Utc> }, +} + +impl<T> Poison<T> { + /// We log `what` at `warn!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed.
+ pub fn new(what: &'static str, data: T) -> Self { + Self { + what, + state: State::Clean, + data, + } + } + + /// Check for poisoning and return a [`Guard`] that provides access to the wrapped state. + pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> { + match self.state { + State::Clean => { + self.state = State::Armed; + Ok(Guard(self)) + } + State::Armed => unreachable!("transient state"), + State::Poisoned { at } => Err(Error::Poisoned { + what: self.what, + at, + }), + } + } + + pub fn try_peek<F, R>(&self, f: F) -> Result<R, Error> + where + F: FnOnce(&T) -> R, + { + match self.state { + State::Clean => Ok(f(&self.data)), + State::Armed => unreachable!("transient state"), + State::Poisoned { at } => Err(Error::Poisoned { + what: self.what, + at, + }), + } + } +} + +/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state. +/// Once modifications are done, use [`Self::disarm`]. +/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned +/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error. +pub struct Guard<'a, T>(&'a mut Poison<T>); + +impl<'a, T> Guard<'a, T> { + pub fn data(&self) -> &T { + &self.0.data + } + pub fn data_mut(&mut self) -> &mut T { + &mut self.0.data + } + + pub fn disarm(self) { + match self.0.state { + State::Clean => unreachable!("we set it to Armed in check_and_arm()"), + State::Armed => { + self.0.state = State::Clean; + } + State::Poisoned { at } => { + unreachable!("we fail check_and_arm() if it's in that state: {at}") + } + } + } +} + +impl<'a, T> Drop for Guard<'a, T> { + fn drop(&mut self) { + match self.0.state { + State::Clean => { + // set by disarm() + } + State::Armed => { + // still armed => poison it + let at = chrono::Utc::now(); + self.0.state = State::Poisoned { at }; + warn!(at=?at, "poisoning {}", self.0.what); + } + State::Poisoned { at } => { + unreachable!("we fail check_and_arm() if it's in that state: {at}") + } + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("poisoned at {at}: {what}")] + Poisoned { + what: &'static str, + at: chrono::DateTime<chrono::Utc>, + }, +} diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 3efad546a6f0..f1897533bfaa 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -27,25 +27,25 @@ //! //! # Reference Numbers //! -//! 2024-03-20 on i3en.3xlarge +//! 2024-04-04 on i3en.3xlarge //! //! ```text -//! short/1 time: [26.483 µs 26.614 µs 26.767 µs] -//! short/2 time: [32.223 µs 32.465 µs 32.767 µs] -//! short/4 time: [47.203 µs 47.583 µs 47.984 µs] -//! short/8 time: [89.135 µs 89.612 µs 90.139 µs] -//! short/16 time: [190.12 µs 191.52 µs 192.88 µs] -//! short/32 time: [380.96 µs 382.63 µs 384.20 µs] -//! short/64 time: [736.86 µs 741.07 µs 745.03 µs] -//! short/128 time: [1.4106 ms 1.4206 ms 1.4294 ms] -//! medium/1 time: [111.81 µs 112.25 µs 112.79 µs] -//! medium/2 time: [158.26 µs 159.13 µs 160.21 µs] -//! medium/4 time: [334.65 µs 337.14 µs 340.07 µs] -//! medium/8 time: [675.32 µs 679.91 µs 685.25 µs] -//! medium/16 time: [1.2929 ms 1.2996 ms 1.3067 ms] -//! medium/32 time: [2.4295 ms 2.4461 ms 2.4623 ms] -//! medium/64 time: [4.3973 ms 4.4458 ms 4.4875 ms] -//! medium/128 time: [7.5955 ms 7.7847 ms 7.9481 ms] +//! short/1 time: [25.587 µs 25.726 µs 25.877 µs] +//! short/2 time: [31.173 µs 31.354 µs 31.559 µs] +//! short/4 time: [44.822 µs 45.128 µs 45.461 µs] +//! short/8 time: [82.673 µs 83.307 µs 84.003 µs] +//!
short/16 time: [187.79 µs 189.00 µs 190.27 µs] +//! short/32 time: [387.00 µs 388.68 µs 390.46 µs] +//! short/64 time: [767.13 µs 770.66 µs 774.44 µs] +//! short/128 time: [1.5013 ms 1.5074 ms 1.5135 ms] +//! medium/1 time: [105.68 µs 106.08 µs 106.57 µs] +//! medium/2 time: [152.34 µs 152.84 µs 153.47 µs] +//! medium/4 time: [324.52 µs 325.81 µs 327.49 µs] +//! medium/8 time: [643.80 µs 646.64 µs 649.88 µs] +//! medium/16 time: [1.2579 ms 1.2630 ms 1.2686 ms] +//! medium/32 time: [2.4159 ms 2.4286 ms 2.4414 ms] +//! medium/64 time: [4.6483 ms 4.6821 ms 4.7140 ms] +//! medium/128 time: [8.7167 ms 8.8319 ms 8.9400 ms] //! ``` use bytes::{Buf, Bytes}; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 47d8ae114891..36f80eb3ccc1 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1028,7 +1028,7 @@ async fn tenant_status( attachment_status: state.attachment_status(), generation: tenant.generation().into(), }, - walredo: tenant.wal_redo_manager_status(), + walredo: tenant.wal_redo_manager_status().await, timelines: tenant.list_timeline_ids(), }) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 17ff033e00c5..d0a643aff65d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -350,9 +350,9 @@ impl From<PostgresRedoManager> for WalRedoManager { } impl WalRedoManager { - pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { match self { - Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout), + Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await, #[cfg(test)] Self::Test(_) => { // Not applicable to test redo manager @@ -384,9 +384,9 @@ impl WalRedoManager { } } - pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> { + pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> { match self { - WalRedoManager::Prod(m) => m.status(), + WalRedoManager::Prod(m) => m.status().await, #[cfg(test)] WalRedoManager::Test(_) => None, } } @@ -1683,8 +1683,12 @@ impl Tenant { self.generation } - pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> { - self.walredo_mgr.as_ref().and_then(|mgr| mgr.status()) + pub(crate) async fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> { + if let Some(mgr) = self.walredo_mgr.as_ref() { + mgr.status().await + } else { + None + } } /// Changes tenant status to active, unless shutdown was already requested. diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index e4f5f7513288..b28bd06089da 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -208,7 +208,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) { // Perhaps we did no work and the walredo process has been idle for some time: // give it a chance to shut down to avoid leaving walredo process running indefinitely.
if let Some(walredo_mgr) = &tenant.walredo_mgr { - walredo_mgr.maybe_quiesce(period * 10); + walredo_mgr.maybe_quiesce(period * 10).await; } // TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 0004f4f3c968..f406ef3ab866 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -36,11 +36,12 @@ use bytes::{Bytes, BytesMut}; use pageserver_api::key::key_to_rel_block; use pageserver_api::models::WalRedoManagerStatus; use pageserver_api::shard::TenantShardId; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use tracing::*; use utils::lsn::Lsn; +use utils::poison::Poison; /// /// This is the real implementation that uses a Postgres process to /// @@ -53,7 +54,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex<Option<Instant>>, - redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>, + redo_process: tokio::sync::RwLock<Poison<Option<Arc<process::WalRedoProcess>>>>, } /// @@ -101,6 +102,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await }; img = Some(result?); @@ -121,10 +123,11 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await } } - pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> { + pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> { Some(WalRedoManagerStatus { last_redo_at: { let at = *self.last_redo_at.lock().unwrap(); @@ -134,7 +137,12 @@ chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()), + pid: self + .redo_process + .read() + .await + .try_peek(|maybe_proc| maybe_proc.as_ref().map(|p| p.id())) + .unwrap(), }) } } @@ -152,30 +160,46 @@ impl PostgresRedoManager { tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), - redo_process: RwLock::new(None), + redo_process: tokio::sync::RwLock::new(Poison::new("redo_process field", None)), } } /// This type doesn't have its own background task to check for idleness: we /// rely on our owner calling this function periodically in its own housekeeping /// loops. - pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { - if let Ok(g) = self.last_redo_at.try_lock() { + pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { + // awkward control flow because rustc isn't smart enough to detect that we don't + // hold the std::sync lock guard across the await point + let kill = if let Ok(g) = self.last_redo_at.try_lock() { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - let mut guard = self.redo_process.write().unwrap(); - *guard = None; + true + } else { + false } + } else { + false } + } else { + false }; + if kill { + let mut lock_guard = self.redo_process.write().await; + let mut poison_guard = lock_guard.check_and_arm().unwrap(); + *poison_guard.data_mut() = None; + poison_guard.disarm(); } } /// /// Process one request for WAL redo using wal-redo postgres /// + /// # Cancel-Safety + /// + /// Cancellation safe.
#[allow(clippy::too_many_arguments)] - fn apply_batch_postgres( + async fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -192,41 +216,50 @@ let mut n_attempts = 0u32; loop { // launch the WAL redo process on first use - let proc: Arc<process::WalRedoProcess> = { - let proc_guard = self.redo_process.read().unwrap(); - match &*proc_guard { - None => { - // "upgrade" to write lock to launch the process - drop(proc_guard); - let mut proc_guard = self.redo_process.write().unwrap(); - match &*proc_guard { - None => { - let start = Instant::now(); - let proc = Arc::new( - process::WalRedoProcess::launch( - self.conf, - self.tenant_shard_id, - pg_version, - ) - .context("launch walredo process")?, - ); - let duration = start.elapsed(); - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM - .observe(duration.as_secs_f64()); - info!( - duration_ms = duration.as_millis(), - pid = proc.id(), - "launched walredo process" - ); - *proc_guard = Some(Arc::clone(&proc)); - proc - } - Some(proc) => Arc::clone(proc), - } + let proc: Arc<process::WalRedoProcess> = async move { + let lock_guard = self.redo_process.read().await; + if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() { + // hot path + return anyhow::Ok(proc); + } + // slow path + // "upgrade" to write lock to launch the process + drop(lock_guard); + let mut lock_guard = self.redo_process.write().await; + if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() { + // we coalesced onto another task running this code + return anyhow::Ok(proc); + } + // don't hold poison_guard, the launch code can bail + let start = Instant::now(); + let proc = Arc::new( + process::WalRedoProcess::launch( + self.conf, + self.tenant_shard_id, + pg_version, + ) + .context("launch walredo process")?, + ); + let duration = start.elapsed(); + WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM + .observe(duration.as_secs_f64()); + info!( + duration_ms = duration.as_millis(), + pid = proc.id(), + "launched walredo process" + ); + // + let mut poison_guard = lock_guard.check_and_arm().unwrap(); + let replaced = poison_guard.data_mut().replace(Arc::clone(&proc)); + match replaced { + None => (), + Some(replaced) => { + unreachable!("the check after acquiring the write lock should prevent this from happening: {}", replaced.id()); } - } - Some(proc) => Arc::clone(proc), } - }; + poison_guard.disarm(); + anyhow::Ok(proc) + }.await?; let started_at = std::time::Instant::now(); @@ -275,18 +308,21 @@ // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. { - let mut guard = self.redo_process.write().unwrap(); - match &*guard { + let mut lock_guard = self.redo_process.write().await; + let mut poison_guard = lock_guard.check_and_arm().unwrap(); + let maybe_proc_mut = poison_guard.data_mut(); + match &*maybe_proc_mut { Some(current_field_value) => { if Arc::ptr_eq(current_field_value, &proc) { // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - *guard = None; + *maybe_proc_mut = None; } } None => { // Another thread was faster to observe the error, and already took the process out of rotation. } } + poison_guard.disarm(); } // NB: there may still be other concurrent threads using `proc`. // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
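The hot-path/slow-path structure introduced above is double-checked locking on an async RwLock. Stripped of the `Poison` wrapper and the walredo specifics, the coalescing pattern looks like the following sketch (`Process`, `launch`, and `get_or_spawn` are hypothetical names, not this patch's API):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Hypothetical stand-in for process::WalRedoProcess.
#[allow(dead_code)]
struct Process {
    pid: u32,
}

// Hypothetical stand-in for process::WalRedoProcess::launch(); may fail.
fn launch() -> anyhow::Result<Arc<Process>> {
    Ok(Arc::new(Process { pid: 42 }))
}

async fn get_or_spawn(cell: &RwLock<Option<Arc<Process>>>) -> anyhow::Result<Arc<Process>> {
    // Hot path: shared read lock, so concurrent redo requests proceed in parallel.
    if let Some(proc) = cell.read().await.as_ref() {
        return Ok(Arc::clone(proc));
    }
    // Slow path: "upgrade" by re-acquiring as a writer, then re-check, because
    // another task may have spawned the process between our unlock and relock.
    let mut slot = cell.write().await;
    if let Some(proc) = slot.as_ref() {
        // We coalesced onto another task's spawn.
        return Ok(Arc::clone(proc));
    }
    let proc = launch()?; // waiters queue on write().await, yielding the executor
    *slot = Some(Arc::clone(&proc));
    Ok(proc)
}
```

The re-check after acquiring the write lock is what makes the callers coalesce: every task that loses the race finds the winner's process and clones the Arc instead of spawning again. The patch additionally wraps the slot in `Poison`, so that a task cancelled between check_and_arm() and disarm() poisons the state rather than leaving it half-updated.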
From 789b04e9dfc553ee565806e8dcf82b58a7668846 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 Apr 2024 10:54:56 +0000 Subject: [PATCH 2/5] switch implementation to heavier_once_cell --- Cargo.lock | 14 --- libs/utils/Cargo.toml | 1 - libs/utils/src/lib.rs | 2 - libs/utils/src/poison.rs | 135 ---------------------------- pageserver/benches/bench_walredo.rs | 32 +++---- pageserver/src/tenant.rs | 4 +- pageserver/src/tenant/tasks.rs | 2 +- pageserver/src/walredo.rs | 129 ++++++++++---------------- 8 files changed, 67 insertions(+), 252 deletions(-) delete mode 100644 libs/utils/src/poison.rs diff --git a/Cargo.lock b/Cargo.lock index fbf42f8d48a2..7fef2ebf22ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6104,19 +6104,6 @@ dependencies = [ "xattr", ] -[[package]] -name = "tokio-test" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" -dependencies = [ - "async-stream", - "bytes", - "futures-core", - "tokio", - "tokio-stream", -] - [[package]] name = "tokio-tungstenite" version = "0.20.0" @@ -6608,7 +6595,6 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tar", - "tokio-test", "tokio-util", "tracing", "tracing-error", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index b3b50461da2a..c2d9d9d39677 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -70,7 +70,6 @@ criterion.workspace = true hex-literal.workspace = true camino-tempfile.workspace = true serde_assert.workspace = true -tokio-test.workspace = true [[bench]] name = "benchmarks" diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 336ee8757042..04ce0626c84a 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -89,8 +89,6 @@ pub mod yielding_loop; pub mod zstd; -pub mod poison; - /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: diff --git a/libs/utils/src/poison.rs b/libs/utils/src/poison.rs deleted file mode 100644 index 43b671d9900f..000000000000 --- a/libs/utils/src/poison.rs +++ /dev/null @@ -1,135 +0,0 @@ -//! Protect a piece of state from reuse after it is left in an inconsistent state. -//! -//! # Example -//! -//! ``` -//! # tokio_test::block_on(async { -//! use utils::poison::Poison; -//! use std::time::Duration; -//! -//! struct State { -//! clean: bool, -//! } -//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true })); -//! -//! let mut mutex_guard = state.lock().await; -//! let mut poison_guard = mutex_guard.check_and_arm()?; -//! let state = poison_guard.data_mut(); -//! state.clean = false; -//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail. -//! tokio::time::sleep(Duration::from_secs(10)).await; -//! state.clean = true; -//! poison_guard.disarm(); -//! # Ok::<(), utils::poison::Error>(()) -//! # }); -//! ``` - -use tracing::warn; - -pub struct Poison<T> { - what: &'static str, - state: State, - data: T, -} - -#[derive(Clone, Copy)] -enum State { - Clean, - Armed, - Poisoned { at: chrono::DateTime<chrono::Utc> }, -} - -impl<T> Poison<T> { - /// We log `what` at `warn!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed. - pub fn new(what: &'static str, data: T) -> Self { - Self { - what, - state: State::Clean, - data, - } - } - - /// Check for poisoning and return a [`Guard`] that provides access to the wrapped state.
- pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> { - match self.state { - State::Clean => { - self.state = State::Armed; - Ok(Guard(self)) - } - State::Armed => unreachable!("transient state"), - State::Poisoned { at } => Err(Error::Poisoned { - what: self.what, - at, - }), - } - } - - pub fn try_peek<F, R>(&self, f: F) -> Result<R, Error> - where - F: FnOnce(&T) -> R, - { - match self.state { - State::Clean => Ok(f(&self.data)), - State::Armed => unreachable!("transient state"), - State::Poisoned { at } => Err(Error::Poisoned { - what: self.what, - at, - }), - } - } -} - -/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state. -/// Once modifications are done, use [`Self::disarm`]. -/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned -/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error. -pub struct Guard<'a, T>(&'a mut Poison<T>); - -impl<'a, T> Guard<'a, T> { - pub fn data(&self) -> &T { - &self.0.data - } - pub fn data_mut(&mut self) -> &mut T { - &mut self.0.data - } - - pub fn disarm(self) { - match self.0.state { - State::Clean => unreachable!("we set it to Armed in check_and_arm()"), - State::Armed => { - self.0.state = State::Clean; - } - State::Poisoned { at } => { - unreachable!("we fail check_and_arm() if it's in that state: {at}") - } - } - } -} - -impl<'a, T> Drop for Guard<'a, T> { - fn drop(&mut self) { - match self.0.state { - State::Clean => { - // set by disarm() - } - State::Armed => { - // still armed => poison it - let at = chrono::Utc::now(); - self.0.state = State::Poisoned { at }; - warn!(at=?at, "poisoning {}", self.0.what); - } - State::Poisoned { at } => { - unreachable!("we fail check_and_arm() if it's in that state: {at}") - } - } - } -} - -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("poisoned at {at}: {what}")] - Poisoned { - what: &'static str, - at: chrono::DateTime<chrono::Utc>, - }, -} diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index f1897533bfaa..ffe607be4b7c 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -30,22 +30,22 @@ //! 2024-04-04 on i3en.3xlarge //! //! ```text -//! short/1 time: [25.587 µs 25.726 µs 25.877 µs] -//! short/2 time: [31.173 µs 31.354 µs 31.559 µs] -//! short/4 time: [44.822 µs 45.128 µs 45.461 µs] -//! short/8 time: [82.673 µs 83.307 µs 84.003 µs] -//! short/16 time: [187.79 µs 189.00 µs 190.27 µs] -//! short/32 time: [387.00 µs 388.68 µs 390.46 µs] -//! short/64 time: [767.13 µs 770.66 µs 774.44 µs] -//! short/128 time: [1.5013 ms 1.5074 ms 1.5135 ms] -//! medium/1 time: [105.68 µs 106.08 µs 106.57 µs] -//! medium/2 time: [152.34 µs 152.84 µs 153.47 µs] -//! medium/4 time: [324.52 µs 325.81 µs 327.49 µs] -//! medium/8 time: [643.80 µs 646.64 µs 649.88 µs] -//! medium/16 time: [1.2579 ms 1.2630 ms 1.2686 ms] -//! medium/32 time: [2.4159 ms 2.4286 ms 2.4414 ms] -//! medium/64 time: [4.6483 ms 4.6821 ms 4.7140 ms] -//! medium/128 time: [8.7167 ms 8.8319 ms 8.9400 ms] +//! short/1 time: [25.925 µs 26.060 µs 26.209 µs] +//! short/2 time: [31.277 µs 31.483 µs 31.722 µs] +//! short/4 time: [45.496 µs 45.831 µs 46.182 µs] +//! short/8 time: [84.298 µs 84.920 µs 85.566 µs] +//! short/16 time: [185.04 µs 186.41 µs 187.88 µs] +//! short/32 time: [385.01 µs 386.77 µs 388.70 µs] +//! short/64 time: [770.24 µs 773.04 µs 776.04 µs] +//! short/128 time: [1.5017 ms 1.5064 ms 1.5113 ms] +//! medium/1 time: [106.65 µs 107.20 µs 107.85 µs] +//!
medium/2 time: [153.28 µs 154.24 µs 155.56 µs] +//! medium/4 time: [325.67 µs 327.01 µs 328.71 µs] +//! medium/8 time: [646.82 µs 650.17 µs 653.91 µs] +//! medium/16 time: [1.2645 ms 1.2701 ms 1.2762 ms] +//! medium/32 time: [2.4409 ms 2.4550 ms 2.4692 ms] +//! medium/64 time: [4.6814 ms 4.7114 ms 4.7408 ms] +//! medium/128 time: [8.7790 ms 8.9037 ms 9.0282 ms] //! ``` use bytes::{Buf, Bytes}; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d0a643aff65d..62ea83905d53 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -350,9 +350,9 @@ impl From<PostgresRedoManager> for WalRedoManager { } impl WalRedoManager { - pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { + pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { match self { - Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await, + Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout), #[cfg(test)] Self::Test(_) => { // Not applicable to test redo manager diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index b28bd06089da..e4f5f7513288 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -208,7 +208,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) { // Perhaps we did no work and the walredo process has been idle for some time: // give it a chance to shut down to avoid leaving walredo process running indefinitely. if let Some(walredo_mgr) = &tenant.walredo_mgr { - walredo_mgr.maybe_quiesce(period * 10).await; + walredo_mgr.maybe_quiesce(period * 10); } // TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index f406ef3ab866..c23bb6c1783b 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -41,7 +41,7 @@ use std::time::Duration; use std::time::Instant; use tracing::*; use utils::lsn::Lsn; -use utils::poison::Poison; +use utils::sync::heavier_once_cell; /// /// This is the real implementation that uses a Postgres process to /// @@ -54,7 +54,19 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex<Option<Instant>>, - redo_process: tokio::sync::RwLock<Poison<Option<Arc<process::WalRedoProcess>>>>, + /// The current [`process::WalRedoProcess`] that is used by new redo requests. + /// We use [`heavier_once_cell`] for coalescing the spawning, but the redo + /// requests don't use the [`heavier_once_cell::Guard`] to keep hold of + /// their process object; we use [`Arc::clone`] for that. + /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`] + /// had that behavior; it's probably unnecessary. + /// The only merit of it is that if one redo task observes an error from the walredo + /// process, it can take that process out of rotation (= using [`heavier_once_cell::Guard::take_and_deinit`]) + /// and retry redo, thereby starting a new process, while other redo tasks might + /// still be using the old redo process. But, those other tasks will most likely + /// encounter an error as well, and errors are an unexpected condition anyway. + /// So, probably we could get rid of the `Arc` in the future. + redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>, } /// @@ -137,12 +149,7 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
}) }, - pid: self - .redo_process - .read() - .await - .try_peek(|maybe_proc| maybe_proc.as_ref().map(|p| p.id())) - .unwrap(), + pid: self.redo_process.get().map(|p| p.id()), }) } } @@ -160,35 +167,21 @@ impl PostgresRedoManager { tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), - redo_process: tokio::sync::RwLock::new(Poison::new("redo_process field", None)), + redo_process: heavier_once_cell::OnceCell::default(), } } /// This type doesn't have its own background task to check for idleness: we /// rely on our owner calling this function periodically in its own housekeeping /// loops. - pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) { - // awkward control flow because rustc isn't smart enough to detect that we don't - // hold the std::sync lock guard across the await point - let kill = if let Ok(g) = self.last_redo_at.try_lock() { + pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) { + if let Ok(g) = self.last_redo_at.try_lock() { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - true - } else { - false + drop(self.redo_process.get().map(|guard| guard.take_and_deinit())); } - } else { - false } - } else { - false - }; - if kill { - let mut lock_guard = self.redo_process.write().await; - let mut poison_guard = lock_guard.check_and_arm().unwrap(); - *poison_guard.data_mut() = None; - poison_guard.disarm(); } } /// /// Process one request for WAL redo using wal-redo postgres /// @@ -215,51 +208,31 @@ const MAX_RETRY_ATTEMPTS: u32 = 1; let mut n_attempts = 0u32; loop { - // launch the WAL redo process on first use - let proc: Arc<process::WalRedoProcess> = async move { - let lock_guard = self.redo_process.read().await; - if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() { - // hot path - return anyhow::Ok(proc); - } - // slow path - // "upgrade" to write lock to launch the process - drop(lock_guard); - let mut lock_guard = self.redo_process.write().await; - if let Some(proc) = lock_guard.try_peek(Option::clone).unwrap() { - // we coalesced onto another task running this code - return anyhow::Ok(proc); - } - // don't hold poison_guard, the launch code can bail - let start = Instant::now(); - let proc = Arc::new( - process::WalRedoProcess::launch( - self.conf, - self.tenant_shard_id, - pg_version, - ) - .context("launch walredo process")?, - ); - let duration = start.elapsed(); - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM - .observe(duration.as_secs_f64()); - info!( - duration_ms = duration.as_millis(), - pid = proc.id(), - "launched walredo process" - ); - // - let mut poison_guard = lock_guard.check_and_arm().unwrap(); - let replaced = poison_guard.data_mut().replace(Arc::clone(&proc)); - match replaced { - None => (), - Some(replaced) => { - unreachable!("the check after acquiring the write lock should prevent this from happening: {}", replaced.id()); + let proc: Arc<process::WalRedoProcess> = + match self.redo_process.get_or_init_detached().await { + Ok(guard) => Arc::clone(&guard), + Err(permit) => { + // cell is not initialized; we hold the init permit, and the launch code below can bail + let start = Instant::now(); + let proc = Arc::new( + process::WalRedoProcess::launch( + self.conf, + self.tenant_shard_id, + pg_version, + ) + .context("launch walredo process")?, + ); + let duration = start.elapsed(); + WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64()); + info!( + duration_ms = duration.as_millis(), + pid = proc.id(), + "launched walredo process" + ); + self.redo_process.set(Arc::clone(&proc), permit); + proc } - } - poison_guard.disarm(); - anyhow::Ok(proc) - }.await?; + }; let started_at
= std::time::Instant::now(); @@ -308,21 +281,15 @@ // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. { - let mut lock_guard = self.redo_process.write().await; - let mut poison_guard = lock_guard.check_and_arm().unwrap(); - let maybe_proc_mut = poison_guard.data_mut(); - match &*maybe_proc_mut { + match self.redo_process.get() { + None => (), + Some(guard) => { + if Arc::ptr_eq(&proc, &*guard) { Some(current_field_value) => { - if Arc::ptr_eq(current_field_value, &proc) { // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - *maybe_proc_mut = None; + guard.take_and_deinit(); } } - None => { - // Another thread was faster to observe the error, and already took the process out of rotation. - } } - poison_guard.disarm(); } // NB: there may still be other concurrent threads using `proc`. // The last one will send SIGKILL when the underlying Arc reaches refcount 0. From 5a64ae0c75e8639d3f3aebefac1cd83995660ac5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 Apr 2024 14:53:46 +0000 Subject: [PATCH 3/5] make redo_process take-out-of-rotation code path less confusing --- pageserver/src/walredo.rs | 43 +++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c23bb6c1783b..32516cf2be8a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -278,31 +278,34 @@ impl PostgresRedoManager { n_attempts, e, ); - // Avoid concurrent callers hitting the same issue. - // We can't prevent it from happening because we want to enable parallelism. - { - match self.redo_process.get() { - None => (), - Some(guard) => { - if Arc::ptr_eq(&proc, &*guard) { - // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - guard.take_and_deinit(); - } - } - } - } + // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation. + // Note that there may be other tasks concurrent with us that also hold `proc`. + // We have to deal with that here. + // Also read the doc comment on field `self.redo_process`. + // // NB: there may still be other concurrent threads using `proc`. // The last one will send SIGKILL when the underlying Arc reaches refcount 0. - // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep - // holding the lock while waiting for the process to exit. - // NB: the drop impl blocks the current thread with a wait() system call for - // the child process. We dropped the `guard` above so that other threads aren't - // affected. But, it's good that the current thread _does_ block to wait. - // If we instead deferred the waiting into the background / to tokio, it could - // happen that if walredo always fails immediately, we spawn processes faster + // + // NB: the drop impl blocks the dropping thread with a wait() system call for + // the child process. In some ways the blocking is actually good: if we + // deferred the waiting into the background / to tokio (e.g. by using `tokio::process`), + // it could happen that, if walredo always fails immediately, we spawn processes faster // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. // This probably needs revisiting at some later point.
match self.redo_process.get() { + None => (), + Some(guard) => { + if Arc::ptr_eq(&proc, &*guard) { + // We're the first to observe an error from `proc`, it's our job to take it out of rotation. + guard.take_and_deinit(); + } else { + // Another task already spawned another redo process (further up in this method) + // and put it into `redo_process`. Do nothing, our view of the world is behind. + } + } + } + // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall. drop(proc); } else if n_attempts != 0 { info!(n_attempts, "retried walredo succeeded"); From 806c957dc72afc02a663a0d05181d0efb69dc767 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 Apr 2024 14:55:04 +0000 Subject: [PATCH 4/5] fixup switch to heavier_once_cell: remove unused tokio-test dependency --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 2d49581f9f4f..9f24176c65f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,7 +170,6 @@ tokio-postgres-rustls = "0.11.0" tokio-rustls = "0.25" tokio-stream = "0.1" tokio-tar = "0.3" -tokio-test = "0.4.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.7" toml_edit = "0.19" From 4e4f24697f337df85eea131ee9499e92988b678f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 Apr 2024 14:58:11 +0000 Subject: [PATCH 5/5] fixup switch to heavier_once_cell: undo async fn changes that are no longer needed --- pageserver/src/http/routes.rs | 2 +- pageserver/src/tenant.rs | 12 ++++-------- pageserver/src/walredo.rs | 2 +- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 36f80eb3ccc1..47d8ae114891 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1028,7 +1028,7 @@ async fn tenant_status( attachment_status: state.attachment_status(), generation: tenant.generation().into(), }, - walredo: tenant.wal_redo_manager_status().await, + walredo: tenant.wal_redo_manager_status(), timelines: tenant.list_timeline_ids(), }) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 62ea83905d53..17ff033e00c5 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -384,9 +384,9 @@ impl WalRedoManager { } } - pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> { + pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> { match self { - WalRedoManager::Prod(m) => m.status().await, + WalRedoManager::Prod(m) => m.status(), #[cfg(test)] WalRedoManager::Test(_) => None, } } @@ -1683,12 +1683,8 @@ impl Tenant { self.generation } - pub(crate) async fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> { - if let Some(mgr) = self.walredo_mgr.as_ref() { - mgr.status().await - } else { - None - } + pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> { - self.walredo_mgr.as_ref().and_then(|mgr| mgr.status()) + self.walredo_mgr.as_ref().and_then(|mgr| mgr.status()) } /// Changes tenant status to active, unless shutdown was already requested. diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 32516cf2be8a..ca41a576fd36 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -139,7 +139,7 @@ impl PostgresRedoManager { } } - pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> { + pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> { Some(WalRedoManagerStatus { last_redo_at: { let at = *self.last_redo_at.lock().unwrap();
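For readers who don't have the pageserver sources at hand: `utils::sync::heavier_once_cell` is a Neon-internal utility and its implementation is not part of this diff. Judging only from the calls made in the patches above (`get`, `get_or_init_detached`, `set`, `take_and_deinit`), its contract can be approximated as a semaphore-guarded slot. The following is that approximation, a simplified mental model and not the real implementation (for one thing, the real `take_and_deinit` goes through a guard type rather than `&self`):

```rust
use tokio::sync::{Semaphore, SemaphorePermit};

// Rough approximation of the coalescing contract, not Neon's real type.
pub struct OnceCellApprox<T> {
    slot: std::sync::Mutex<Option<T>>,
    init_sem: Semaphore, // one permit: at most one initializer at a time
}

impl<T: Clone> OnceCellApprox<T> {
    pub fn new() -> Self {
        Self {
            slot: std::sync::Mutex::new(None),
            init_sem: Semaphore::new(1),
        }
    }

    pub fn get(&self) -> Option<T> {
        self.slot.lock().unwrap().clone()
    }

    // Ok(value) if already initialized; Err(permit) hands the caller the
    // exclusive right to initialize. Waiters queue on the semaphore and
    // yield the executor instead of blocking a thread.
    pub async fn get_or_init_detached(&self) -> Result<T, SemaphorePermit<'_>> {
        if let Some(v) = self.get() {
            return Ok(v); // hot path
        }
        let permit = self.init_sem.acquire().await.expect("semaphore not closed");
        // Re-check: a previous permit holder may have filled the slot while
        // we waited; if so, we coalesce onto its value.
        if let Some(v) = self.get() {
            return Ok(v);
        }
        Err(permit)
    }

    pub fn set(&self, value: T, permit: SemaphorePermit<'_>) {
        *self.slot.lock().unwrap() = Some(value);
        drop(permit); // release the permit; later callers hit the filled slot
    }

    // Simplified: the real API deinitializes through a guard.
    pub fn take_and_deinit(&self) -> Option<T> {
        self.slot.lock().unwrap().take()
    }
}
```

In this sketch, the property the patch series relies on falls out directly: exactly one task initializes at a time, waiters queue asynchronously, and an initializer that bails (dropping the permit without calling `set`) simply lets the next waiter retry — no poisoning machinery required, which is why patch 2 could delete `utils::poison` again.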