From 31d4d1e233c1572bd5715bde8c58477d8bfaa285 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 10:49:31 +0200 Subject: [PATCH 01/11] env_config from PR #6125 --- control_plane/src/background_process.rs | 14 +++++++- libs/utils/src/env_config.rs | 48 +++++++++++++++++++++++++ libs/utils/src/lib.rs | 1 + libs/utils/src/logging.rs | 8 ++++- 4 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 libs/utils/src/env_config.rs diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 2fced7d77869..fbd739ac9ef6 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -86,7 +86,10 @@ where .stdout(process_log_file) .stderr(same_file_for_stderr) .args(args); - let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command)); + + let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars( + fill_rust_env_vars(background_command), + )); filled_cmd.envs(envs); let pid_file_to_check = match &initial_pid_file { @@ -268,6 +271,15 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command { cmd } +fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command { + for (var, val) in std::env::vars() { + if var.starts_with("NEON_") { + cmd = cmd.env(var, val); + } + } + cmd +} + /// Add a `pre_exec` to the cmd that, inbetween fork() and exec(), /// 1. Claims a pidfile with a fcntl lock on it and /// 2. Sets up the pidfile's file descriptor so that it (and the lock) diff --git a/libs/utils/src/env_config.rs b/libs/utils/src/env_config.rs new file mode 100644 index 000000000000..823a49796c28 --- /dev/null +++ b/libs/utils/src/env_config.rs @@ -0,0 +1,48 @@ +use std::{fmt::Display, str::FromStr}; + +pub fn var(varname: &str, default: D) -> V +where + V: FromStr, + E: Display, + D: FnOnce() -> V, +{ + match std::env::var(varname) { + Ok(s) => s + .parse() + .map_err(|e| format!("failed to parse env var {varname}: {e:#}")) + .unwrap(), + Err(std::env::VarError::NotPresent) => default(), + Err(std::env::VarError::NotUnicode(_)) => { + panic!("env var {varname} is not unicode") + } + } +} + +pub struct Bool(bool); + +impl Bool { + pub const fn new_const() -> Self { + Bool(V) + } +} + +impl FromStr for Bool { + type Err = String; + + fn from_str(s: &str) -> Result { + if let Ok(b) = s.parse() { + return Ok(Bool(b)); + } + Ok(Bool(match s { + "0" => false, + "1" => true, + _ => return Err(format!("not a bool, accepting 0|1|{}|{}", false, true)), + })) + } +} + +impl Into for Bool { + fn into(self) -> bool { + self.0 + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 04ce0626c84a..bd04eeea055b 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -27,6 +27,7 @@ pub mod auth; pub mod id; mod hex; +pub mod env_config; pub use hex::Hex; // http endpoint utils diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index f7b73dc9843e..d9531d0438d3 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,10 +1,16 @@ -use std::str::FromStr; +use std::{ + io::BufWriter, + str::FromStr, + sync::{Arc, Mutex}, +}; use anyhow::Context; use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, EnumVariantNames}; +use super::env_config; + #[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum LogFormat { From 43cf9d10d2729806a9eede7701792c351fda389d Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 16:29:53 +0200 Subject: [PATCH 02/11] env_config improvements --- libs/utils/src/env_config.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libs/utils/src/env_config.rs b/libs/utils/src/env_config.rs index 823a49796c28..4331ef1136d4 100644 --- a/libs/utils/src/env_config.rs +++ b/libs/utils/src/env_config.rs @@ -21,8 +21,8 @@ where pub struct Bool(bool); impl Bool { - pub const fn new_const() -> Self { - Bool(V) + pub const fn new(v: bool) -> Self { + Bool(v) } } @@ -41,8 +41,8 @@ impl FromStr for Bool { } } -impl Into for Bool { - fn into(self) -> bool { - self.0 +impl From for bool { + fn from(val: Bool) -> Self { + val.0 } } From dc03f7a44f6a4a1200d6f82200b538b3bf9db5fb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 16:31:16 +0200 Subject: [PATCH 03/11] pageserver: ability to use a single runtime This PR allows running the pageserver with a single tokio runtime. --- libs/utils/src/lib.rs | 2 +- libs/utils/src/logging.rs | 8 +-- pageserver/src/bin/pageserver.rs | 8 +-- pageserver/src/consumption_metrics.rs | 2 +- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/task_mgr.rs | 59 +++++++++++-------- pageserver/src/tenant/delete.rs | 2 +- pageserver/src/tenant/mgr.rs | 2 +- .../src/tenant/remote_timeline_client.rs | 2 +- pageserver/src/tenant/secondary.rs | 4 +- pageserver/src/tenant/tasks.rs | 4 +- pageserver/src/tenant/timeline.rs | 8 +-- pageserver/src/tenant/timeline/delete.rs | 2 +- .../src/tenant/timeline/eviction_task.rs | 2 +- 14 files changed, 54 insertions(+), 53 deletions(-) diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index bd04eeea055b..673c48e45076 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -26,8 +26,8 @@ pub mod auth; // utility functions and helper traits for unified unique id generation/serialization etc. pub mod id; -mod hex; pub mod env_config; +mod hex; pub use hex::Hex; // http endpoint utils diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index d9531d0438d3..f7b73dc9843e 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,16 +1,10 @@ -use std::{ - io::BufWriter, - str::FromStr, - sync::{Arc, Mutex}, -}; +use std::str::FromStr; use anyhow::Context; use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, EnumVariantNames}; -use super::env_config; - #[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum LogFormat { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index c80230d4d719..073655a5987a 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -391,7 +391,7 @@ fn start_pageserver( conf, ); if let Some(deletion_workers) = deletion_workers { - deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); + deletion_workers.spawn_with(*BACKGROUND_RUNTIME); } // Up to this point no significant I/O has been done: this should have been fast. Record @@ -569,7 +569,7 @@ fn start_pageserver( .with_graceful_shutdown(task_mgr::shutdown_watcher()); task_mgr::spawn( - MGMT_REQUEST_RUNTIME.handle(), + *MGMT_REQUEST_RUNTIME, TaskKind::HttpEndpointListener, None, None, @@ -594,7 +594,7 @@ fn start_pageserver( let local_disk_storage = conf.workdir.join("last_consumption_metrics.json"); task_mgr::spawn( - crate::BACKGROUND_RUNTIME.handle(), + *crate::BACKGROUND_RUNTIME, TaskKind::MetricsCollection, None, None, @@ -647,7 +647,7 @@ fn start_pageserver( DownloadBehavior::Error, ); task_mgr::spawn( - COMPUTE_REQUEST_RUNTIME.handle(), + *COMPUTE_REQUEST_RUNTIME, TaskKind::LibpqEndpointListener, None, None, diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index f5540e896f68..a09da56ee4e4 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -64,7 +64,7 @@ pub async fn collect_metrics( let worker_ctx = ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::CalculateSyntheticSize, None, None, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 6248424cee49..e44ae21a36fc 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -201,7 +201,7 @@ pub fn launch_disk_usage_global_eviction_task( info!("launching disk usage based eviction task"); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::DiskUsageEviction, None, None, diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 0cc5611a124f..864b186e63d9 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -39,7 +39,6 @@ use std::sync::{Arc, Mutex}; use futures::FutureExt; use pageserver_api::shard::TenantShardId; -use tokio::runtime::Runtime; use tokio::task::JoinHandle; use tokio::task_local; use tokio_util::sync::CancellationToken; @@ -48,6 +47,7 @@ use tracing::{debug, error, info, warn}; use once_cell::sync::Lazy; +use utils::env_config; use utils::id::TimelineId; // @@ -98,42 +98,49 @@ use utils::id::TimelineId; // other operations, if the upload tasks e.g. get blocked on locks. It shouldn't // happen, but still. // -pub static COMPUTE_REQUEST_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("compute request worker") - .enable_all() - .build() - .expect("Failed to create compute request runtime") -}); -pub static MGMT_REQUEST_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("mgmt request worker") - .enable_all() - .build() - .expect("Failed to create mgmt request runtime") +static USE_SINGLE_RUNTIME: Lazy = Lazy::new(|| { + env_config::var("NEON_PAGESERVER_USE_SINGLE_RUNTIME", || { + env_config::Bool::new(false) + }) + .into() }); -pub static WALRECEIVER_RUNTIME: Lazy = Lazy::new(|| { +static SINGLE_RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() - .thread_name("walreceiver worker") + .thread_name("pageserver worker") .enable_all() .build() - .expect("Failed to create walreceiver runtime") + .expect("failed to create single runtime") }); -pub static BACKGROUND_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("background op worker") - // if you change the number of worker threads please change the constant below - .enable_all() - .build() - .expect("Failed to create background op runtime") -}); +macro_rules! single_runtime_or_multi_thread_enable_all { + ($varname:ident, $name:literal) => { + pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| { + if *USE_SINGLE_RUNTIME { + SINGLE_RUNTIME.handle() + } else { + static RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name($name) + .enable_all() + .build() + .expect(std::concat!("Failed to create runtime ", $name)) + }); + RUNTIME.handle() + } + }); + }; +} +single_runtime_or_multi_thread_enable_all!(COMPUTE_REQUEST_RUNTIME, "compute request worker"); +single_runtime_or_multi_thread_enable_all!(MGMT_REQUEST_RUNTIME, "mgmt request worker"); +single_runtime_or_multi_thread_enable_all!(WALRECEIVER_RUNTIME, "walreceiver worker"); +// if you change the number of worker threads please change the constant below +single_runtime_or_multi_thread_enable_all!(BACKGROUND_RUNTIME, "background op worker"); pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy = Lazy::new(|| { // force init and thus panics - let _ = BACKGROUND_RUNTIME.handle(); + let _ = *BACKGROUND_RUNTIME; // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly // tokio would had already panicked for parsing errors or NotUnicode // diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index d1881f3897d6..55e670483579 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -485,7 +485,7 @@ impl DeleteTenantFlow { let tenant_shard_id = tenant.tenant_shard_id; task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, TaskKind::TimelineDeletionWorker, Some(tenant_shard_id), None, diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b1b46d487bbc..76973efaa820 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1849,7 +1849,7 @@ impl TenantManager { let task_tenant_id = None; task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, TaskKind::MgmtRequest, task_tenant_id, None, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 9b1b5e7ed5d3..754395ed0c2e 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -341,7 +341,7 @@ impl RemoteTimelineClient { // remote_timeline_client.rs tests rely on current-thread runtime tokio::runtime::Handle::current() } else { - BACKGROUND_RUNTIME.handle().clone() + BACKGROUND_RUNTIME.clone() }, tenant_shard_id, timeline_id, diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 19f36c722e7b..f74ed8dbe5da 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -317,7 +317,7 @@ pub fn spawn_tasks( tokio::sync::mpsc::channel::>(16); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::SecondaryDownloads, None, None, @@ -338,7 +338,7 @@ pub fn spawn_tasks( ); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::SecondaryUploads, None, None, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index e4f5f7513288..eeb170b260ae 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -85,7 +85,7 @@ pub fn start_background_loops( ) { let tenant_shard_id = tenant.tenant_shard_id; task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::Compaction, Some(tenant_shard_id), None, @@ -109,7 +109,7 @@ pub fn start_background_loops( }, ); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::GarbageCollector, Some(tenant_shard_id), None, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d3c8c5f66c51..9c2cf666ace1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1962,7 +1962,7 @@ impl Timeline { initdb_optimization_count: 0, }; task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::LayerFlushTask, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -2324,7 +2324,7 @@ impl Timeline { DownloadBehavior::Download, ); task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::InitialLogicalSizeCalculation, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -2502,7 +2502,7 @@ impl Timeline { DownloadBehavior::Download, ); task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::OndemandLogicalSizeCalculation, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -4484,7 +4484,7 @@ impl Timeline { let self_clone = Arc::clone(&self); let task_id = task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::DownloadAllRemoteLayers, Some(self.tenant_shard_id), Some(self.timeline_id), diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index af10c1c84b76..f4fcbbdedab3 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -383,7 +383,7 @@ impl DeleteTimelineFlow { let timeline_id = timeline.timeline_id; task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, TaskKind::TimelineDeletionWorker, Some(tenant_shard_id), Some(timeline_id), diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 522c5b57de93..78b9dfff4774 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -57,7 +57,7 @@ impl Timeline { let self_clone = Arc::clone(self); let background_tasks_can_start = background_tasks_can_start.cloned(); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::Eviction, Some(self.tenant_shard_id), Some(self.timeline_id), From 3779854f1214e1219121450fce9e8ed2ada87d8a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 17:16:18 +0200 Subject: [PATCH 04/11] rename "single runtime" to "one runtime", allow configuring current_thread and multi_thread:$num_workers --- libs/utils/src/env_config.rs | 20 ++++- pageserver/src/task_mgr.rs | 129 +++++++++++++++++++++++---------- pageserver/src/tenant/tasks.rs | 3 +- 3 files changed, 111 insertions(+), 41 deletions(-) diff --git a/libs/utils/src/env_config.rs b/libs/utils/src/env_config.rs index 4331ef1136d4..be0fd58c57f9 100644 --- a/libs/utils/src/env_config.rs +++ b/libs/utils/src/env_config.rs @@ -1,6 +1,6 @@ use std::{fmt::Display, str::FromStr}; -pub fn var(varname: &str, default: D) -> V +pub fn var_or_else(varname: &str, default: D) -> V where V: FromStr, E: Display, @@ -18,6 +18,24 @@ where } } +pub fn var(varname: &str) -> Option +where + V: FromStr, + E: Display, +{ + match std::env::var(varname) { + Ok(s) => Some( + s.parse() + .map_err(|e| format!("failed to parse env var {varname}: {e:#}")) + .unwrap(), + ), + Err(std::env::VarError::NotPresent) => None, + Err(std::env::VarError::NotUnicode(_)) => { + panic!("env var {varname} is not unicode") + } + } +} + pub struct Bool(bool); impl Bool { diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 864b186e63d9..f584f02e3c23 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -33,7 +33,9 @@ use std::collections::HashMap; use std::fmt; use std::future::Future; +use std::num::NonZeroUsize; use std::panic::AssertUnwindSafe; +use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; @@ -99,57 +101,106 @@ use utils::id::TimelineId; // happen, but still. // -static USE_SINGLE_RUNTIME: Lazy = Lazy::new(|| { - env_config::var("NEON_PAGESERVER_USE_SINGLE_RUNTIME", || { - env_config::Bool::new(false) - }) - .into() +pub(crate) static TOKIO_WORKER_THREADS: Lazy = Lazy::new(|| { + // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly + // tokio would had already panicked for parsing errors or NotUnicode + // + // this will be wrong if any of the runtimes gets their worker threads configured to something + // else, but that has not been needed in a long time. + NonZeroUsize::new( + std::env::var("TOKIO_WORKER_THREADS") + .map(|s| s.parse::().unwrap()) + .unwrap_or_else(|_e| usize::max(2, num_cpus::get())), + ) + .expect("the max() ensures that this is not zero") }); -static SINGLE_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("pageserver worker") - .enable_all() - .build() - .expect("failed to create single runtime") +enum TokioRuntimeMode { + SingleThreaded, + MultiThreaded { num_workers: NonZeroUsize }, +} + +impl FromStr for TokioRuntimeMode { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "current_thread" => Ok(TokioRuntimeMode::SingleThreaded), + "multi_thread:default" => Ok(TokioRuntimeMode::MultiThreaded { + num_workers: *TOKIO_WORKER_THREADS, + }), + s => match s.strip_prefix("multi:") { + Some(suffix) => { + let num_workers = suffix.parse::().map_err(|e| { + format!( + "invalid number of multi-threaded runtime workers ({suffix:?}): {e}", + ) + })?; + Ok(TokioRuntimeMode::MultiThreaded { num_workers }) + } + None => Err(format!("invalid runtime config: {}", s)), + }, + } + } +} + +static ONE_RUNTIME: Lazy> = Lazy::new(|| { + let thread_name = "pageserver worker"; + let Some(mode) = env_config::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else { + // If the env var is not set, leave this static as None. + // The single_ + return None; + }; + Some(match mode { + TokioRuntimeMode::SingleThreaded => tokio::runtime::Builder::new_current_thread() + .thread_name(thread_name) + .enable_all() + .build() + .expect("failed to create single runtime"), + TokioRuntimeMode::MultiThreaded { num_workers } => { + tokio::runtime::Builder::new_multi_thread() + .thread_name(thread_name) + .enable_all() + .worker_threads(num_workers.get()) + .build() + .expect("failed to create single runtime") + } + }) }); -macro_rules! single_runtime_or_multi_thread_enable_all { +/// Declare a lazy static variable named `$varname` that will resolve +/// to a tokio runtime handle. If the env var `NEON_PAGESERVER_USE_ONE_RUNTIME` +/// is set, this will resolve to `ONE_RUNTIME`. Otherwise, the macro invocation +/// declares a separate runtime and the lazy static variable `$varname` +/// will resolve to that separate runtime. +/// +/// The result is is that `$varname.spawn()` will use `ONE_RUNTIME` if +/// `NEON_PAGESERVER_USE_ONE_RUNTIME` is set, and will use the separate runtime +/// otherwise. +macro_rules! pageserver_runtime { ($varname:ident, $name:literal) => { pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| { - if *USE_SINGLE_RUNTIME { - SINGLE_RUNTIME.handle() - } else { - static RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name($name) - .enable_all() - .build() - .expect(std::concat!("Failed to create runtime ", $name)) - }); - RUNTIME.handle() + if let Some(runtime) = &*ONE_RUNTIME { + return runtime.handle(); } + static RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name($name) + .worker_threads(TOKIO_WORKER_THREADS.get()) + .enable_all() + .build() + .expect(std::concat!("Failed to create runtime ", $name)) + }); + RUNTIME.handle() }); }; } -single_runtime_or_multi_thread_enable_all!(COMPUTE_REQUEST_RUNTIME, "compute request worker"); -single_runtime_or_multi_thread_enable_all!(MGMT_REQUEST_RUNTIME, "mgmt request worker"); -single_runtime_or_multi_thread_enable_all!(WALRECEIVER_RUNTIME, "walreceiver worker"); +pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker"); +pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker"); +pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker"); // if you change the number of worker threads please change the constant below -single_runtime_or_multi_thread_enable_all!(BACKGROUND_RUNTIME, "background op worker"); -pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy = Lazy::new(|| { - // force init and thus panics - let _ = *BACKGROUND_RUNTIME; - // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly - // tokio would had already panicked for parsing errors or NotUnicode - // - // this will be wrong if any of the runtimes gets their worker threads configured to something - // else, but that has not been needed in a long time. - std::env::var("TOKIO_WORKER_THREADS") - .map(|s| s.parse::().unwrap()) - .unwrap_or_else(|_e| usize::max(2, num_cpus::get())) -}); +pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker"); #[derive(Debug, Clone, Copy)] pub struct PageserverTaskId(u64); diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index eeb170b260ae..fd16a12a54d7 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -18,7 +18,7 @@ use utils::{backoff, completion}; static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { - let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS; + let total_threads = task_mgr::TOKIO_WORKER_THREADS.get(); let permits = usize::max( 1, // while a lot of the work is done on spawn_blocking, we still do @@ -72,6 +72,7 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit( loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation ); + // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id(); match CONCURRENT_BACKGROUND_TASKS.acquire().await { Ok(permit) => permit, Err(_closed) => unreachable!("we never close the semaphore"), From 5cf45df6926bb6203f22fec15b9c7edad0b390eb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 17:17:35 +0200 Subject: [PATCH 05/11] remove env_config::Bool --- libs/utils/src/env_config.rs | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/libs/utils/src/env_config.rs b/libs/utils/src/env_config.rs index be0fd58c57f9..4bafbe57a00a 100644 --- a/libs/utils/src/env_config.rs +++ b/libs/utils/src/env_config.rs @@ -35,32 +35,3 @@ where } } } - -pub struct Bool(bool); - -impl Bool { - pub const fn new(v: bool) -> Self { - Bool(v) - } -} - -impl FromStr for Bool { - type Err = String; - - fn from_str(s: &str) -> Result { - if let Ok(b) = s.parse() { - return Ok(Bool(b)); - } - Ok(Bool(match s { - "0" => false, - "1" => true, - _ => return Err(format!("not a bool, accepting 0|1|{}|{}", false, true)), - })) - } -} - -impl From for bool { - fn from(val: Bool) -> Self { - val.0 - } -} From 740efb0ab53718c5098021514ab12fcbe8d28c3b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 17:21:49 +0200 Subject: [PATCH 06/11] cleanup --- control_plane/src/background_process.rs | 2 +- libs/utils/src/{env_config.rs => env.rs} | 20 ++------------------ libs/utils/src/lib.rs | 3 ++- pageserver/src/task_mgr.rs | 5 ++--- 4 files changed, 7 insertions(+), 23 deletions(-) rename libs/utils/src/{env_config.rs => env.rs} (51%) diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index fbd739ac9ef6..94666f28706c 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -273,7 +273,7 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command { fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command { for (var, val) in std::env::vars() { - if var.starts_with("NEON_") { + if var.starts_with("NEON_PAGESERVER_") { cmd = cmd.env(var, val); } } diff --git a/libs/utils/src/env_config.rs b/libs/utils/src/env.rs similarity index 51% rename from libs/utils/src/env_config.rs rename to libs/utils/src/env.rs index 4bafbe57a00a..b3e326bfd0f4 100644 --- a/libs/utils/src/env_config.rs +++ b/libs/utils/src/env.rs @@ -1,22 +1,6 @@ -use std::{fmt::Display, str::FromStr}; +//! Wrapper around `std::env::var` for parsing environment variables. -pub fn var_or_else(varname: &str, default: D) -> V -where - V: FromStr, - E: Display, - D: FnOnce() -> V, -{ - match std::env::var(varname) { - Ok(s) => s - .parse() - .map_err(|e| format!("failed to parse env var {varname}: {e:#}")) - .unwrap(), - Err(std::env::VarError::NotPresent) => default(), - Err(std::env::VarError::NotUnicode(_)) => { - panic!("env var {varname} is not unicode") - } - } -} +use std::{fmt::Display, str::FromStr}; pub fn var(varname: &str) -> Option where diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 673c48e45076..cd5075613e74 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -26,7 +26,6 @@ pub mod auth; // utility functions and helper traits for unified unique id generation/serialization etc. pub mod id; -pub mod env_config; mod hex; pub use hex::Hex; @@ -90,6 +89,8 @@ pub mod yielding_loop; pub mod zstd; +pub mod env; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index f584f02e3c23..d91fdf29a3c9 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -49,7 +49,7 @@ use tracing::{debug, error, info, warn}; use once_cell::sync::Lazy; -use utils::env_config; +use utils::env; use utils::id::TimelineId; // @@ -146,7 +146,7 @@ impl FromStr for TokioRuntimeMode { static ONE_RUNTIME: Lazy> = Lazy::new(|| { let thread_name = "pageserver worker"; - let Some(mode) = env_config::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else { + let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else { // If the env var is not set, leave this static as None. // The single_ return None; @@ -199,7 +199,6 @@ macro_rules! pageserver_runtime { pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker"); pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker"); pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker"); -// if you change the number of worker threads please change the constant below pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker"); #[derive(Debug, Clone, Copy)] From 6b820bb4237934d5ec0df22fe33bae6099b37dac Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 16:42:35 +0000 Subject: [PATCH 07/11] fixup env var value parsing --- pageserver/src/task_mgr.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index d91fdf29a3c9..2528eb9c2435 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -126,10 +126,10 @@ impl FromStr for TokioRuntimeMode { fn from_str(s: &str) -> Result { match s { "current_thread" => Ok(TokioRuntimeMode::SingleThreaded), - "multi_thread:default" => Ok(TokioRuntimeMode::MultiThreaded { - num_workers: *TOKIO_WORKER_THREADS, - }), - s => match s.strip_prefix("multi:") { + s => match s.strip_prefix("multi_thread:") { + Some("default") => Ok(TokioRuntimeMode::MultiThreaded { + num_workers: *TOKIO_WORKER_THREADS, + }), Some(suffix) => { let num_workers = suffix.parse::().map_err(|e| { format!( @@ -138,7 +138,7 @@ impl FromStr for TokioRuntimeMode { })?; Ok(TokioRuntimeMode::MultiThreaded { num_workers }) } - None => Err(format!("invalid runtime config: {}", s)), + None => Err(format!("invalid runtime config: {s:?}")), }, } } From 70fb7e358028723d85005797c34606c6ae43730a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 17:34:04 +0000 Subject: [PATCH 08/11] metric, useful for rollout / analyzing grafana metrics --- pageserver/src/metrics.rs | 21 +++++++++++++++++++++ pageserver/src/task_mgr.rs | 26 ++++++++++++++++++++------ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index ab9a2e85098f..3160f204e264 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2100,6 +2100,7 @@ pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) { use futures::Future; use pin_project_lite::pin_project; use std::collections::HashMap; +use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; @@ -2669,6 +2670,26 @@ pub(crate) mod disk_usage_based_eviction { pub(crate) static METRICS: Lazy = Lazy::new(Metrics::default); } +static TOKIO_EXECUTOR_THREAD_COUNT: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "pageserver_tokio_executor_thread_configured_count", + "Total number of configued tokio executor threads in the process. + The `setup` label denotes whether we're running with multiple runtimes or a single runtime.", + &["setup"], + ) + .unwrap() +}); + +pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) { + static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(()); + let _guard = SERIALIZE.lock().unwrap(); + TOKIO_EXECUTOR_THREAD_COUNT.reset(); + TOKIO_EXECUTOR_THREAD_COUNT + .get_metric_with_label_values(&[setup]) + .unwrap() + .set(u64::try_from(num_threads.get()).unwrap()); +} + pub fn preinitialize_metrics() { // Python tests need these and on some we do alerting. // diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 2528eb9c2435..6cabe9c19056 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -52,6 +52,8 @@ use once_cell::sync::Lazy; use utils::env; use utils::id::TimelineId; +use crate::metrics::set_tokio_runtime_setup; + // // There are four runtimes: // @@ -148,16 +150,25 @@ static ONE_RUNTIME: Lazy> = Lazy::new(|| { let thread_name = "pageserver worker"; let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else { // If the env var is not set, leave this static as None. - // The single_ + set_tokio_runtime_setup( + "multiple-runtimes", + NUM_MULTIPLE_RUNTIMES + .checked_mul(*TOKIO_WORKER_THREADS) + .unwrap(), + ); return None; }; Some(match mode { - TokioRuntimeMode::SingleThreaded => tokio::runtime::Builder::new_current_thread() - .thread_name(thread_name) - .enable_all() - .build() - .expect("failed to create single runtime"), + TokioRuntimeMode::SingleThreaded => { + set_tokio_runtime_setup("one-runtime-single-threaded", NonZeroUsize::new(1).unwrap()); + tokio::runtime::Builder::new_current_thread() + .thread_name(thread_name) + .enable_all() + .build() + .expect("failed to create single runtime") + } TokioRuntimeMode::MultiThreaded { num_workers } => { + set_tokio_runtime_setup("one-runtime-multi-threaded", num_workers); tokio::runtime::Builder::new_multi_thread() .thread_name(thread_name) .enable_all() @@ -200,6 +211,9 @@ pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker"); pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker"); pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker"); pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker"); +// Bump this number when adding a new pageserver_runtime! +// SAFETY: it's obviously correct +const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) }; #[derive(Debug, Clone, Copy)] pub struct PageserverTaskId(u64); From edd7f69c2d6bba287615d0a3025b43026865dfa9 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 17:51:04 +0000 Subject: [PATCH 09/11] make current_thread mode work We need to have &'static Runtime, not &'static Handle, because &'static Handle doesn't drive IO/timers on current_thread RT. --- pageserver/src/bin/pageserver.rs | 12 ++++++++---- pageserver/src/consumption_metrics.rs | 2 +- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/task_mgr.rs | 8 ++++---- pageserver/src/tenant/delete.rs | 2 +- pageserver/src/tenant/mgr.rs | 2 +- pageserver/src/tenant/remote_timeline_client.rs | 2 +- pageserver/src/tenant/secondary.rs | 4 ++-- pageserver/src/tenant/tasks.rs | 4 ++-- pageserver/src/tenant/timeline.rs | 8 ++++---- pageserver/src/tenant/timeline/delete.rs | 2 +- pageserver/src/tenant/timeline/eviction_task.rs | 2 +- 12 files changed, 27 insertions(+), 23 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 073655a5987a..26cab9338ba8 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -391,7 +391,7 @@ fn start_pageserver( conf, ); if let Some(deletion_workers) = deletion_workers { - deletion_workers.spawn_with(*BACKGROUND_RUNTIME); + deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); } // Up to this point no significant I/O has been done: this should have been fast. Record @@ -569,7 +569,7 @@ fn start_pageserver( .with_graceful_shutdown(task_mgr::shutdown_watcher()); task_mgr::spawn( - *MGMT_REQUEST_RUNTIME, + MGMT_REQUEST_RUNTIME.handle(), TaskKind::HttpEndpointListener, None, None, @@ -594,7 +594,7 @@ fn start_pageserver( let local_disk_storage = conf.workdir.join("last_consumption_metrics.json"); task_mgr::spawn( - *crate::BACKGROUND_RUNTIME, + crate::BACKGROUND_RUNTIME.handle(), TaskKind::MetricsCollection, None, None, @@ -647,7 +647,7 @@ fn start_pageserver( DownloadBehavior::Error, ); task_mgr::spawn( - *COMPUTE_REQUEST_RUNTIME, + COMPUTE_REQUEST_RUNTIME.handle(), TaskKind::LibpqEndpointListener, None, None, @@ -682,6 +682,10 @@ fn start_pageserver( .expect("forever() never returns None unless explicitly closed"); }); let signal = BACKGROUND_RUNTIME + // NB: in `NEON_PAGESERVER_USE_ONE_RUNTIME=current_thread`, this + // is where the executor is actually driven. In multi-threaded runtime + // modes, the executor threads are spawned internally, so, async execution + // is driven even before we reach here. .block_on(signal_handler) .expect("join error"); match signal { diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index a09da56ee4e4..f5540e896f68 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -64,7 +64,7 @@ pub async fn collect_metrics( let worker_ctx = ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download); task_mgr::spawn( - *BACKGROUND_RUNTIME, + BACKGROUND_RUNTIME.handle(), TaskKind::CalculateSyntheticSize, None, None, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index e44ae21a36fc..6248424cee49 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -201,7 +201,7 @@ pub fn launch_disk_usage_global_eviction_task( info!("launching disk usage based eviction task"); task_mgr::spawn( - *BACKGROUND_RUNTIME, + BACKGROUND_RUNTIME.handle(), TaskKind::DiskUsageEviction, None, None, diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 6cabe9c19056..1d1dc22eadeb 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -147,7 +147,7 @@ impl FromStr for TokioRuntimeMode { } static ONE_RUNTIME: Lazy> = Lazy::new(|| { - let thread_name = "pageserver worker"; + let thread_name = "tokio-executor"; let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else { // If the env var is not set, leave this static as None. set_tokio_runtime_setup( @@ -190,9 +190,9 @@ static ONE_RUNTIME: Lazy> = Lazy::new(|| { /// otherwise. macro_rules! pageserver_runtime { ($varname:ident, $name:literal) => { - pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| { + pub static $varname: Lazy<&'static tokio::runtime::Runtime> = Lazy::new(|| { if let Some(runtime) = &*ONE_RUNTIME { - return runtime.handle(); + return runtime; } static RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() @@ -202,7 +202,7 @@ macro_rules! pageserver_runtime { .build() .expect(std::concat!("Failed to create runtime ", $name)) }); - RUNTIME.handle() + &*RUNTIME }); }; } diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 55e670483579..d1881f3897d6 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -485,7 +485,7 @@ impl DeleteTenantFlow { let tenant_shard_id = tenant.tenant_shard_id; task_mgr::spawn( - *task_mgr::BACKGROUND_RUNTIME, + task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::TimelineDeletionWorker, Some(tenant_shard_id), None, diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 76973efaa820..b1b46d487bbc 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1849,7 +1849,7 @@ impl TenantManager { let task_tenant_id = None; task_mgr::spawn( - *task_mgr::BACKGROUND_RUNTIME, + task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::MgmtRequest, task_tenant_id, None, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 754395ed0c2e..9b1b5e7ed5d3 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -341,7 +341,7 @@ impl RemoteTimelineClient { // remote_timeline_client.rs tests rely on current-thread runtime tokio::runtime::Handle::current() } else { - BACKGROUND_RUNTIME.clone() + BACKGROUND_RUNTIME.handle().clone() }, tenant_shard_id, timeline_id, diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index f74ed8dbe5da..19f36c722e7b 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -317,7 +317,7 @@ pub fn spawn_tasks( tokio::sync::mpsc::channel::>(16); task_mgr::spawn( - *BACKGROUND_RUNTIME, + BACKGROUND_RUNTIME.handle(), TaskKind::SecondaryDownloads, None, None, @@ -338,7 +338,7 @@ pub fn spawn_tasks( ); task_mgr::spawn( - *BACKGROUND_RUNTIME, + BACKGROUND_RUNTIME.handle(), TaskKind::SecondaryUploads, None, None, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index fd16a12a54d7..74ed677ffe3e 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -86,7 +86,7 @@ pub fn start_background_loops( ) { let tenant_shard_id = tenant.tenant_shard_id; task_mgr::spawn( - *BACKGROUND_RUNTIME, + BACKGROUND_RUNTIME.handle(), TaskKind::Compaction, Some(tenant_shard_id), None, @@ -110,7 +110,7 @@ pub fn start_background_loops( }, ); task_mgr::spawn( - *BACKGROUND_RUNTIME, + BACKGROUND_RUNTIME.handle(), TaskKind::GarbageCollector, Some(tenant_shard_id), None, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 9c2cf666ace1..d3c8c5f66c51 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1962,7 +1962,7 @@ impl Timeline { initdb_optimization_count: 0, }; task_mgr::spawn( - *task_mgr::BACKGROUND_RUNTIME, + task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::LayerFlushTask, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -2324,7 +2324,7 @@ impl Timeline { DownloadBehavior::Download, ); task_mgr::spawn( - *task_mgr::BACKGROUND_RUNTIME, + task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::InitialLogicalSizeCalculation, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -2502,7 +2502,7 @@ impl Timeline { DownloadBehavior::Download, ); task_mgr::spawn( - *task_mgr::BACKGROUND_RUNTIME, + task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::OndemandLogicalSizeCalculation, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -4484,7 +4484,7 @@ impl Timeline { let self_clone = Arc::clone(&self); let task_id = task_mgr::spawn( - *task_mgr::BACKGROUND_RUNTIME, + task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::DownloadAllRemoteLayers, Some(self.tenant_shard_id), Some(self.timeline_id), diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index f4fcbbdedab3..af10c1c84b76 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -383,7 +383,7 @@ impl DeleteTimelineFlow { let timeline_id = timeline.timeline_id; task_mgr::spawn( - *task_mgr::BACKGROUND_RUNTIME, + task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::TimelineDeletionWorker, Some(tenant_shard_id), Some(timeline_id), diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 78b9dfff4774..522c5b57de93 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -57,7 +57,7 @@ impl Timeline { let self_clone = Arc::clone(self); let background_tasks_can_start = background_tasks_can_start.cloned(); task_mgr::spawn( - *BACKGROUND_RUNTIME, + BACKGROUND_RUNTIME.handle(), TaskKind::Eviction, Some(self.tenant_shard_id), Some(self.timeline_id), From 871a3caca9138e3dfdb3d16cf2e571c1bed0522c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 17:57:41 +0000 Subject: [PATCH 10/11] change thread name --- pageserver/src/task_mgr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 1d1dc22eadeb..db4554426b1e 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -147,7 +147,7 @@ impl FromStr for TokioRuntimeMode { } static ONE_RUNTIME: Lazy> = Lazy::new(|| { - let thread_name = "tokio-executor"; + let thread_name = "pageserver-tokio"; let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else { // If the env var is not set, leave this static as None. set_tokio_runtime_setup( From dc8e318a42f657dec98c628bc7e04c2bac1cc114 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 17:58:21 +0000 Subject: [PATCH 11/11] fix copy-pasta --- pageserver/src/task_mgr.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index db4554426b1e..9a1e354ecf4e 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -165,7 +165,7 @@ static ONE_RUNTIME: Lazy> = Lazy::new(|| { .thread_name(thread_name) .enable_all() .build() - .expect("failed to create single runtime") + .expect("failed to create one single runtime") } TokioRuntimeMode::MultiThreaded { num_workers } => { set_tokio_runtime_setup("one-runtime-multi-threaded", num_workers); @@ -174,7 +174,7 @@ static ONE_RUNTIME: Lazy> = Lazy::new(|| { .enable_all() .worker_threads(num_workers.get()) .build() - .expect("failed to create single runtime") + .expect("failed to create one multi-threaded runtime") } }) });