Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: use a single tokio runtime #6555

Merged
merged 14 commits into from
Mar 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 37 additions & 45 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
@@ -15,9 +15,9 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::{secondary, TenantSharedResources};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tracing::*;

@@ -28,7 +28,7 @@ use pageserver::{
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
task_mgr::THE_RUNTIME,
tenant::mgr,
virtual_file,
};
@@ -323,7 +323,7 @@ fn start_pageserver(

// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
let broker_client = THE_RUNTIME
.block_on(async {
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
@@ -391,7 +391,7 @@ fn start_pageserver(
conf,
);
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
deletion_workers.spawn_with(THE_RUNTIME.handle());
}

// Up to this point no significant I/O has been done: this should have been fast. Record
@@ -423,7 +423,7 @@ fn start_pageserver(

// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = THE_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
@@ -435,7 +435,7 @@ fn start_pageserver(
))?;
let tenant_manager = Arc::new(tenant_manager);

BACKGROUND_RUNTIME.spawn({
THE_RUNTIME.spawn({
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
@@ -545,7 +545,7 @@ fn start_pageserver(
// Start up the service to handle HTTP mgmt API request. We created the
// listener earlier already.
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
let _rt_guard = THE_RUNTIME.enter();

let router_state = Arc::new(
http::routes::State::new(
@@ -569,7 +569,6 @@ fn start_pageserver(
.with_graceful_shutdown(task_mgr::shutdown_watcher());

task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::HttpEndpointListener,
None,
None,
@@ -594,7 +593,6 @@ fn start_pageserver(
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");

task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
@@ -642,7 +640,6 @@ fn start_pageserver(
DownloadBehavior::Error,
);
task_mgr::spawn(
COMPUTE_REQUEST_RUNTIME.handle(),
TaskKind::LibpqEndpointListener,
None,
None,
@@ -666,42 +663,37 @@ fn start_pageserver(
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

// All started up! Now just sit and wait for shutdown signal.

{
use signal_hook::consts::*;
let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
let mut signals =
signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
return signals
.forever()
.next()
.expect("forever() never returns None unless explicitly closed");
});
let signal = BACKGROUND_RUNTIME
.block_on(signal_handler)
.expect("join error");
match signal {
SIGQUIT => {
info!("Got signal {signal}. Terminating in immediate shutdown mode",);
std::process::exit(111);
}
SIGINT | SIGTERM => {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);

// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
));
unreachable!()
}
_ => unreachable!(),
}
THE_RUNTIME.block_on(async move {
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode",);
std::process::exit(111);
}
_ = sigint.recv() => { "SIGINT" },
_ = sigterm.recv() => { "SIGTERM" },
};

info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);

// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
)
.await;
unreachable!()
})
}
}

3 changes: 1 addition & 2 deletions pageserver/src/consumption_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
use camino::Utf8PathBuf;
@@ -59,7 +59,6 @@ pub async fn collect_metrics(
let worker_ctx =
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::CalculateSyntheticSize,
None,
None,
4 changes: 1 addition & 3 deletions pageserver/src/control_plane_client.rs
Original file line number Diff line number Diff line change
@@ -173,8 +173,6 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
register,
};

fail::fail_point!("control-plane-client-re-attach");

let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants",
@@ -210,7 +208,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
.collect(),
};

fail::fail_point!("control-plane-client-validate");
crate::tenant::pausable_failpoint!("control-plane-client-validate");

let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;

3 changes: 1 addition & 2 deletions pageserver/src/disk_usage_eviction_task.rs
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ use utils::{completion, id::TimelineId};
use crate::{
config::PageServerConf,
metrics::disk_usage_based_eviction::METRICS,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
task_mgr::{self, TaskKind},
tenant::{
self,
mgr::TenantManager,
@@ -202,7 +202,6 @@ pub fn launch_disk_usage_global_eviction_task(
info!("launching disk usage based eviction task");

task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,
1 change: 0 additions & 1 deletion pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
@@ -180,7 +180,6 @@ pub async fn libpq_listener_main(
// only deal with a particular timeline, but we don't know which one
// yet.
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::PageRequestHandler,
None,
None,
37 changes: 8 additions & 29 deletions pageserver/src/task_mgr.rs
Original file line number Diff line number Diff line change
@@ -98,42 +98,22 @@ use utils::id::TimelineId;
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker")
.enable_all()
.build()
.expect("Failed to create compute request runtime")
});

pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker")
.enable_all()
.build()
.expect("Failed to create mgmt request runtime")
});

pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker")
.enable_all()
.build()
.expect("Failed to create walreceiver runtime")
});

pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
/// The single tokio runtime used by all pageserver code.
/// In the past, we had multiple runtimes, and in the future we should weed out
/// remaining references to this global field and rely on ambient runtime instead,
/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker")
// if you change the number of worker threads please change the constant below
.enable_all()
.build()
.expect("Failed to create background op runtime")
});

pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
// force init and thus panics
let _ = BACKGROUND_RUNTIME.handle();
let _ = THE_RUNTIME.handle();
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
// tokio would had already panicked for parsing errors or NotUnicode
//
@@ -325,7 +305,6 @@ struct PageServerTask {
/// Note: if shutdown_process_on_error is set to true failure
/// of the task will lead to shutdown of entire process
pub fn spawn<F>(
runtime: &tokio::runtime::Handle,
kind: TaskKind,
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
@@ -354,7 +333,7 @@ where

let task_name = name.to_string();
let task_cloned = Arc::clone(&task);
let join_handle = runtime.spawn(task_wrapper(
let join_handle = THE_RUNTIME.spawn(task_wrapper(
task_name,
task_id,
task_cloned,
2 changes: 1 addition & 1 deletion pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
@@ -144,6 +144,7 @@ macro_rules! pausable_failpoint {
}
};
}
pub(crate) use pausable_failpoint;

pub mod blob_io;
pub mod block_io;
@@ -661,7 +662,6 @@ impl Tenant {
let tenant_clone = Arc::clone(&tenant);
let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Attach,
Some(tenant_shard_id),
None,
1 change: 0 additions & 1 deletion pageserver/src/tenant/delete.rs
Original file line number Diff line number Diff line change
@@ -482,7 +482,6 @@ impl DeleteTenantFlow {
let tenant_shard_id = tenant.tenant_shard_id;

task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(tenant_shard_id),
None,
4 changes: 0 additions & 4 deletions pageserver/src/tenant/mgr.rs
Original file line number Diff line number Diff line change
@@ -1850,7 +1850,6 @@ impl TenantManager {
let task_tenant_id = None;

task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
task_tenant_id,
None,
@@ -2816,15 +2815,12 @@ pub(crate) fn immediate_gc(

// TODO: spawning is redundant now, need to hold the gate
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::GarbageCollector,
Some(tenant_shard_id),
Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
false,
async move {
fail::fail_point!("immediate_gc_task_pre");

#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
11 changes: 0 additions & 11 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
@@ -223,7 +223,6 @@ use crate::{
config::PageServerConf,
task_mgr,
task_mgr::TaskKind,
task_mgr::BACKGROUND_RUNTIME,
tenant::metadata::TimelineMetadata,
tenant::upload_queue::{
UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
@@ -307,8 +306,6 @@ pub enum PersistIndexPartWithDeletedFlagError {
pub struct RemoteTimelineClient {
conf: &'static PageServerConf,

runtime: tokio::runtime::Handle,

tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
@@ -341,12 +338,6 @@ impl RemoteTimelineClient {
) -> RemoteTimelineClient {
RemoteTimelineClient {
conf,
runtime: if cfg!(test) {
// remote_timeline_client.rs tests rely on current-thread runtime
tokio::runtime::Handle::current()
} else {
BACKGROUND_RUNTIME.handle().clone()
},
tenant_shard_id,
timeline_id,
generation,
@@ -1281,7 +1272,6 @@ impl RemoteTimelineClient {
let tenant_shard_id = self.tenant_shard_id;
let timeline_id = self.timeline_id;
task_mgr::spawn(
&self.runtime,
TaskKind::RemoteUploadTask,
Some(self.tenant_shard_id),
Some(self.timeline_id),
@@ -1876,7 +1866,6 @@ mod tests {
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
Arc::new(RemoteTimelineClient {
conf: self.harness.conf,
runtime: tokio::runtime::Handle::current(),
tenant_shard_id: self.harness.tenant_shard_id,
timeline_id: TIMELINE_ID,
generation,
Loading