From 3426680c809d6021fda840abaae12ff99755db8a Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 22 Mar 2024 19:13:54 +0100
Subject: [PATCH] pageserver: use a single tokio runtime (apply in one go)

Replace the four per-purpose runtimes (COMPUTE_REQUEST_RUNTIME,
MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME, BACKGROUND_RUNTIME) with a
single THE_RUNTIME, and drop the explicit runtime handle argument from
task_mgr::spawn in favor of the ambient runtime.
---
 pageserver/src/bin/pageserver.rs              | 24 +++++-------
 pageserver/src/consumption_metrics.rs         |  3 +-
 pageserver/src/disk_usage_eviction_task.rs    |  3 +-
 pageserver/src/page_service.rs                |  1 -
 pageserver/src/task_mgr.rs                    | 37 ++++---------
 pageserver/src/tenant.rs                      |  1 -
 pageserver/src/tenant/delete.rs               |  1 -
 pageserver/src/tenant/mgr.rs                  |  2 -
 .../src/tenant/remote_timeline_client.rs      | 11 ------
 pageserver/src/tenant/secondary.rs            |  4 +-
 pageserver/src/tenant/storage_layer/layer.rs  |  4 +-
 pageserver/src/tenant/tasks.rs                |  6 +--
 pageserver/src/tenant/timeline.rs             |  4 ----
 pageserver/src/tenant/timeline/delete.rs      |  1 -
 .../src/tenant/timeline/eviction_task.rs      |  3 +-
 pageserver/src/tenant/timeline/walreceiver.rs |  5 +--
 .../walreceiver/walreceiver_connection.rs     |  5 +--
 17 files changed, 28 insertions(+), 87 deletions(-)

diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 1fd7c775d557e..21f86eba3dc9c 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -15,7 +15,6 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
 use pageserver::control_plane_client::ControlPlaneClient;
 use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
 use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
-use pageserver::task_mgr::WALRECEIVER_RUNTIME;
 use pageserver::tenant::{secondary, TenantSharedResources};
 use remote_storage::GenericRemoteStorage;
 use tokio::time::Instant;
@@ -28,7 +27,7 @@ use pageserver::{
     deletion_queue::DeletionQueue,
     http, page_cache, page_service, task_mgr,
     task_mgr::TaskKind,
-    task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
+    task_mgr::THE_RUNTIME,
     tenant::mgr,
     virtual_file,
 };
@@ -323,7 +322,7 @@ fn start_pageserver(
 
     // Launch broker client
     // The storage_broker::connect call needs to happen inside a tokio runtime thread.
-    let broker_client = WALRECEIVER_RUNTIME
+    let broker_client = THE_RUNTIME
         .block_on(async {
             // Note: we do not attempt connecting here (but validate endpoints sanity).
             storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
@@ -391,7 +390,7 @@ fn start_pageserver(
         conf,
     );
     if let Some(deletion_workers) = deletion_workers {
-        deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
+        deletion_workers.spawn_with(THE_RUNTIME.handle());
     }
 
     // Up to this point no significant I/O has been done: this should have been fast.  Record
@@ -423,7 +422,7 @@ fn start_pageserver(
 
     // Scan the local 'tenants/' directory and start loading the tenants
     let deletion_queue_client = deletion_queue.new_client();
-    let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
+    let tenant_manager = THE_RUNTIME.block_on(mgr::init_tenant_mgr(
         conf,
         TenantSharedResources {
             broker_client: broker_client.clone(),
@@ -435,7 +434,7 @@ fn start_pageserver(
     ))?;
     let tenant_manager = Arc::new(tenant_manager);
 
-    BACKGROUND_RUNTIME.spawn({
+    THE_RUNTIME.spawn({
         let shutdown_pageserver = shutdown_pageserver.clone();
         let drive_init = async move {
             // NOTE: unlike many futures in pageserver, this one is cancellation-safe
@@ -545,7 +544,7 @@ fn start_pageserver(
     // Start up the service to handle HTTP mgmt API request. We created the
     // listener earlier already.
     {
-        let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
+        let _rt_guard = THE_RUNTIME.enter();
 
         let router_state = Arc::new(
             http::routes::State::new(
@@ -569,7 +568,6 @@ fn start_pageserver(
             .with_graceful_shutdown(task_mgr::shutdown_watcher());
 
         task_mgr::spawn(
-            MGMT_REQUEST_RUNTIME.handle(),
             TaskKind::HttpEndpointListener,
             None,
             None,
@@ -594,7 +592,6 @@
         let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
 
         task_mgr::spawn(
-            crate::BACKGROUND_RUNTIME.handle(),
             TaskKind::MetricsCollection,
             None,
             None,
@@ -642,7 +639,6 @@ fn start_pageserver(
             DownloadBehavior::Error,
         );
         task_mgr::spawn(
-            COMPUTE_REQUEST_RUNTIME.handle(),
             TaskKind::LibpqEndpointListener,
             None,
             None,
@@ -668,7 +664,7 @@ fn start_pageserver(
     // All started up! Now just sit and wait for shutdown signal.
     {
         use signal_hook::consts::*;
-        let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
+        let signal_handler = THE_RUNTIME.spawn_blocking(move || {
             let mut signals =
                 signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
             return signals
@@ -676,9 +672,7 @@ fn start_pageserver(
                 .next()
                 .expect("forever() never returns None unless explicitly closed");
         });
-        let signal = BACKGROUND_RUNTIME
-            .block_on(signal_handler)
-            .expect("join error");
+        let signal = THE_RUNTIME.block_on(signal_handler).expect("join error");
         match signal {
             SIGQUIT => {
                 info!("Got signal {signal}. Terminating in immediate shutdown mode",);
@@ -693,7 +687,7 @@ fn start_pageserver(
     shutdown_pageserver.take();
     let bg_remote_storage = remote_storage.clone();
     let bg_deletion_queue = deletion_queue.clone();
-    BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
+    THE_RUNTIME.block_on(pageserver::shutdown_pageserver(
         &tenant_manager,
         bg_remote_storage.map(|_| bg_deletion_queue),
         0,
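The pattern this file converges on, one shared runtime for block_on, enter, and spawn, can be sketched in isolation as follows. This is a standalone illustration, not pageserver code; the RUNTIME static is a stand-in for task_mgr::THE_RUNTIME.

use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

// Stand-in for task_mgr::THE_RUNTIME (illustration only).
static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime")
});

fn main() {
    // Synchronous startup drives async initialization on the shared runtime.
    let broker = RUNTIME.block_on(async { "broker-client" });

    {
        // Entering makes Handle::current() point at the shared runtime, which
        // runtime-dependent constructors need (cf. the mgmt API block above).
        let _rt_guard = RUNTIME.enter();
        tokio::spawn(async { /* e.g. an HTTP listener */ });
    } // guard dropped before the next block_on

    // Long-running work is spawned onto the same pool the binary blocks on.
    let task = RUNTIME.spawn(async move { broker.len() });
    assert_eq!(RUNTIME.block_on(task).unwrap(), 13);
}

The guard is scoped because calling block_on while the same runtime's context is entered on the current thread would panic, which is also why the mgmt API section above keeps its enter() inside a block.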
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index c7f9d596c69ce..4d0ccc51f7908 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -1,7 +1,7 @@
 //! Periodically collect consumption metrics for all active tenants
 //! and push them to a HTTP endpoint.
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
+use crate::task_mgr::{self, TaskKind};
 use crate::tenant::tasks::BackgroundLoopKind;
 use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
 use camino::Utf8PathBuf;
@@ -59,7 +59,6 @@ pub async fn collect_metrics(
     let worker_ctx =
         ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::CalculateSyntheticSize,
         None,
         None,
diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index 92c1475aeff55..6b68acd1c7d4e 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -59,7 +59,7 @@ use utils::{completion, id::TimelineId};
 use crate::{
     config::PageServerConf,
     metrics::disk_usage_based_eviction::METRICS,
-    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
+    task_mgr::{self, TaskKind},
     tenant::{
         self,
         mgr::TenantManager,
@@ -202,7 +202,6 @@ pub fn launch_disk_usage_global_eviction_task(
     info!("launching disk usage based eviction task");
 
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::DiskUsageEviction,
         None,
         None,
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index f3ceb7d3e6e5b..fa1a0f535b501 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -180,7 +180,6 @@ pub async fn libpq_listener_main(
         // only deal with a particular timeline, but we don't know which one
         // yet.
         task_mgr::spawn(
-            &tokio::runtime::Handle::current(),
             TaskKind::PageRequestHandler,
             None,
             None,
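Call sites like the one above passed &tokio::runtime::Handle::current(), so dropping the argument is behavior-preserving: inside a runtime, spawning on the explicitly captured current handle and ambient tokio::spawn are the same operation. A minimal demonstration (illustrative only, not pageserver code):

#[tokio::main]
async fn main() {
    // Spawning on the explicitly captured current handle...
    let a = tokio::runtime::Handle::current().spawn(async { 1 });
    // ...and ambient spawning land on the same runtime.
    let b = tokio::spawn(async { 1 });
    assert_eq!(a.await.unwrap() + b.await.unwrap(), 2);
}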
diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs
index 69e163effaa3f..881a13d1ced47 100644
--- a/pageserver/src/task_mgr.rs
+++ b/pageserver/src/task_mgr.rs
@@ -98,42 +98,22 @@ use utils::id::TimelineId;
 // other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
 // happen, but still.
 //
-pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    tokio::runtime::Builder::new_multi_thread()
-        .thread_name("compute request worker")
-        .enable_all()
-        .build()
-        .expect("Failed to create compute request runtime")
-});
-
-pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    tokio::runtime::Builder::new_multi_thread()
-        .thread_name("mgmt request worker")
-        .enable_all()
-        .build()
-        .expect("Failed to create mgmt request runtime")
-});
-
-pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    tokio::runtime::Builder::new_multi_thread()
-        .thread_name("walreceiver worker")
-        .enable_all()
-        .build()
-        .expect("Failed to create walreceiver runtime")
-});
-
-pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+/// The single tokio runtime used by all pageserver code.
+/// In the past, we had multiple runtimes, and in the future we should weed out
+/// remaining references to this global field and rely on ambient runtime instead,
+/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
+pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     tokio::runtime::Builder::new_multi_thread()
-        .thread_name("background op worker")
         // if you change the number of worker threads please change the constant below
         .enable_all()
         .build()
         .expect("Failed to create background op runtime")
 });
 
-pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
+pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
     // force init and thus panics
-    let _ = BACKGROUND_RUNTIME.handle();
+    let _ = THE_RUNTIME.handle();
     // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
     // tokio would had already panicked for parsing errors or NotUnicode
     //
@@ -325,7 +305,6 @@ struct PageServerTask {
 /// Note: if shutdown_process_on_error is set to true failure
 /// of the task will lead to shutdown of entire process
 pub fn spawn<F>(
-    runtime: &tokio::runtime::Handle,
     kind: TaskKind,
     tenant_shard_id: Option<TenantShardId>,
     timeline_id: Option<TimelineId>,
@@ -354,7 +333,7 @@ where
 
     let task_name = name.to_string();
     let task_cloned = Arc::clone(&task);
-    let join_handle = runtime.spawn(task_wrapper(
+    let join_handle = tokio::spawn(task_wrapper(
         task_name,
         task_id,
         task_cloned,
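With the runtime parameter gone, task_mgr::spawn always inherits the caller's runtime via tokio::spawn. A reduced, runnable model of the wrapper after this change (the name spawn_task and the error handling are illustrative, not the real task_mgr API; anyhow is assumed, which pageserver already depends on):

use std::future::Future;

// Reduced model of task_mgr::spawn after this patch: no runtime handle
// parameter; the spawned task runs on the caller's (ambient) runtime.
fn spawn_task<F>(name: &str, fut: F) -> tokio::task::JoinHandle<()>
where
    F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let name = name.to_string();
    tokio::spawn(async move {
        // Stands in for task_wrapper: run the future, report failures.
        if let Err(e) = fut.await {
            eprintln!("task {name} failed: {e:#}");
        }
    })
}

#[tokio::main]
async fn main() {
    spawn_task("demo", async { Ok(()) }).await.unwrap();
}

Since tokio::spawn panics outside a runtime context, every task_mgr::spawn caller must now already be running on THE_RUNTIME (or have entered it), which the changes in pageserver.rs above arrange.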
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index fe48741a89d14..1b9c79f59fe7e 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -661,7 +661,6 @@ impl Tenant {
         let tenant_clone = Arc::clone(&tenant);
         let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
         task_mgr::spawn(
-            &tokio::runtime::Handle::current(),
             TaskKind::Attach,
             Some(tenant_shard_id),
             None,
diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs
index 7d37873a67e67..3866136dbdfb4 100644
--- a/pageserver/src/tenant/delete.rs
+++ b/pageserver/src/tenant/delete.rs
@@ -482,7 +482,6 @@ impl DeleteTenantFlow {
         let tenant_shard_id = tenant.tenant_shard_id;
 
         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::TimelineDeletionWorker,
             Some(tenant_shard_id),
             None,
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 97a505ded9066..67335231307c2 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -1850,7 +1850,6 @@ impl TenantManager {
         let task_tenant_id = None;
 
         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::MgmtRequest,
             task_tenant_id,
             None,
@@ -2816,7 +2815,6 @@ pub(crate) fn immediate_gc(
 
     // TODO: spawning is redundant now, need to hold the gate
     task_mgr::spawn(
-        &tokio::runtime::Handle::current(),
         TaskKind::GarbageCollector,
         Some(tenant_shard_id),
         Some(timeline_id),
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 40be2ca8f3185..c0a150eb0d901 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -223,7 +223,6 @@ use crate::{
     config::PageServerConf,
     task_mgr,
     task_mgr::TaskKind,
-    task_mgr::BACKGROUND_RUNTIME,
     tenant::metadata::TimelineMetadata,
     tenant::upload_queue::{
         UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
@@ -307,8 +306,6 @@ pub enum PersistIndexPartWithDeletedFlagError {
 pub struct RemoteTimelineClient {
     conf: &'static PageServerConf,
 
-    runtime: tokio::runtime::Handle,
-
     tenant_shard_id: TenantShardId,
     timeline_id: TimelineId,
     generation: Generation,
@@ -341,12 +338,6 @@ impl RemoteTimelineClient {
     ) -> RemoteTimelineClient {
         RemoteTimelineClient {
             conf,
-            runtime: if cfg!(test) {
-                // remote_timeline_client.rs tests rely on current-thread runtime
-                tokio::runtime::Handle::current()
-            } else {
-                BACKGROUND_RUNTIME.handle().clone()
-            },
             tenant_shard_id,
             timeline_id,
             generation,
@@ -1281,7 +1272,6 @@ impl RemoteTimelineClient {
         let tenant_shard_id = self.tenant_shard_id;
         let timeline_id = self.timeline_id;
         task_mgr::spawn(
-            &self.runtime,
             TaskKind::RemoteUploadTask,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -1876,7 +1866,6 @@ mod tests {
     fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
         Arc::new(RemoteTimelineClient {
             conf: self.harness.conf,
-            runtime: tokio::runtime::Handle::current(),
             tenant_shard_id: self.harness.tenant_shard_id,
             timeline_id: TIMELINE_ID,
             generation,
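The deleted cfg!(test) branch existed only to route upload tasks onto the test's own runtime; with ambient spawning that happens automatically, because #[tokio::test] provides the (by default current-thread) runtime the spawned task inherits. A sketch of the property (hypothetical test, not from this patch):

// With ambient spawning, work spawned inside a #[tokio::test] runs on that
// test's own runtime; no cfg!(test) routing is needed anymore.
#[tokio::test]
async fn spawned_work_runs_on_the_test_runtime() {
    let handle = tokio::spawn(async { 2 + 2 });
    assert_eq!(handle.await.unwrap(), 4);
}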
diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs
index 19f36c722e7b6..b0babb1308345 100644
--- a/pageserver/src/tenant/secondary.rs
+++ b/pageserver/src/tenant/secondary.rs
@@ -8,7 +8,7 @@ use std::{sync::Arc, time::SystemTime};
 use crate::{
     config::PageServerConf,
     disk_usage_eviction_task::DiskUsageEvictionInfo,
-    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
+    task_mgr::{self, TaskKind},
     virtual_file::MaybeFatalIo,
 };
 
@@ -317,7 +317,6 @@ pub fn spawn_tasks(
         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
 
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
        TaskKind::SecondaryDownloads,
         None,
         None,
@@ -338,7 +337,6 @@ pub fn spawn_tasks(
     );
 
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::SecondaryUploads,
         None,
         None,
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index 8ba37b5a8640f..e101a40da49b9 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -1447,7 +1447,7 @@ impl LayerInner {
         #[cfg(test)]
         tokio::task::spawn(fut);
         #[cfg(not(test))]
-        crate::task_mgr::BACKGROUND_RUNTIME.spawn(fut);
+        crate::task_mgr::THE_RUNTIME.spawn(fut);
     }
 
     /// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
@@ -1458,7 +1458,7 @@ impl LayerInner {
         #[cfg(test)]
         tokio::task::spawn_blocking(f);
         #[cfg(not(test))]
-        crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(f);
+        crate::task_mgr::THE_RUNTIME.spawn_blocking(f);
     }
 }
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index e4f5f7513288f..db32223a601e2 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -8,7 +8,7 @@ use std::time::{Duration, Instant};
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
-use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
+use crate::task_mgr::TaskKind;
 use crate::tenant::throttle::Stats;
 use crate::tenant::timeline::CompactionError;
 use crate::tenant::{Tenant, TenantState};
@@ -18,7 +18,7 @@ use utils::{backoff, completion};
 
 static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
     once_cell::sync::Lazy::new(|| {
-        let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
+        let total_threads = *crate::task_mgr::THE_RUNTIME_WORKER_THREADS;
         let permits = usize::max(
             1,
             // while a lot of the work is done on spawn_blocking, we still do
@@ -85,7 +85,6 @@ pub fn start_background_loops(
 ) {
     let tenant_shard_id = tenant.tenant_shard_id;
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::Compaction,
         Some(tenant_shard_id),
         None,
@@ -109,7 +108,6 @@ pub fn start_background_loops(
         },
     );
     task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
         TaskKind::GarbageCollector,
         Some(tenant_shard_id),
         None,
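CONCURRENT_BACKGROUND_TASKS continues to be sized from the runtime's worker-thread count, now via the renamed THE_RUNTIME_WORKER_THREADS. Roughly, the sizing looks like the sketch below; the exact fraction lives in context lines not shown above, so 3/4 here is an assumption for illustration only.

// Hypothetical reconstruction of the permit sizing; the real fraction is in
// the elided context of tasks.rs.
fn background_task_permits(total_threads: usize) -> usize {
    // Leave headroom on the shared runtime, but never drop to zero permits.
    usize::max(1, total_threads * 3 / 4)
}

fn main() {
    assert_eq!(background_task_permits(8), 6);
    assert_eq!(background_task_permits(1), 1);
}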
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 7523130f2343f..8002fff2bb9a7 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1723,7 +1723,6 @@ impl Timeline {
             initdb_optimization_count: 0,
         };
         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::LayerFlushTask,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -2086,7 +2085,6 @@ impl Timeline {
             DownloadBehavior::Download,
         );
         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::InitialLogicalSizeCalculation,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -2264,7 +2262,6 @@ impl Timeline {
             DownloadBehavior::Download,
         );
         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
@@ -4151,7 +4148,6 @@ impl Timeline {
 
         let self_clone = Arc::clone(&self);
         let task_id = task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             task_mgr::TaskKind::DownloadAllRemoteLayers,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index a0c9d99196bb5..d2272fc75fb4f 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -443,7 +443,6 @@ impl DeleteTimelineFlow {
         let timeline_id = timeline.timeline_id;
 
         task_mgr::spawn(
-            task_mgr::BACKGROUND_RUNTIME.handle(),
             TaskKind::TimelineDeletionWorker,
             Some(tenant_shard_id),
             Some(timeline_id),
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index dd769d41216cf..f84a4b0dac02b 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -28,7 +28,7 @@ use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
 use crate::{
     context::{DownloadBehavior, RequestContext},
     pgdatadir_mapping::CollectKeySpaceError,
-    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
+    task_mgr::{self, TaskKind},
     tenant::{
         tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
     },
@@ -56,7 +56,6 @@ impl Timeline {
         let self_clone = Arc::clone(self);
         let background_tasks_can_start = background_tasks_can_start.cloned();
         task_mgr::spawn(
-            BACKGROUND_RUNTIME.handle(),
             TaskKind::Eviction,
             Some(self.tenant_shard_id),
             Some(self.timeline_id),
diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs
index 2fab6722b8f59..3592dda8d72cd 100644
--- a/pageserver/src/tenant/timeline/walreceiver.rs
+++ b/pageserver/src/tenant/timeline/walreceiver.rs
@@ -24,7 +24,7 @@ mod connection_manager;
 mod walreceiver_connection;
 
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
+use crate::task_mgr::{self, TaskKind};
 use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::timeline::walreceiver::connection_manager::{
     connection_manager_loop_step, ConnectionManagerState,
@@ -82,7 +82,6 @@ impl WalReceiver {
         let loop_status = Arc::new(std::sync::RwLock::new(None));
         let manager_status = Arc::clone(&loop_status);
         task_mgr::spawn(
-            WALRECEIVER_RUNTIME.handle(),
             TaskKind::WalReceiverManager,
             Some(timeline.tenant_shard_id),
             Some(timeline_id),
@@ -181,7 +180,7 @@ impl TaskHandle {
         let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
         let cancellation_clone = cancellation.clone();
 
-        let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
+        let join_handle = tokio::spawn(async move {
             events_sender.send(TaskStateUpdate::Started).ok();
             task(events_sender, cancellation_clone).await
             // events_sender is dropped at some point during the .await above.
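TaskHandle::spawn previously named WALRECEIVER_RUNTIME and therefore worked from any thread; with tokio::spawn it must be called from within the (single) runtime. A reduced model of the watch-channel wiring it uses (names simplified, not the real TaskHandle API):

use tokio::sync::watch;

// Reduced model of TaskHandle::spawn: state updates flow over a watch
// channel, and the task body is spawned onto the ambient runtime.
fn start_task() -> (watch::Receiver<&'static str>, tokio::task::JoinHandle<()>) {
    let (events_sender, events_receiver) = watch::channel("started");
    let join_handle = tokio::spawn(async move {
        events_sender.send("running").ok();
        // events_sender is dropped when the task body returns.
    });
    (events_receiver, join_handle)
}

#[tokio::main]
async fn main() {
    let (mut rx, handle) = start_task();
    // Ok even if the sender is already gone, because the last value sent
    // has not been seen yet.
    rx.changed().await.unwrap();
    println!("state: {}", *rx.borrow());
    handle.await.unwrap();
}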
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index d9f780cfd1802..784c4287de6fe 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -27,9 +27,7 @@ use super::TaskStateUpdate;
 use crate::{
     context::RequestContext,
     metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
-    task_mgr,
-    task_mgr::TaskKind,
-    task_mgr::WALRECEIVER_RUNTIME,
+    task_mgr::{self, TaskKind},
     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
     walingest::WalIngest,
     walrecord::DecodedWALRecord,
@@ -163,7 +161,7 @@ pub(super) async fn handle_walreceiver_connection(
     );
     let connection_cancellation = cancellation.clone();
     task_mgr::spawn(
-        WALRECEIVER_RUNTIME.handle(),
         TaskKind::WalReceiverConnectionPoller,
         Some(timeline.tenant_shard_id),
         Some(timeline.timeline_id),