pageserver: use a single tokio runtime (#6555)
Before this PR, each core had 3 executor threads from 3 different
runtimes. With this PR, we just have one runtime, with one thread per
core. Switching to a single tokio runtime should reduce that effective
over-commit of CPU and in theory help with tail latencies -- iff all
tokio tasks are well-behaved and yield to the runtime regularly.
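
For illustration, here is a condensed sketch of the new setup (the real
definition lives in `pageserver/src/task_mgr.rs`, see the diff below);
tokio's multi-thread runtime defaults to one worker thread per core:

```rust
// Condensed sketch of the single-runtime setup; the real definition is in
// pageserver/src/task_mgr.rs (see the diff below).
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        // `worker_threads` defaults to the number of available cores,
        // so there is exactly one executor thread per core.
        .enable_all()
        .build()
        .expect("Failed to create the pageserver runtime")
});
```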

Are All Tasks Well-Behaved? Are We Ready?
-----------------------------------------

Sadly there doesn't seem to be good out-of-the-box tokio tooling to
answer this question.

We *believe* all tasks are well-behaved in today's code base, as of the
switch to `virtual_file_io_engine = "tokio-epoll-uring"` in production
(neondatabase/infra#1121).

The only remaining executor-thread-blocking code is walredo and some
filesystem namespace operations.

The filesystem namespace operations work is tracked in #6663; it is not
considered likely to actually block at this time.

Regarding walredo, it currently does a blocking `poll` for read/write to
the pipe file descriptors we use for IPC with the walredo process.
There is an ongoing experiment to make walredo async (#6628), but it
needs more time because there are surprisingly tricky trade-offs that
are articulated in that PR's description (which itself is still WIP).
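
To illustrate the executor-thread-blocking concern, here is a
deliberately simplified, hypothetical sketch (not the actual walredo IPC
code, which polls raw pipe file descriptors): synchronous pipe I/O with
a child process inside an async task occupies a tokio worker thread for
the whole round trip.

```rust
// Hypothetical, simplified illustration -- NOT the actual walredo IPC code.
// Synchronous writes/reads on a child process's pipes inside an async fn
// block the tokio worker thread for the duration of the round trip.
use std::io::{BufRead, BufReader, Write};
use std::process::{Command, Stdio};

async fn redo_roundtrip_blocking(request: &[u8]) -> std::io::Result<String> {
    // `cat` is a stand-in for the long-lived walredo child process.
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    // Blocking write to the child's stdin pipe: no await point here.
    child.stdin.as_mut().unwrap().write_all(request)?;
    drop(child.stdin.take()); // close stdin so the child flushes and exits

    // Blocking read from the child's stdout pipe: also no await point.
    let mut response = String::new();
    BufReader::new(child.stdout.take().unwrap()).read_line(&mut response)?;
    child.wait()?;
    Ok(response)
}
```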
What's relevant for *this* PR is that
1. walredo is always CPU-bound
2. production tail latencies for walredo request-response
(`pageserver_wal_redo_seconds_bucket`) are
  - p90: with few exceptions, low hundreds of microseconds
  - p95: except on very packed pageservers, below 1ms
  - p99: all below 50ms, vast majority below 1ms
  - p99.9: almost all around 50ms, rarely at >= 70ms
  - [Dashboard Link](https://neonprod.grafana.net/d/edgggcrmki3uof/2024-03-walredo-latency?orgId=1&var-ds=ZNX49CDVz&var-pXX_by_instance=0.9&var-pXX_by_instance=0.99&var-pXX_by_instance=0.95&var-adhoc=instance%7C%21%3D%7Cpageserver-30.us-west-2.aws.neon.tech&var-per_instance_pXX_max_seconds=0.0005&from=1711049688777&to=1711136088777)

The latencies below 1ms are under the threshold at which we would start
thinking about yielding to the executor.
The tens-of-milliseconds stalls aren't great, but, not least because of
the implicit overcommit of CPU by the three runtimes, we can't be sure
whether these tens of milliseconds are inherently necessary for the
walredo work or whether we could be faster if there were less contention
for CPU.

On the first item (walredo always being CPU-bound work): it means that
the walredo processes will always compete with the executor threads for
CPU.
We could yield, using async walredo, but then we hit the trade-offs
explained in that PR.
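
For reference, here is a sketch of the two standard patterns for keeping
CPU-bound work from monopolizing a shared runtime; the workload is a
made-up stand-in, not walredo itself:

```rust
// Illustrative only: two ways to keep CPU-bound work from stalling the
// shared runtime. The workload (summing bytes) is a made-up stand-in.

/// Option 1: stay on the runtime but yield back to the scheduler regularly.
async fn cpu_bound_with_yields(records: Vec<Vec<u8>>) -> u64 {
    let mut acc = 0u64;
    for (i, rec) in records.iter().enumerate() {
        acc = acc.wrapping_add(rec.iter().map(|&b| u64::from(b)).sum());
        if i % 64 == 0 {
            // Cheap yield point: lets other tasks run on this worker thread.
            tokio::task::yield_now().await;
        }
    }
    acc
}

/// Option 2: move the whole computation to tokio's blocking thread pool.
async fn cpu_bound_on_blocking_pool(records: Vec<Vec<u8>>) -> u64 {
    tokio::task::spawn_blocking(move || {
        records
            .iter()
            .map(|rec| rec.iter().map(|&b| u64::from(b)).sum::<u64>())
            .fold(0u64, u64::wrapping_add)
    })
    .await
    .expect("blocking task panicked")
}
```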

tl;dr: the risk of stalling executor threads through blocking walredo
seems low, and switching to one runtime cleans up one potential source
of higher-than-necessary stall times (explained in the previous
paragraphs).


Code Changes
------------

- Remove the 3 different runtime definitions.
- Add a new definition called `THE_RUNTIME`.
- Use it in all places that previously used one of the 3 removed
runtimes.
- Remove the runtime handle argument from `task_mgr::spawn`.
- Fix failpoint usage where `pausable_failpoint!` should have been used
(see the sketch below). We encountered some actual failures because of
this, e.g., hung `get_metric()` calls during test teardown that would
hit the client timeout after 300s.
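
For context on the failpoint item, an approximate sketch of the idea
behind a pausable failpoint; the real `pausable_failpoint!` macro lives
in `pageserver/src/tenant.rs`, and this is an illustration, not a copy:

```rust
// Approximate sketch, not the real macro (see pageserver/src/tenant.rs).
// A plain `fail::fail_point!` configured with the "pause" action blocks the
// calling thread; inside async code that stalls an executor thread until the
// failpoint is turned off. Wrapping the wait in `spawn_blocking` and awaiting
// it keeps the worker thread free while the test holds the pause.
#[allow(unused_macros)]
macro_rules! pausable_failpoint_sketch {
    ($name:literal) => {
        tokio::task::spawn_blocking(|| {
            // May block for a long time if the failpoint is set to "pause".
            fail::fail_point!($name);
        })
        .await
        .expect("spawn_blocking for failpoint failed");
    };
}
```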

As indicated by the comment above `THE_RUNTIME`, we could take this
clean-up further.
But before we create so much churn, let's first validate that there's no
perf regression.


Performance
-----------

We will test this in staging using the various nightly benchmark runs.

However, the worst-case impact of this change is likely compaction
(=>image layer creation) competing with compute requests.
Image layer creation work can't be easily generated & repeated quickly
by pagebench.
So, we'll simply watch getpage & basebackup tail latencies in staging.

Additionally, I have done manual benchmarking using pagebench.
Report:
https://neondatabase.notion.site/2024-03-23-oneruntime-change-benchmarking-22a399c411e24399a73311115fb703ec?pvs=4
Tail latencies and throughput are marginally better (no regression =
good), except in a workload with 128 clients against one tenant.
There, the p99.9 and p99.99 getpage latencies are about 2x worse (at
slightly lower throughput).
A dip in throughput every 20s (the `compaction_period`) is clearly
visible and is probably responsible for the worse tail latency.
This has potential to improve with async walredo, and it is an edge-case
workload anyway.


Future Work
-----------

1. Once this change has shown satisfactory results in production, change
the codebase to use the ambient runtime instead of explicitly
referencing `THE_RUNTIME` (see the sketch after this list).
2. Have a mode where we run with a single-threaded runtime, so that we
uncover executor stalls more quickly.
3. Switch to, or write our own, failpoints library that is async-native:
#7216
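
A minimal sketch of what item 1 could look like (illustrative only;
assumes the calling code already executes inside `THE_RUNTIME`):

```rust
// Illustrative sketch for future-work item 1 (assumes this code lives inside
// the pageserver crate and already executes on THE_RUNTIME).
async fn spawn_background_work() {
    // Today: explicit reference to the global runtime handle.
    // crate::task_mgr::THE_RUNTIME.spawn(async { /* background work */ });

    // Future: rely on the ambient runtime, i.e. whichever runtime the
    // current task is running on. `tokio::spawn` panics outside a runtime
    // context, which is fine once everything runs on the single runtime.
    tokio::spawn(async { /* background work */ });
}
```
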
problame authored Mar 23, 2024
1 parent 72103d4 commit 3220f83
Showing 20 changed files with 92 additions and 131 deletions.
82 changes: 37 additions & 45 deletions pageserver/src/bin/pageserver.rs
@@ -15,9 +15,9 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::{secondary, TenantSharedResources};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tracing::*;

@@ -28,7 +28,7 @@ use pageserver::{
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
task_mgr::THE_RUNTIME,
tenant::mgr,
virtual_file,
};
@@ -323,7 +323,7 @@ fn start_pageserver(

// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
let broker_client = THE_RUNTIME
.block_on(async {
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
@@ -391,7 +391,7 @@ fn start_pageserver(
conf,
);
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
deletion_workers.spawn_with(THE_RUNTIME.handle());
}

// Up to this point no significant I/O has been done: this should have been fast. Record
@@ -423,7 +423,7 @@ fn start_pageserver(

// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = THE_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
@@ -435,7 +435,7 @@ fn start_pageserver(
))?;
let tenant_manager = Arc::new(tenant_manager);

BACKGROUND_RUNTIME.spawn({
THE_RUNTIME.spawn({
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
@@ -545,7 +545,7 @@ fn start_pageserver(
// Start up the service to handle HTTP mgmt API request. We created the
// listener earlier already.
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
let _rt_guard = THE_RUNTIME.enter();

let router_state = Arc::new(
http::routes::State::new(
@@ -569,7 +569,6 @@ fn start_pageserver(
.with_graceful_shutdown(task_mgr::shutdown_watcher());

task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::HttpEndpointListener,
None,
None,
@@ -594,7 +593,6 @@ fn start_pageserver(
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");

task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
@@ -643,7 +641,6 @@ fn start_pageserver(
DownloadBehavior::Error,
);
task_mgr::spawn(
COMPUTE_REQUEST_RUNTIME.handle(),
TaskKind::LibpqEndpointListener,
None,
None,
@@ -667,42 +664,37 @@ fn start_pageserver(
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

// All started up! Now just sit and wait for shutdown signal.

{
use signal_hook::consts::*;
let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
let mut signals =
signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
return signals
.forever()
.next()
.expect("forever() never returns None unless explicitly closed");
});
let signal = BACKGROUND_RUNTIME
.block_on(signal_handler)
.expect("join error");
match signal {
SIGQUIT => {
info!("Got signal {signal}. Terminating in immediate shutdown mode",);
std::process::exit(111);
}
SIGINT | SIGTERM => {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);

// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
));
unreachable!()
}
_ => unreachable!(),
}
THE_RUNTIME.block_on(async move {
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode",);
std::process::exit(111);
}
_ = sigint.recv() => { "SIGINT" },
_ = sigterm.recv() => { "SIGTERM" },
};

info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);

// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
)
.await;
unreachable!()
})
}
}

3 changes: 1 addition & 2 deletions pageserver/src/consumption_metrics.rs
@@ -1,7 +1,7 @@
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
use camino::Utf8PathBuf;
@@ -61,7 +61,6 @@ pub async fn collect_metrics(
let worker_ctx =
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::CalculateSyntheticSize,
None,
None,
4 changes: 1 addition & 3 deletions pageserver/src/control_plane_client.rs
@@ -173,8 +173,6 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
register,
};

fail::fail_point!("control-plane-client-re-attach");

let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants",
@@ -210,7 +208,7 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
.collect(),
};

fail::fail_point!("control-plane-client-validate");
crate::tenant::pausable_failpoint!("control-plane-client-validate");

let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;

3 changes: 1 addition & 2 deletions pageserver/src/disk_usage_eviction_task.rs
@@ -59,7 +59,7 @@ use utils::{completion, id::TimelineId};
use crate::{
config::PageServerConf,
metrics::disk_usage_based_eviction::METRICS,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
task_mgr::{self, TaskKind},
tenant::{
self,
mgr::TenantManager,
@@ -202,7 +202,6 @@ pub fn launch_disk_usage_global_eviction_task(
info!("launching disk usage based eviction task");

task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,
1 change: 0 additions & 1 deletion pageserver/src/page_service.rs
@@ -180,7 +180,6 @@ pub async fn libpq_listener_main(
// only deal with a particular timeline, but we don't know which one
// yet.
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::PageRequestHandler,
None,
None,
37 changes: 8 additions & 29 deletions pageserver/src/task_mgr.rs
@@ -98,42 +98,22 @@ use utils::id::TimelineId;
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker")
.enable_all()
.build()
.expect("Failed to create compute request runtime")
});

pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker")
.enable_all()
.build()
.expect("Failed to create mgmt request runtime")
});

pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker")
.enable_all()
.build()
.expect("Failed to create walreceiver runtime")
});

pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
/// The single tokio runtime used by all pageserver code.
/// In the past, we had multiple runtimes, and in the future we should weed out
/// remaining references to this global field and rely on ambient runtime instead,
/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background op worker")
// if you change the number of worker threads please change the constant below
.enable_all()
.build()
.expect("Failed to create background op runtime")
});

pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
// force init and thus panics
let _ = BACKGROUND_RUNTIME.handle();
let _ = THE_RUNTIME.handle();
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
// tokio would had already panicked for parsing errors or NotUnicode
//
@@ -325,7 +305,6 @@ struct PageServerTask {
/// Note: if shutdown_process_on_error is set to true failure
/// of the task will lead to shutdown of entire process
pub fn spawn<F>(
runtime: &tokio::runtime::Handle,
kind: TaskKind,
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
@@ -354,7 +333,7 @@

let task_name = name.to_string();
let task_cloned = Arc::clone(&task);
let join_handle = runtime.spawn(task_wrapper(
let join_handle = THE_RUNTIME.spawn(task_wrapper(
task_name,
task_id,
task_cloned,
2 changes: 1 addition & 1 deletion pageserver/src/tenant.rs
@@ -144,6 +144,7 @@ macro_rules! pausable_failpoint {
}
};
}
pub(crate) use pausable_failpoint;

pub mod blob_io;
pub mod block_io;
@@ -661,7 +662,6 @@ impl Tenant {
let tenant_clone = Arc::clone(&tenant);
let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Attach,
Some(tenant_shard_id),
None,
1 change: 0 additions & 1 deletion pageserver/src/tenant/delete.rs
@@ -482,7 +482,6 @@ impl DeleteTenantFlow {
let tenant_shard_id = tenant.tenant_shard_id;

task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(tenant_shard_id),
None,
4 changes: 0 additions & 4 deletions pageserver/src/tenant/mgr.rs
@@ -1850,7 +1850,6 @@ impl TenantManager {
let task_tenant_id = None;

task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
task_tenant_id,
None,
@@ -2816,15 +2815,12 @@ pub(crate) fn immediate_gc(

// TODO: spawning is redundant now, need to hold the gate
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::GarbageCollector,
Some(tenant_shard_id),
Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
false,
async move {
fail::fail_point!("immediate_gc_task_pre");

#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
11 changes: 0 additions & 11 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -223,7 +223,6 @@ use crate::{
config::PageServerConf,
task_mgr,
task_mgr::TaskKind,
task_mgr::BACKGROUND_RUNTIME,
tenant::metadata::TimelineMetadata,
tenant::upload_queue::{
UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
@@ -307,8 +306,6 @@ pub enum PersistIndexPartWithDeletedFlagError {
pub struct RemoteTimelineClient {
conf: &'static PageServerConf,

runtime: tokio::runtime::Handle,

tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
@@ -341,12 +338,6 @@ impl RemoteTimelineClient {
) -> RemoteTimelineClient {
RemoteTimelineClient {
conf,
runtime: if cfg!(test) {
// remote_timeline_client.rs tests rely on current-thread runtime
tokio::runtime::Handle::current()
} else {
BACKGROUND_RUNTIME.handle().clone()
},
tenant_shard_id,
timeline_id,
generation,
@@ -1281,7 +1272,6 @@ impl RemoteTimelineClient {
let tenant_shard_id = self.tenant_shard_id;
let timeline_id = self.timeline_id;
task_mgr::spawn(
&self.runtime,
TaskKind::RemoteUploadTask,
Some(self.tenant_shard_id),
Some(self.timeline_id),
@@ -1876,7 +1866,6 @@ mod tests {
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
Arc::new(RemoteTimelineClient {
conf: self.harness.conf,
runtime: tokio::runtime::Handle::current(),
tenant_shard_id: self.harness.tenant_shard_id,
timeline_id: TIMELINE_ID,
generation,

1 comment on commit 3220f83

@github-actions

2798 tests run: 2647 passed, 1 failed, 150 skipped (full report)


Failures on Postgres 14

  • test_bulk_insert[neon-github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_bulk_insert[neon-release-pg14-github-actions-selfhosted]"

Code coverage* (full report)

  • functions: 28.1% (6280 of 22338 functions)
  • lines: 47.0% (44188 of 94105 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
3220f83 at 2024-03-23T19:37:52.105Z :recycle:
