diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 542c1b7b3016..43b35c6d0822 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -402,15 +402,11 @@ fn start_pageserver( let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel(); let (init_done_tx, init_done_rx) = utils::completion::channel(); - let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel(); - let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel(); let order = pageserver::InitializationOrder { initial_tenant_load_remote: Some(init_done_tx), initial_tenant_load: Some(init_remote_done_tx), - initial_logical_size_can_start: init_done_rx.clone(), - initial_logical_size_attempt: Some(init_logical_size_done_tx), background_jobs_can_start: background_jobs_barrier.clone(), }; @@ -464,7 +460,7 @@ fn start_pageserver( }); let WaitForPhaseResult { - timeout_remaining: timeout, + timeout_remaining: _timeout, skipped: init_load_skipped, } = wait_for_phase("initial_tenant_load", init_load_done, timeout).await; @@ -472,26 +468,6 @@ fn start_pageserver( scopeguard::ScopeGuard::into_inner(guard); - let guard = scopeguard::guard_on_success((), |_| { - tracing::info!("Cancelled before initial logical sizes completed") - }); - - let logical_sizes_done = std::pin::pin!(async { - init_logical_size_done_rx.wait().await; - startup_checkpoint( - started_startup_at, - "initial_logical_sizes", - "Initial logical sizes completed", - ); - }); - - let WaitForPhaseResult { - timeout_remaining: _, - skipped: logical_sizes_skipped, - } = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await; - - scopeguard::ScopeGuard::into_inner(guard); - // allow background jobs to start: we either completed prior stages, or they reached timeout // and were skipped. It is important that we do not let them block background jobs indefinitely, // because things like consumption metrics for billing are blocked by this barrier. @@ -514,9 +490,6 @@ fn start_pageserver( if let Some(f) = init_load_skipped { f.await; } - if let Some(f) = logical_sizes_skipped { - f.await; - } scopeguard::ScopeGuard::into_inner(guard); startup_checkpoint(started_startup_at, "complete", "Startup complete"); diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index c6ff91e560e3..d70f1fec4d77 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -351,7 +351,12 @@ impl TimelineSnapshot { let current_exact_logical_size = { let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id); - let size = span.in_scope(|| t.get_current_logical_size(ctx)); + let size = span.in_scope(|| { + t.get_current_logical_size( + crate::tenant::timeline::GetLogicalSizePriority::Background, + ctx, + ) + }); match size { // Only send timeline logical size when it is fully calculated. 
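The startup hunk above drops the `initial_logical_sizes` phase entirely, so background jobs are now gated only on `background_jobs_can_start`. A minimal stand-in for the `utils::completion` barrier used there (a Neon-internal helper), approximated here with `tokio::sync::watch`; everything except the `background_jobs_can_start`/`background_jobs_barrier` names is illustrative:

```rust
use tokio::sync::watch;

async fn startup_example() {
    // Stand-in for `utils::completion::channel()`: dropping the sender side
    // releases every waiter, which is how `background_jobs_can_start` gates
    // the background loops until startup finishes (or hits its timeout).
    let (background_jobs_can_start, background_jobs_barrier) = watch::channel(());

    // A background loop clones the barrier and waits on it before doing work.
    let mut barrier = background_jobs_barrier.clone();
    let job = tokio::spawn(async move {
        // `changed()` resolves (with Err) once the sender is dropped; either
        // way the loop is now allowed to run.
        let _ = barrier.changed().await;
        // ... consumption metrics, compaction, eviction, etc. ...
    });

    // Startup path: after the remaining phases complete or time out, drop the
    // sender so background jobs are never blocked indefinitely.
    drop(background_jobs_can_start);
    job.await.unwrap();
}
```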
CurrentLogicalSize::Exact(ref size) => Some(size.into()), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 29a1ff52e818..71b7ea05ec52 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -338,7 +338,8 @@ async fn build_timeline_info_common( Lsn(0) => None, lsn @ Lsn(_) => Some(lsn), }; - let current_logical_size = timeline.get_current_logical_size(ctx); + let current_logical_size = + timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx); let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn_projected = timeline diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3f74694ef2de..0bdf096bfe54 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -186,13 +186,6 @@ pub struct InitializationOrder { /// Each initial tenant load task carries this until completion. pub initial_tenant_load: Option, - /// Barrier for when we can start initial logical size calculations. - pub initial_logical_size_can_start: utils::completion::Barrier, - - /// Each timeline owns a clone of this to be consumed on the initial logical size calculation - /// attempt. It is important to drop this once the attempt has completed. - pub initial_logical_size_attempt: Option, - /// Barrier for when we can start any background jobs. /// /// This can be broken up later on, but right now there is just one class of a background job. diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 0cfbfcdf2fb9..6e311041ba3f 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -407,16 +407,14 @@ pub(crate) mod initial_logical_size { use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; - use crate::task_mgr::TaskKind; - pub(crate) struct StartCalculation(IntCounterVec); pub(crate) static START_CALCULATION: Lazy = Lazy::new(|| { StartCalculation( register_int_counter_vec!( "pageserver_initial_logical_size_start_calculation", "Incremented each time we start an initial logical size calculation attempt. 
\ - The `task_kind` label is for the task kind that caused this attempt.", - &["attempt", "task_kind"] + The `circumstances` label provides some additional details.", + &["attempt", "circumstances"] ) .unwrap(), ) @@ -464,19 +462,24 @@ pub(crate) mod initial_logical_size { inc_drop_calculation: Option, } + #[derive(strum_macros::IntoStaticStr)] + pub(crate) enum StartCircumstances { + EmptyInitial, + SkippedConcurrencyLimiter, + AfterBackgroundTasksRateLimit, + } + impl StartCalculation { - pub(crate) fn first(&self, causing_task_kind: Option) -> OngoingCalculationGuard { - let task_kind_label: &'static str = - causing_task_kind.map(|k| k.into()).unwrap_or_default(); - self.0.with_label_values(&["first", task_kind_label]); + pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { + let circumstances_label: &'static str = circumstances.into(); + self.0.with_label_values(&["first", circumstances_label]); OngoingCalculationGuard { inc_drop_calculation: Some(DROP_CALCULATION.first.clone()), } } - pub(crate) fn retry(&self, causing_task_kind: Option) -> OngoingCalculationGuard { - let task_kind_label: &'static str = - causing_task_kind.map(|k| k.into()).unwrap_or_default(); - self.0.with_label_values(&["retry", task_kind_label]); + pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard { + let circumstances_label: &'static str = circumstances.into(); + self.0.with_label_values(&["retry", circumstances_label]); OngoingCalculationGuard { inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()), } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 15d5609cebdd..a4481421585d 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -282,6 +282,10 @@ impl Timeline { } /// Get a list of all existing relations in given tablespace and database. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn list_rels( &self, spcnode: Oid, @@ -630,6 +634,10 @@ impl Timeline { /// /// Only relation blocks are counted currently. That excludes metadata, /// SLRUs, twophase files etc. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. 
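The `circumstances` label above replaces the old `task_kind` label now that the calculation is no longer attributed to a requesting task. A self-contained sketch of the same labeling pattern using the `prometheus` and `strum_macros` crates directly (the pageserver routes this through its own `metrics` wrapper; `record_start` is an illustrative helper, not a function from this diff):

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounterVec};

#[derive(strum_macros::IntoStaticStr)]
enum StartCircumstances {
    EmptyInitial,
    SkippedConcurrencyLimiter,
    AfterBackgroundTasksRateLimit,
}

static START_CALCULATION: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "pageserver_initial_logical_size_start_calculation",
        "Incremented each time we start an initial logical size calculation attempt.",
        &["attempt", "circumstances"]
    )
    .unwrap()
});

fn record_start(attempt: &'static str, circumstances: StartCircumstances) {
    // `IntoStaticStr` turns the variant name into the label value,
    // e.g. "SkippedConcurrencyLimiter".
    let label: &'static str = circumstances.into();
    START_CALCULATION.with_label_values(&[attempt, label]).inc();
}
```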
pub async fn get_current_logical_size_non_incremental( &self, lsn: Lsn, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f67a4174afa7..0b2e48e1ff15 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -472,7 +472,6 @@ impl Tenant { index_part: Option, metadata: TimelineMetadata, ancestor: Option>, - init_order: Option<&InitializationOrder>, _ctx: &RequestContext, ) -> anyhow::Result<()> { let tenant_id = self.tenant_shard_id; @@ -482,7 +481,6 @@ impl Tenant { &metadata, ancestor.clone(), resources, - init_order, CreateTimelineCause::Load, )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); @@ -683,10 +681,6 @@ impl Tenant { // as we are no longer loading, signal completion by dropping // the completion while we resume deletion drop(_completion); - // do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout - let _ = init_order - .as_mut() - .and_then(|x| x.initial_logical_size_attempt.take()); let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start); if let Some(background) = background_jobs_can_start { @@ -700,7 +694,6 @@ impl Tenant { &tenant_clone, preload, tenants, - init_order, &ctx, ) .await @@ -713,7 +706,7 @@ impl Tenant { } } - match tenant_clone.attach(init_order, preload, &ctx).await { + match tenant_clone.attach(preload, &ctx).await { Ok(()) => { info!("attach finished, activating"); tenant_clone.activate(broker_client, None, &ctx); @@ -776,7 +769,6 @@ impl Tenant { /// async fn attach( self: &Arc, - init_order: Option, preload: Option, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -789,7 +781,7 @@ impl Tenant { None => { // Deprecated dev mode: load from local disk state instead of remote storage // https://github.com/neondatabase/neon/issues/5624 - return self.load_local(init_order, ctx).await; + return self.load_local(ctx).await; } }; @@ -884,7 +876,6 @@ impl Tenant { &index_part.metadata, Some(remote_timeline_client), self.deletion_queue_client.clone(), - None, ) .await .context("resume_deletion") @@ -1009,10 +1000,6 @@ impl Tenant { None }; - // we can load remote timelines during init, but they are assumed to be so rare that - // initialization order is not passed to here. - let init_order = None; - // timeline loading after attach expects to find metadata file for each metadata save_metadata( self.conf, @@ -1030,7 +1017,6 @@ impl Tenant { Some(index_part), remote_metadata, ancestor, - init_order, ctx, ) .await @@ -1272,11 +1258,7 @@ impl Tenant { /// files on disk. Used at pageserver startup. /// /// No background tasks are started as part of this routine. 
- async fn load_local( - self: &Arc, - init_order: Option, - ctx: &RequestContext, - ) -> anyhow::Result<()> { + async fn load_local(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { span::debug_assert_current_span_has_tenant_id(); debug!("loading tenant task"); @@ -1302,7 +1284,7 @@ impl Tenant { // Process loadable timelines first for (timeline_id, local_metadata) in scan.sorted_timelines_to_load { if let Err(e) = self - .load_local_timeline(timeline_id, local_metadata, init_order.as_ref(), ctx, false) + .load_local_timeline(timeline_id, local_metadata, ctx, false) .await { match e { @@ -1336,13 +1318,7 @@ impl Tenant { } Some(local_metadata) => { if let Err(e) = self - .load_local_timeline( - timeline_id, - local_metadata, - init_order.as_ref(), - ctx, - true, - ) + .load_local_timeline(timeline_id, local_metadata, ctx, true) .await { match e { @@ -1370,12 +1346,11 @@ impl Tenant { /// Subroutine of `load_tenant`, to load an individual timeline /// /// NB: The parent is assumed to be already loaded! - #[instrument(skip(self, local_metadata, init_order, ctx))] + #[instrument(skip(self, local_metadata, ctx))] async fn load_local_timeline( self: &Arc, timeline_id: TimelineId, local_metadata: TimelineMetadata, - init_order: Option<&InitializationOrder>, ctx: &RequestContext, found_delete_mark: bool, ) -> Result<(), LoadLocalTimelineError> { @@ -1392,7 +1367,6 @@ impl Tenant { &local_metadata, None, self.deletion_queue_client.clone(), - init_order, ) .await .context("resume deletion") @@ -1409,17 +1383,9 @@ impl Tenant { None }; - self.timeline_init_and_sync( - timeline_id, - resources, - None, - local_metadata, - ancestor, - init_order, - ctx, - ) - .await - .map_err(LoadLocalTimelineError::Load) + self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx) + .await + .map_err(LoadLocalTimelineError::Load) } pub(crate) fn tenant_id(&self) -> TenantId { @@ -2314,7 +2280,6 @@ impl Tenant { new_metadata: &TimelineMetadata, ancestor: Option>, resources: TimelineResources, - init_order: Option<&InitializationOrder>, cause: CreateTimelineCause, ) -> anyhow::Result> { let state = match cause { @@ -2329,9 +2294,6 @@ impl Tenant { CreateTimelineCause::Delete => TimelineState::Stopping, }; - let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start); - let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt); - let pg_version = new_metadata.pg_version(); let timeline = Timeline::new( @@ -2345,8 +2307,6 @@ impl Tenant { Arc::clone(&self.walredo_mgr), resources, pg_version, - initial_logical_size_can_start.cloned(), - initial_logical_size_attempt.cloned().flatten(), state, self.cancel.child_token(), ); @@ -3168,7 +3128,6 @@ impl Tenant { new_metadata, ancestor, resources, - None, CreateTimelineCause::Load, ) .context("Failed to create timeline data structure")?; @@ -3843,7 +3802,7 @@ pub(crate) mod harness { match mode { LoadMode::Local => { tenant - .load_local(None, ctx) + .load_local(ctx) .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())) .await?; } @@ -3853,7 +3812,7 @@ pub(crate) mod harness { .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())) .await?; tenant - .attach(None, Some(preload), ctx) + .attach(Some(preload), ctx) .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())) 
.await?; } diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index b7b2ef9c79cb..548b173c0d8c 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -15,7 +15,6 @@ use crate::{ context::RequestContext, task_mgr::{self, TaskKind}, tenant::mgr::{TenantSlot, TenantsMapRemoveResult}, - InitializationOrder, }; use super::{ @@ -390,7 +389,6 @@ impl DeleteTenantFlow { tenant: &Arc, preload: Option, tenants: &'static std::sync::RwLock, - init_order: Option, ctx: &RequestContext, ) -> Result<(), DeleteTenantError> { let (_, progress) = completion::channel(); @@ -400,10 +398,7 @@ impl DeleteTenantFlow { .await .expect("cant be stopping or broken"); - tenant - .attach(init_order, preload, ctx) - .await - .context("attach")?; + tenant.attach(preload, ctx).await.context("attach")?; Self::background( guard, diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 3ed4e05beaba..e203d9d3340b 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -230,6 +230,10 @@ impl Layer { /// /// It is up to the caller to collect more data from the previous layer and /// perform WAL redo, if necessary. + /// + /// # Cancellation-Safety + /// + /// This method is cancellation-safe. pub(crate) async fn get_value_reconstruct_data( &self, key: Key, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 138578ec8ae1..bc404c41a044 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind { Eviction, ConsumptionMetricsCollectMetrics, ConsumptionMetricsSyntheticSizeWorker, + InitialLogicalSizeCalculation, } impl BackgroundLoopKind { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index e252ee584e5c..f02fd733b42a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -20,23 +20,27 @@ use pageserver_api::{ }, shard::TenantShardId, }; +use rand::Rng; use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::{ runtime::Handle, - sync::{oneshot, watch, TryAcquireError}, + sync::{oneshot, watch}, }; use tokio_util::sync::CancellationToken; use tracing::*; use utils::{id::TenantTimelineId, sync::gate::Gate}; -use std::cmp::{max, min, Ordering}; use std::collections::{BinaryHeap, HashMap, HashSet}; use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; +use std::{ + cmp::{max, min, Ordering}, + ops::ControlFlow, +}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, @@ -298,13 +302,6 @@ pub struct Timeline { eviction_task_timeline_state: tokio::sync::Mutex, - /// Barrier to wait before doing initial logical size calculation. Used only during startup. - initial_logical_size_can_start: Option, - - /// Completion shared between all timelines loaded during startup; used to delay heavier - /// background tasks until some logical sizes have been calculated. - initial_logical_size_attempt: Mutex>, - /// Load or creation time information about the disk_consistent_lsn and when the loading /// happened. Used for consumption metrics. 
pub(crate) loaded_at: (Lsn, SystemTime), @@ -453,6 +450,11 @@ pub enum LogicalSizeCalculationCause { TenantSizeHandler, } +pub enum GetLogicalSizePriority { + User, + Background, +} + #[derive(enumset::EnumSetType)] pub(crate) enum CompactFlags { ForceRepartition, @@ -489,6 +491,9 @@ impl Timeline { /// an ancestor branch, for example, or waste a lot of cycles chasing the /// non-existing key. /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn get( &self, key: Key, @@ -849,46 +854,6 @@ impl Timeline { } } - /// Retrieve current logical size of the timeline. - /// - /// The size could be lagging behind the actual number, in case - /// the initial size calculation has not been run (gets triggered on the first size access). - /// - /// return size and boolean flag that shows if the size is exact - pub(crate) fn get_current_logical_size( - self: &Arc, - ctx: &RequestContext, - ) -> logical_size::CurrentLogicalSize { - let current_size = self.current_logical_size.current_size(); - debug!("Current size: {current_size:?}"); - - if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) = - (current_size, self.current_logical_size.initial_part_end) - { - self.try_spawn_size_init_task(initial_part_end, ctx); - } - - if let CurrentLogicalSize::Approximate(_) = ¤t_size { - if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler { - let first = self - .current_logical_size - .did_return_approximate_to_walreceiver - .compare_exchange( - false, - true, - AtomicOrdering::Relaxed, - AtomicOrdering::Relaxed, - ) - .is_ok(); - if first { - crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc(); - } - } - } - - current_size - } - /// Check if more than 'checkpoint_distance' of WAL has been accumulated in /// the in-memory layer, and initiate flushing it if so. /// @@ -938,6 +903,7 @@ impl Timeline { background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { + self.spawn_initial_logical_size_computation_task(ctx); self.launch_wal_receiver(ctx, broker_client); self.set_state(TimelineState::Active); self.launch_eviction_task(background_jobs_can_start); @@ -1051,17 +1017,6 @@ impl Timeline { error!("Not activating a Stopping timeline"); } (_, new_state) => { - if matches!( - new_state, - TimelineState::Stopping | TimelineState::Broken { .. } - ) { - // drop the completion guard, if any; it might be holding off the completion - // forever needlessly - self.initial_logical_size_attempt - .lock() - .unwrap_or_else(|e| e.into_inner()) - .take(); - } self.state.send_replace(new_state); } } @@ -1383,8 +1338,6 @@ impl Timeline { walredo_mgr: Arc, resources: TimelineResources, pg_version: u32, - initial_logical_size_can_start: Option, - initial_logical_size_attempt: Option, state: TimelineState, cancel: CancellationToken, ) -> Arc { @@ -1484,8 +1437,6 @@ impl Timeline { ), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())), - initial_logical_size_can_start, - initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt), cancel, gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")), @@ -1797,39 +1748,91 @@ impl Timeline { Ok(()) } - fn try_spawn_size_init_task(self: &Arc, lsn: Lsn, ctx: &RequestContext) { - let state = self.current_state(); - if matches!( - state, - TimelineState::Broken { .. } | TimelineState::Stopping - ) { - // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). 
- return; + /// Retrieve current logical size of the timeline. + /// + /// The size could be lagging behind the actual number, in case + /// the initial size calculation has not been run (gets triggered on the first size access). + /// + /// return size and boolean flag that shows if the size is exact + pub(crate) fn get_current_logical_size( + self: &Arc, + priority: GetLogicalSizePriority, + ctx: &RequestContext, + ) -> logical_size::CurrentLogicalSize { + let current_size = self.current_logical_size.current_size(); + debug!("Current size: {current_size:?}"); + + match (current_size.accuracy(), priority) { + (logical_size::Accuracy::Exact, _) => (), // nothing to do + (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => { + // background task will eventually deliver an exact value, we're in no rush + } + (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => { + // background task is not ready, but user is asking for it now; + // => make the background task skip the line + // (The alternative would be to calculate the size here, but, + // it can actually take a long time if the user has a lot of rels. + // And we'll inevitable need it again; So, let the background task do the work.) + match self + .current_logical_size + .cancel_wait_for_background_loop_concurrency_limit_semaphore + .get() + { + Some(cancel) => cancel.cancel(), + None => { + let state = self.current_state(); + if matches!( + state, + TimelineState::Broken { .. } | TimelineState::Stopping + ) { + + // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken). + // Don't make noise. + } else { + warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work"); + } + } + }; + } } - let permit = match Arc::clone(&self.current_logical_size.initial_size_computation) - .try_acquire_owned() - { - Ok(permit) => permit, - Err(TryAcquireError::NoPermits) => { - // computation already ongoing or finished with success - return; + if let CurrentLogicalSize::Approximate(_) = ¤t_size { + if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler { + let first = self + .current_logical_size + .did_return_approximate_to_walreceiver + .compare_exchange( + false, + true, + AtomicOrdering::Relaxed, + AtomicOrdering::Relaxed, + ) + .is_ok(); + if first { + crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc(); + } } - Err(TryAcquireError::Closed) => unreachable!("we never call close"), + } + + current_size + } + + fn spawn_initial_logical_size_computation_task(self: &Arc, ctx: &RequestContext) { + let Some(initial_part_end) = self.current_logical_size.initial_part_end else { + // nothing to do for freshly created timelines; + assert_eq!( + self.current_logical_size.current_size().accuracy(), + logical_size::Accuracy::Exact, + ); + return; }; - debug_assert!(self - .current_logical_size - .initial_logical_size - .get() - .is_none()); - info!( - "spawning logical size computation from context of task kind {:?}", - ctx.task_kind() - ); - let causing_task_kind = ctx.task_kind(); - // We need to start the computation task. - // It gets a separate context since it will outlive the request that called this function. 
+ let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new(); + let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone(); + self.current_logical_size + .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token) + .expect("initial logical size calculation task must be spawned exactly once per Timeline object"); + let self_clone = Arc::clone(self); let background_ctx = ctx.detached_child( TaskKind::InitialLogicalSizeCalculation, @@ -1844,96 +1847,152 @@ impl Timeline { false, // NB: don't log errors here, task_mgr will do that. async move { - let cancel = task_mgr::shutdown_token(); + self_clone + .initial_logical_size_calculation_task( + initial_part_end, + cancel_wait_for_background_loop_concurrency_limit_semaphore, + cancel, + background_ctx, + ) + .await; + Ok(()) + } + .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, timeline_id=%self.timeline_id)), + ); + } - // in case we were created during pageserver initialization, wait for - // initialization to complete before proceeding. startup time init runs on the same - // runtime. - tokio::select! { - _ = cancel.cancelled() => { return Ok(()); }, - _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {} - }; - + async fn initial_logical_size_calculation_task( + self: Arc, + initial_part_end: Lsn, + skip_concurrency_limiter: CancellationToken, + cancel: CancellationToken, + background_ctx: RequestContext, + ) { + enum BackgroundCalculationError { + Cancelled, + Other(anyhow::Error), + } + let try_once = |attempt: usize| { + let background_ctx = &background_ctx; + let self_ref = &self; + let skip_concurrency_limiter = &skip_concurrency_limiter; + async move { + let cancel = task_mgr::shutdown_token(); + let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit( + BackgroundLoopKind::InitialLogicalSizeCalculation, + background_ctx, + &cancel, + ); - // hold off background tasks from starting until all timelines get to try at least - // once initial logical size calculation; though retry will rarely be useful. - // holding off is done because heavier tasks execute blockingly on the same - // runtime. - // - // dropping this at every outcome is probably better than trying to cling on to it, - // delay will be terminated by a timeout regardless. - let completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() }; + use crate::metrics::initial_logical_size::StartCircumstances; + let (_maybe_permit, circumstances) = tokio::select! { + res = wait_for_permit => { + match res { + Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit), + Err(RateLimitError::Cancelled) => { + return Err(BackgroundCalculationError::Cancelled); + } + } + } + () = skip_concurrency_limiter.cancelled() => { + // Some action that is part of a end user interaction requested logical size + // => break out of the rate limit + // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime; + // but then again what happens if they cancel; also, we should just be using + // one runtime across the entire process, so, let's leave this for now. 
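The `tokio::select!` in the hunk above is the heart of the change: the initial calculation normally queues behind the background-task rate limiter, but a `GetLogicalSizePriority::User` caller can cancel the wait so the task "skips the line". A reduced, runnable model of that mechanism (the `SizeCalcState`, `skip_limiter`, and `limiter` names are stand-ins, not pageserver types):

```rust
use std::sync::Arc;

use once_cell::sync::OnceCell;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

// The semaphore stands in for `concurrent_background_tasks_rate_limit`; the
// OnceCell-held token plays the role of
// `cancel_wait_for_background_loop_concurrency_limit_semaphore`.
struct SizeCalcState {
    skip_limiter: OnceCell<CancellationToken>,
    limiter: Arc<Semaphore>,
}

impl SizeCalcState {
    // Background task: normally wait for a permit, unless a user request
    // boosts us past the limiter.
    async fn run_initial_calculation(self: Arc<Self>) {
        let token = CancellationToken::new();
        self.skip_limiter
            .set(token.clone())
            .expect("spawned exactly once");
        let _permit = tokio::select! {
            permit = self.limiter.clone().acquire_owned() => Some(permit.unwrap()),
            _ = token.cancelled() => None, // user asked: skip the line
        };
        // ... expensive logical size calculation would run here ...
    }

    // User-priority path (`GetLogicalSizePriority::User`): boost the task.
    fn priority_boost(&self) {
        if let Some(token) = self.skip_limiter.get() {
            token.cancel();
        }
    }
}
```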
+ (None, StartCircumstances::SkippedConcurrencyLimiter) + } + }; - let metrics_guard = match &completion { - Some(_) => crate::metrics::initial_logical_size::START_CALCULATION.first(Some(causing_task_kind)), - None => crate::metrics::initial_logical_size::START_CALCULATION.retry(Some(causing_task_kind)), + let metrics_guard = if attempt == 1 { + crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances) + } else { + crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances) }; - let calculated_size = match self_clone - .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx) + match self_ref + .logical_size_calculation_task( + initial_part_end, + LogicalSizeCalculationCause::Initial, + background_ctx, + ) .await { - Ok(s) => s, + Ok(calculated_size) => Ok((calculated_size, metrics_guard)), Err(CalculateLogicalSizeError::Cancelled) => { - // Don't make noise, this is a common task. - // In the unlikely case that there is another call to this function, we'll retry - // because initial_logical_size is still None. - info!("initial size calculation cancelled, likely timeline delete / tenant detach"); - return Ok(()); + Err(BackgroundCalculationError::Cancelled) } Err(CalculateLogicalSizeError::Other(err)) => { - if let Some(e @ PageReconstructError::AncestorStopping(_)) = + if let Some(PageReconstructError::AncestorStopping(_)) = err.root_cause().downcast_ref() { - // This can happen if the timeline parent timeline switches to - // Stopping state while we're still calculating the initial - // timeline size for the child, for example if the tenant is - // being detached or the pageserver is shut down. Like with - // CalculateLogicalSizeError::Cancelled, don't make noise. - info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}"); - return Ok(()); + Err(BackgroundCalculationError::Cancelled) + } else { + Err(BackgroundCalculationError::Other(err)) } - return Err(err.context("Failed to calculate logical size")); } - }; + } + } + }; - // we cannot query current_logical_size.current_size() to know the current - // *negative* value, only truncated to u64. - let added = self_clone - .current_logical_size - .size_added_after_initial - .load(AtomicOrdering::Relaxed); + let retrying = async { + let mut attempt = 0; + loop { + attempt += 1; + + match try_once(attempt).await { + Ok(res) => return ControlFlow::Continue(res), + Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()), + Err(BackgroundCalculationError::Other(e)) => { + warn!(attempt, "initial size calculation failed: {e:?}"); + // exponential back-off doesn't make sense at these long intervals; + // use fixed retry interval with generous jitter instead + let sleep_duration = Duration::from_secs( + u64::try_from( + // 1hour base + (60_i64 * 60_i64) + // 10min jitter + + rand::thread_rng().gen_range(-10 * 60..10 * 60), + ) + .expect("10min < 1hour"), + ); + tokio::time::sleep(sleep_duration).await; + } + } + } + }; - let sum = calculated_size.saturating_add_signed(added); + let (calculated_size, metrics_guard) = tokio::select! { + res = retrying => { + match res { + ControlFlow::Continue(calculated_size) => calculated_size, + ControlFlow::Break(()) => return, + } + } + _ = cancel.cancelled() => { + return; + } + }; - // set the gauge value before it can be set in `update_current_logical_size`. 
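The retry policy above deliberately avoids exponential back-off: failed initial calculations are retried on a fixed one-hour interval with ±10 minutes of jitter. In isolation (the function name here is illustrative), the interval computation is:

```rust
use std::time::Duration;

use rand::Rng;

// The i64 arithmetic always stays positive (50..70 minutes), so the
// conversion to u64 cannot fail.
fn initial_size_retry_interval() -> Duration {
    Duration::from_secs(
        u64::try_from(
            // 1 hour base
            (60_i64 * 60_i64)
                // 10 min jitter
                + rand::thread_rng().gen_range(-10 * 60..10 * 60),
        )
        .expect("10min < 1hour"),
    )
}
```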
- self_clone.metrics.current_logical_size_gauge.set(sum); + // we cannot query current_logical_size.current_size() to know the current + // *negative* value, only truncated to u64. + let added = self + .current_logical_size + .size_added_after_initial + .load(AtomicOrdering::Relaxed); - match self_clone - .current_logical_size - .initial_logical_size - .set((calculated_size, metrics_guard.calculation_result_saved())) - { - Ok(()) => (), - Err(_what_we_just_attempted_to_set) => { - let (existing_size, _) = self_clone - .current_logical_size - .initial_logical_size - .get() - .expect("once_cell set was lost, then get failed, impossible."); - // This shouldn't happen because the semaphore is initialized with 1. - // But if it happens, just complain & report success so there are no further retries. - error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing") - } - } - // now that `initial_logical_size.is_some()`, reduce permit count to 0 - // so that we prevent future callers from spawning this task - permit.forget(); - Ok(()) - }.in_current_span(), - ); + let sum = calculated_size.saturating_add_signed(added); + + // set the gauge value before it can be set in `update_current_logical_size`. + self.metrics.current_logical_size_gauge.set(sum); + + self.current_logical_size + .initial_logical_size + .set((calculated_size, metrics_guard.calculation_result_saved())) + .ok() + .expect("only this task sets it"); } pub fn spawn_ondemand_logical_size_calculation( @@ -1971,6 +2030,9 @@ impl Timeline { receiver } + /// # Cancel-Safety + /// + /// This method is cancellation-safe. #[instrument(skip_all)] async fn logical_size_calculation_task( self: &Arc, @@ -2008,6 +2070,10 @@ impl Timeline { /// /// NOTE: counted incrementally, includes ancestors. This can be a slow operation, /// especially if we need to download remote layers. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn calculate_logical_size( &self, up_to_lsn: Lsn, @@ -2123,6 +2189,10 @@ impl Timeline { /// /// This function takes the current timeline's locked LayerMap as an argument, /// so callers can avoid potential race conditions. + /// + /// # Cancel-Safety + /// + /// This method is cancellation-safe. async fn get_reconstruct_data( &self, key: Key, @@ -2371,6 +2441,9 @@ impl Timeline { } } + /// # Cancel-safety + /// + /// This method is cancellation-safe. async fn lookup_cached_page( &self, key: &Key, diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 497796c80ab5..2a103a7ff461 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -21,7 +21,6 @@ use crate::{ }, CreateTimelineCause, DeleteTimelineError, Tenant, }, - InitializationOrder, }; use super::{Timeline, TimelineResources}; @@ -407,7 +406,6 @@ impl DeleteTimelineFlow { local_metadata: &TimelineMetadata, remote_client: Option, deletion_queue_client: DeletionQueueClient, - init_order: Option<&InitializationOrder>, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. @@ -420,7 +418,6 @@ impl DeleteTimelineFlow { remote_client, deletion_queue_client, }, - init_order, // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here. 
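Several methods in the hunks above gain `# Cancel-Safety` sections. A minimal illustration of the calling pattern that makes this property load-bearing (illustrative names only, not pageserver APIs): the future is raced against a cancellation token, so it may be dropped at any await point and must not leave shared state half-updated.

```rust
use tokio_util::sync::CancellationToken;

async fn calculate_with_cancellation(cancel: &CancellationToken) -> Option<u64> {
    tokio::select! {
        size = pretend_calculate_logical_size() => Some(size),
        // If this branch wins, the calculation future is dropped mid-await.
        _ = cancel.cancelled() => None,
    }
}

async fn pretend_calculate_logical_size() -> u64 {
    // stand-in for something like Timeline::calculate_logical_size()
    42
}
```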
CreateTimelineCause::Delete, diff --git a/pageserver/src/tenant/timeline/logical_size.rs b/pageserver/src/tenant/timeline/logical_size.rs index a33fb28ebd83..f2db8c91fc75 100644 --- a/pageserver/src/tenant/timeline/logical_size.rs +++ b/pageserver/src/tenant/timeline/logical_size.rs @@ -1,11 +1,10 @@ use anyhow::Context; -use once_cell::sync::OnceCell; -use tokio::sync::Semaphore; +use once_cell::sync::OnceCell; +use tokio_util::sync::CancellationToken; use utils::lsn::Lsn; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering}; -use std::sync::Arc; /// Internal structure to hold all data needed for logical size calculation. /// @@ -28,8 +27,12 @@ pub(super) struct LogicalSize { crate::metrics::initial_logical_size::FinishedCalculationGuard, )>, - /// Semaphore to track ongoing calculation of `initial_logical_size`. - pub initial_size_computation: Arc, + /// Cancellation for the best-effort logical size calculation. + /// + /// The token is kept in a once-cell so that we can error out if a higher priority + /// request comes in *before* we have started the normal logical size calculation. + pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore: + OnceCell, /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. pub initial_part_end: Option, @@ -72,7 +75,7 @@ pub(crate) enum CurrentLogicalSize { Exact(Exact), } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum Accuracy { Approximate, Exact, @@ -115,11 +118,10 @@ impl LogicalSize { Self { initial_logical_size: OnceCell::with_value((0, { crate::metrics::initial_logical_size::START_CALCULATION - .first(None) + .first(crate::metrics::initial_logical_size::StartCircumstances::EmptyInitial) .calculation_result_saved() })), - // initial_logical_size already computed, so, don't admit any calculations - initial_size_computation: Arc::new(Semaphore::new(0)), + cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(), initial_part_end: None, size_added_after_initial: AtomicI64::new(0), did_return_approximate_to_walreceiver: AtomicBool::new(false), @@ -129,7 +131,7 @@ impl LogicalSize { pub(super) fn deferred_initial(compute_to: Lsn) -> Self { Self { initial_logical_size: OnceCell::new(), - initial_size_computation: Arc::new(Semaphore::new(1)), + cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell::new(), initial_part_end: Some(compute_to), size_added_after_initial: AtomicI64::new(0), did_return_approximate_to_walreceiver: AtomicBool::new(false), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 7045658f2415..3bcb7ff891fd 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -397,7 +397,10 @@ pub(super) async fn handle_walreceiver_connection( // Send the replication feedback message. // Regular standby_status_update fields are put into this message. 
let current_timeline_size = timeline - .get_current_logical_size(&ctx) + .get_current_logical_size( + crate::tenant::timeline::GetLogicalSizePriority::User, + &ctx, + ) // FIXME: https://github.com/neondatabase/neon/issues/5963 .size_dont_care_about_accuracy(); let status_update = PageserverFeedback { diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index a4cd42b6c378..86a749eaf313 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -384,7 +384,7 @@ def get_resident_physical_size(): env.pageserver.allowed_errors.extend( [ ".*download failed: downloading evicted layer file failed.*", - f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size", + f".*initial_size_calculation.*{tenant_id}.*{timeline_id}.*initial size calculation failed: downloading evicted layer file failed", ] ) diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 443b0812fd23..3cac32b79087 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -106,7 +106,6 @@ def assert_complete(): # Initial tenant load should reflect the delay we injected ("initial_tenant_load", lambda t, p: t >= (tenant_load_delay_ms / 1000.0) and t >= p), # Subsequent steps should occur in expected order - ("initial_logical_sizes", lambda t, p: t > 0 and t >= p), ("background_jobs_can_start", lambda t, p: t > 0 and t >= p), ("complete", lambda t, p: t > 0 and t >= p), ]
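Taken together, the call sites touched in this diff map onto the new priority enum as follows (a recap of the hunks above; the doc comments here are added for illustration, the enum itself lives in pageserver/src/tenant/timeline.rs):

```rust
pub enum GetLogicalSizePriority {
    /// HTTP timeline detail and walreceiver feedback: something is waiting
    /// on the number right now, so boost the initial calculation.
    User,
    /// Consumption metrics collection: an exact value will arrive eventually,
    /// and an approximate one is acceptable in the meantime.
    Background,
}
```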