Commit 3426680: apply in one go

problame committed Mar 22, 2024
1 parent 62b318c, commit 3426680

This commit replaces the pageserver's four dedicated tokio runtimes
(COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME, and
BACKGROUND_RUNTIME) with a single shared THE_RUNTIME, and drops the
runtime-handle parameter from task_mgr::spawn in favor of the ambient runtime.

Showing 17 changed files with 28 additions and 87 deletions.
24 changes: 9 additions & 15 deletions pageserver/src/bin/pageserver.rs

@@ -15,7 +15,6 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
 use pageserver::control_plane_client::ControlPlaneClient;
 use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
 use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
-use pageserver::task_mgr::WALRECEIVER_RUNTIME;
 use pageserver::tenant::{secondary, TenantSharedResources};
 use remote_storage::GenericRemoteStorage;
 use tokio::time::Instant;
@@ -28,7 +27,7 @@ use pageserver::{
     deletion_queue::DeletionQueue,
     http, page_cache, page_service, task_mgr,
     task_mgr::TaskKind,
-    task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
+    task_mgr::THE_RUNTIME,
     tenant::mgr,
     virtual_file,
 };
@@ -323,7 +322,7 @@ fn start_pageserver(

     // Launch broker client
     // The storage_broker::connect call needs to happen inside a tokio runtime thread.
-    let broker_client = WALRECEIVER_RUNTIME
+    let broker_client = THE_RUNTIME
         .block_on(async {
             // Note: we do not attempt connecting here (but validate endpoints sanity).
             storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
@@ -391,7 +390,7 @@
         conf,
     );
     if let Some(deletion_workers) = deletion_workers {
-        deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
+        deletion_workers.spawn_with(THE_RUNTIME.handle());
     }

     // Up to this point no significant I/O has been done: this should have been fast. Record
@@ -423,7 +422,7 @@

     // Scan the local 'tenants/' directory and start loading the tenants
     let deletion_queue_client = deletion_queue.new_client();
-    let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
+    let tenant_manager = THE_RUNTIME.block_on(mgr::init_tenant_mgr(
         conf,
         TenantSharedResources {
             broker_client: broker_client.clone(),
@@ -435,7 +434,7 @@
     ))?;
     let tenant_manager = Arc::new(tenant_manager);

-    BACKGROUND_RUNTIME.spawn({
+    THE_RUNTIME.spawn({
        let shutdown_pageserver = shutdown_pageserver.clone();
        let drive_init = async move {
            // NOTE: unlike many futures in pageserver, this one is cancellation-safe
@@ -545,7 +544,7 @@
    // Start up the service to handle HTTP mgmt API request. We created the
    // listener earlier already.
    {
-        let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
+        let _rt_guard = THE_RUNTIME.enter();

        let router_state = Arc::new(
            http::routes::State::new(
@@ -569,7 +568,6 @@
            .with_graceful_shutdown(task_mgr::shutdown_watcher());

        task_mgr::spawn(
-            MGMT_REQUEST_RUNTIME.handle(),
            TaskKind::HttpEndpointListener,
            None,
            None,
@@ -594,7 +592,6 @@
        let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");

        task_mgr::spawn(
-            crate::BACKGROUND_RUNTIME.handle(),
            TaskKind::MetricsCollection,
            None,
            None,
@@ -642,7 +639,6 @@
            DownloadBehavior::Error,
        );
        task_mgr::spawn(
-            COMPUTE_REQUEST_RUNTIME.handle(),
            TaskKind::LibpqEndpointListener,
            None,
            None,
@@ -668,17 +664,15 @@
    // All started up! Now just sit and wait for shutdown signal.
    {
        use signal_hook::consts::*;
-        let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
+        let signal_handler = THE_RUNTIME.spawn_blocking(move || {
            let mut signals =
                signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
            return signals
                .forever()
                .next()
                .expect("forever() never returns None unless explicitly closed");
        });
-        let signal = BACKGROUND_RUNTIME
-            .block_on(signal_handler)
-            .expect("join error");
+        let signal = THE_RUNTIME.block_on(signal_handler).expect("join error");
        match signal {
            SIGQUIT => {
                info!("Got signal {signal}. Terminating in immediate shutdown mode",);
@@ -693,7 +687,7 @@
        shutdown_pageserver.take();
        let bg_remote_storage = remote_storage.clone();
        let bg_deletion_queue = deletion_queue.clone();
-        BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
+        THE_RUNTIME.block_on(pageserver::shutdown_pageserver(
            &tenant_manager,
            bg_remote_storage.map(|_| bg_deletion_queue),
            0,
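
Aside: the pattern this file converges on is one lazily-initialized runtime that serves startup block_on calls, background spawns, and blocking signal waits alike. A minimal, self-contained sketch of that pattern follows (illustrative code assuming the tokio and once_cell crates, not the pageserver's actual definitions):

use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime")
});

fn main() {
    // Synchronous startup code can block on async initialization steps...
    let value = THE_RUNTIME.block_on(async { 42 });

    // ...long-running background work is spawned onto the same worker pool...
    THE_RUNTIME.spawn(async { /* background loop */ });

    // ...and blocking waits (like the signal handler above) go through
    // spawn_blocking so they do not stall the async worker threads.
    let blocking = THE_RUNTIME.spawn_blocking(move || value + 1);
    let result = THE_RUNTIME.block_on(blocking).expect("join error");
    assert_eq!(result, 43);
}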
3 changes: 1 addition & 2 deletions pageserver/src/consumption_metrics.rs

@@ -1,7 +1,7 @@
 //! Periodically collect consumption metrics for all active tenants
 //! and push them to a HTTP endpoint.
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
+use crate::task_mgr::{self, TaskKind};
 use crate::tenant::tasks::BackgroundLoopKind;
 use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
 use camino::Utf8PathBuf;
@@ -59,7 +59,6 @@ pub async fn collect_metrics(
     let worker_ctx =
         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::CalculateSyntheticSize,
         None,
         None,
3 changes: 1 addition & 2 deletions pageserver/src/disk_usage_eviction_task.rs

@@ -59,7 +59,7 @@ use utils::{completion, id::TimelineId};
 use crate::{
     config::PageServerConf,
     metrics::disk_usage_based_eviction::METRICS,
-    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
+    task_mgr::{self, TaskKind},
     tenant::{
         self,
         mgr::TenantManager,
@@ -202,7 +202,6 @@
     info!("launching disk usage based eviction task");

     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::DiskUsageEviction,
         None,
         None,
1 change: 0 additions & 1 deletion pageserver/src/page_service.rs

@@ -180,7 +180,6 @@ pub async fn libpq_listener_main(
         // only deal with a particular timeline, but we don't know which one
         // yet.
         task_mgr::spawn(
-            &tokio::runtime::Handle::current(),
             TaskKind::PageRequestHandler,
             None,
             None,
37 changes: 8 additions & 29 deletions pageserver/src/task_mgr.rs

@@ -98,42 +98,22 @@ use utils::id::TimelineId;
 // other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
 // happen, but still.
 //
-pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    tokio::runtime::Builder::new_multi_thread()
-        .thread_name("compute request worker")
-        .enable_all()
-        .build()
-        .expect("Failed to create compute request runtime")
-});
-
-pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    tokio::runtime::Builder::new_multi_thread()
-        .thread_name("mgmt request worker")
-        .enable_all()
-        .build()
-        .expect("Failed to create mgmt request runtime")
-});
-
-pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    tokio::runtime::Builder::new_multi_thread()
-        .thread_name("walreceiver worker")
-        .enable_all()
-        .build()
-        .expect("Failed to create walreceiver runtime")
-});
-
-pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+/// The single tokio runtime used by all pageserver code.
+/// In the past, we had multiple runtimes, and in the future we should weed out
+/// remaining references to this global field and rely on ambient runtime instead,
+/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
+pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     tokio::runtime::Builder::new_multi_thread()
         .thread_name("background op worker")
         // if you change the number of worker threads please change the constant below
         .enable_all()
         .build()
         .expect("Failed to create background op runtime")
 });

-pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
+pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
     // force init and thus panics
-    let _ = BACKGROUND_RUNTIME.handle();
+    let _ = THE_RUNTIME.handle();
     // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
     // tokio would had already panicked for parsing errors or NotUnicode
     //
@@ -325,7 +305,6 @@ struct PageServerTask {
 /// Note: if shutdown_process_on_error is set to true failure
 /// of the task will lead to shutdown of entire process
 pub fn spawn<F>(
-    runtime: &tokio::runtime::Handle,
     kind: TaskKind,
     tenant_shard_id: Option<TenantShardId>,
     timeline_id: Option<TimelineId>,
@@ -354,7 +333,7 @@ where

     let task_name = name.to_string();
     let task_cloned = Arc::clone(&task);
-    let join_handle = runtime.spawn(task_wrapper(
+    let join_handle = tokio::spawn(task_wrapper(
         task_name,
         task_id,
         task_cloned,
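
The new doc comment on THE_RUNTIME names the intended end state: rely on the ambient runtime rather than a named global, which is why spawn above switches from runtime.spawn to tokio::spawn. A short sketch of the ambient-runtime idiom (illustrative, not pageserver code):

use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().expect("Failed to create runtime");

    let task = {
        // Entering the runtime makes it ambient for this thread, so code
        // further down the stack can call tokio::spawn without being handed
        // a runtime or a Handle explicitly.
        let _guard = rt.enter();
        tokio::spawn(async { "done" })
    };

    // Drop the guard before block_on: block_on panics while the runtime
    // context is still entered on this thread.
    assert_eq!(rt.block_on(task).expect("join error"), "done");
}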
1 change: 0 additions & 1 deletion pageserver/src/tenant.rs

@@ -661,7 +661,6 @@ impl Tenant {
         let tenant_clone = Arc::clone(&tenant);
         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
         task_mgr::spawn(
-            &tokio::runtime::Handle::current(),
             TaskKind::Attach,
             Some(tenant_shard_id),
             None,
1 change: 0 additions & 1 deletion pageserver/src/tenant/delete.rs

@@ -482,7 +482,6 @@ impl DeleteTenantFlow {
         let tenant_shard_id = tenant.tenant_shard_id;

         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::TimelineDeletionWorker,
             Some(tenant_shard_id),
             None,
2 changes: 0 additions & 2 deletions pageserver/src/tenant/mgr.rs

@@ -1850,7 +1850,6 @@ impl TenantManager {
         let task_tenant_id = None;

         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::MgmtRequest,
             task_tenant_id,
             None,
@@ -2816,7 +2815,6 @@

     // TODO: spawning is redundant now, need to hold the gate
     task_mgr::spawn(
-        &tokio::runtime::Handle::current(),
         TaskKind::GarbageCollector,
         Some(tenant_shard_id),
         Some(timeline_id),
11 changes: 0 additions & 11 deletions pageserver/src/tenant/remote_timeline_client.rs

@@ -223,7 +223,6 @@ use crate::{
     config::PageServerConf,
     task_mgr,
     task_mgr::TaskKind,
-    task_mgr::BACKGROUND_RUNTIME,
     tenant::metadata::TimelineMetadata,
     tenant::upload_queue::{
         UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
@@ -307,8 +306,6 @@ pub enum PersistIndexPartWithDeletedFlagError {
 pub struct RemoteTimelineClient {
     conf: &'static PageServerConf,

-    runtime: tokio::runtime::Handle,
-
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
     generation: Generation,
@@ -341,12 +338,6 @@
     ) -> RemoteTimelineClient {
         RemoteTimelineClient {
             conf,
-            runtime: if cfg!(test) {
-                // remote_timeline_client.rs tests rely on current-thread runtime
-                tokio::runtime::Handle::current()
-            } else {
-                BACKGROUND_RUNTIME.handle().clone()
-            },
             tenant_shard_id,
             timeline_id,
             generation,
@@ -1281,7 +1272,6 @@
         let tenant_shard_id = self.tenant_shard_id;
         let timeline_id = self.timeline_id;
         task_mgr::spawn(
-            &self.runtime,
             TaskKind::RemoteUploadTask,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -1876,7 +1866,6 @@ mod tests {
     fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
         Arc::new(RemoteTimelineClient {
             conf: self.harness.conf,
-            runtime: tokio::runtime::Handle::current(),
             tenant_shard_id: self.harness.tenant_shard_id,
             timeline_id: TIMELINE_ID,
             generation,
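
With a single runtime there is no longer a per-client decision about where upload tasks run, so the stored runtime handle and its cfg!(test) special case can be deleted outright. A sketch of the resulting shape (hypothetical Client type, not the real RemoteTimelineClient API):

use tokio::task::JoinHandle;

struct Client;

impl Client {
    // Before: spawn through a `runtime: tokio::runtime::Handle` field chosen
    // at construction time (current-thread in tests, the background runtime
    // otherwise). After: the ambient tokio::spawn suffices in both cases.
    fn launch_upload(&self) -> JoinHandle<()> {
        tokio::spawn(async {
            // ... perform the upload ...
        })
    }
}

#[tokio::main]
async fn main() {
    let client = Client;
    client.launch_upload().await.expect("join error");
}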
4 changes: 1 addition & 3 deletions pageserver/src/tenant/secondary.rs

@@ -8,7 +8,7 @@ use std::{sync::Arc, time::SystemTime};
 use crate::{
     config::PageServerConf,
     disk_usage_eviction_task::DiskUsageEvictionInfo,
-    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
+    task_mgr::{self, TaskKind},
     virtual_file::MaybeFatalIo,
 };

@@ -317,7 +317,6 @@ pub fn spawn_tasks(
         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);

     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::SecondaryDownloads,
         None,
         None,
@@ -338,7 +337,6 @@
     );

     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::SecondaryUploads,
         None,
         None,
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/layer.rs

@@ -1447,7 +1447,7 @@ impl LayerInner {
         #[cfg(test)]
         tokio::task::spawn(fut);
         #[cfg(not(test))]
-        crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
+        crate::task_mgr::THE_RUNTIME.spawn(fut);
     }

     /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
@@ -1458,7 +1458,7 @@
         #[cfg(test)]
         tokio::task::spawn_blocking(f);
         #[cfg(not(test))]
-        crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
+        crate::task_mgr::THE_RUNTIME.spawn_blocking(f);
     }
 }
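
layer.rs keeps its #[cfg(test)] split because tests rely on the runtime their harness has already entered, while production builds pin work to the shared runtime. A self-contained sketch of that split (illustrative names, assuming the tokio and once_cell crates):

use once_cell::sync::Lazy;
use std::future::Future;
use tokio::runtime::Runtime;

static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime")
});

// In test builds, spawn onto whatever runtime the test harness provides
// (for #[tokio::test] that is often a current-thread runtime); in
// production builds, pin the work to the shared runtime.
fn spawn_detached<F>(fut: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    #[cfg(test)]
    tokio::task::spawn(fut);
    #[cfg(not(test))]
    THE_RUNTIME.spawn(fut);
}

fn main() {
    // The task is detached; a real caller that cares about completion would
    // keep the join handle or hold a gate.
    spawn_detached(async { /* cleanup work */ });
}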
6 changes: 2 additions & 4 deletions pageserver/src/tenant/tasks.rs

@@ -8,7 +8,7 @@ use std::time::{Duration, Instant};
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
-use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
+use crate::task_mgr::TaskKind;
 use crate::tenant::throttle::Stats;
 use crate::tenant::timeline::CompactionError;
 use crate::tenant::{Tenant, TenantState};
@@ -18,7 +18,7 @@ use utils::{backoff, completion};

 static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
     once_cell::sync::Lazy::new(|| {
-        let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
+        let total_threads = *crate::task_mgr::THE_RUNTIME_WORKER_THREADS;
         let permits = usize::max(
             1,
             // while a lot of the work is done on spawn_blocking, we still do
@@ -85,7 +85,6 @@ pub fn start_background_loops(
 ) {
     let tenant_shard_id = tenant.tenant_shard_id;
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::Compaction,
         Some(tenant_shard_id),
         None,
@@ -109,7 +108,6 @@
         },
     );
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::GarbageCollector,
         Some(tenant_shard_id),
         None,
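
CONCURRENT_BACKGROUND_TASKS caps how many background loops do heavy work at once, and with one shared runtime it is now sized from THE_RUNTIME_WORKER_THREADS. A sketch of sizing a semaphore from the worker count (available_parallelism stands in for the pageserver's thread-count constant, and the 3/4 ratio is illustrative, not the actual value):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Stand-in for THE_RUNTIME_WORKER_THREADS: tokio's default worker count
    // equals the number of available CPUs.
    let worker_threads = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    // Keep at least one permit, but leave some workers free for request
    // handling; the exact ratio is a tuning choice.
    let permits = usize::max(1, worker_threads * 3 / 4);
    let semaphore = Arc::new(Semaphore::new(permits));

    // Each background iteration holds a permit for its duration, so at most
    // `permits` loops (compaction, GC, ...) run concurrently.
    let permit = Arc::clone(&semaphore)
        .acquire_owned()
        .await
        .expect("semaphore closed");
    // ... run one iteration while holding the permit ...
    drop(permit);
}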
(Diffs for the remaining 4 changed files not shown.)
