diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index 065f74045a63..3e7b8f62c766 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -1,8 +1,5 @@
-//!
 //! Periodically collect consumption metrics for all active tenants
 //! and push them to a HTTP endpoint.
-//! Cache metrics to send only the updated ones.
-//!
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::{mgr, LogicalSizeCalculationCause};
@@ -264,6 +261,14 @@ pub async fn collect_metrics(
 
     let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);
 
+    let cancel = task_mgr::shutdown_token();
+    let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);
+
+    let mut cached_metrics = tokio::select! {
+        _ = cancel.cancelled() => return Ok(()),
+        ret = restore_and_reschedule => ret,
+    };
+
     // define client here to reuse it for all requests
     let client = reqwest::ClientBuilder::new()
         .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
@@ -271,106 +276,59 @@ pub async fn collect_metrics(
         .expect("Failed to create http client with timeout");
 
     let node_id = node_id.to_string();
 
-    let cancel = task_mgr::shutdown_token();
-
-    let (mut cached_metrics, oldest_metric_captured_at) =
-        match read_metrics_from_disk(final_path.clone()).await {
-            Ok(found_some) => {
-                // there is no min needed because we write these sequentially in
-                // collect_all_metrics
-                let oldest_metric_captured_at = found_some
-                    .iter()
-                    .map(|(_, (et, _))| et.recorded_at())
-                    .copied()
-                    .next();
-
-                let cached = found_some
-                    .into_iter()
-                    .collect::<HashMap<_, _>>();
-
-                (cached, oldest_metric_captured_at)
-            }
-            Err(e) => {
-                let root = e.root_cause();
-
-                let maybe_ioerr = root.downcast_ref::<std::io::Error>();
-                let is_not_found =
-                    maybe_ioerr.is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound);
-
-                if !is_not_found {
-                    tracing::info!(
-                        "failed to read any previous metrics from {final_path:?}: {e:#}"
-                    );
-                }
+    // reminder: ticker is ready immediately
+    let mut ticker = tokio::time::interval(metric_collection_interval);
 
-                (HashMap::new(), None)
-            }
+    loop {
+        let tick_at = tokio::select! {
+            _ = cancel.cancelled() => return Ok(()),
+            tick_at = ticker.tick() => tick_at,
         };
 
-    if let Some(oldest_metric_captured_at) = oldest_metric_captured_at {
-        // FIXME: chrono methods panic
-        let oldest_metric_captured_at: SystemTime = oldest_metric_captured_at.into();
-
-        let now = SystemTime::now();
-
-        let error = match now.duration_since(oldest_metric_captured_at) {
-            Ok(from_last_send) if from_last_send < metric_collection_interval => {
-                let sleep_for = metric_collection_interval - from_last_send;
+        // these are point in time, with variable "now"
+        let metrics = collect_all_metrics(&cached_metrics, &ctx).await;
 
-                let deadline = std::time::Instant::now() + sleep_for;
+        if metrics.is_empty() {
+            continue;
+        }
 
-                tokio::select! {
-                    _ = cancel.cancelled() => { return Ok(()); },
-                    _ = tokio::time::sleep_until(deadline.into()) => {},
-                }
+        let metrics = Arc::new(metrics);
 
-                let now = std::time::Instant::now();
+        // why not race cancellation here? because we are one of the last tasks, and if we are
+        // already here, better to try to flush the new values.
-                // executor threads might be busy, add extra measurements
-                Some(if now < deadline {
-                    deadline - now
-                } else {
-                    now - deadline
-                })
-            }
-            Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
-            Err(_) => {
-                tracing::warn!(
-                    ?now,
-                    ?oldest_metric_captured_at,
-                    "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
-                );
-                oldest_metric_captured_at.duration_since(now).ok
+        let flush = async {
+            match flush_metrics_to_disk(&metrics, &final_path).await {
+                Ok(()) => {
+                    tracing::debug!("flushed metrics to disk");
+                }
+                Err(e) => {
+                    // idea here is that if someone creates a directory as our final_path, then they
+                    // might notice it from the logs before shutdown and remove it
+                    tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
+                }
             }
         };
 
-        if let Some(error) = error {
-            if error.as_secs() >= 60 {
-                tracing::info!(
-                    error_ms = error.as_millis(),
-                    "startup scheduling error due to restart"
-                )
+        let upload = async {
+            let res = upload_metrics(
+                &client,
+                metric_collection_endpoint,
+                &cancel,
+                &node_id,
+                &metrics,
+                &mut cached_metrics,
+            )
+            .await;
+            if let Err(e) = res {
+                // serialization error which should never happen
+                tracing::error!("failed to upload due to {e:#}");
             }
-        }
-    }
-
-    // reminder: ticker is ready immediatedly
-    let mut ticker = tokio::time::interval(metric_collection_interval);
-
-    loop {
-        let tick_at = tokio::select! {
-            _ = cancel.cancelled() => return Ok(()),
-            tick_at = ticker.tick() => tick_at,
         };
 
-        iteration(
-            &client,
-            metric_collection_endpoint,
-            &cancel,
-            &mut cached_metrics,
-            &node_id,
-            &final_path,
-            &ctx,
-        )
-        .await;
+        // let these run concurrently
+        let (_, _) = tokio::join!(flush, upload);
 
         crate::tenant::tasks::warn_when_period_overrun(
             tick_at.elapsed(),
@@ -380,55 +338,94 @@ pub async fn collect_metrics(
     }
 }
 
-async fn iteration(
-    client: &reqwest::Client,
-    metric_collection_endpoint: &reqwest::Url,
-    cancel: &CancellationToken,
-    cached_metrics: &mut Cache,
-    node_id: &str,
+/// Called on the first iteration in an attempt to join the metric uploading schedule from the
+/// previous pageserver session. Pageserver is supposed to upload at intervals regardless of restarts.
+///
+/// Cancellation safe.
+async fn restore_and_reschedule(
     final_path: &Arc<PathBuf>,
-    ctx: &RequestContext,
-) {
-    // these are point in time, with variable "now"
-    let metrics = collect_all_metrics(cached_metrics, ctx).await;
+    metric_collection_interval: Duration,
+) -> Cache {
+    let (cached, earlier_metric_at) = match read_metrics_from_disk(final_path.clone()).await {
+        Ok(found_some) => {
+            // there is no min needed because we write these sequentially in
+            // collect_all_metrics
+            let earlier_metric_at = found_some
+                .iter()
+                .map(|(_, (et, _))| et.recorded_at())
+                .copied()
+                .next();
+
+            let cached = found_some.into_iter().collect::<Cache>();
+
+            (cached, earlier_metric_at)
+        }
+        Err(e) => {
+            use std::io::{Error, ErrorKind};
 
-    if metrics.is_empty() {
-        return;
-    }
+            let root = e.root_cause();
 
-    let metrics = Arc::new(metrics);
+            let maybe_ioerr = root.downcast_ref::<Error>();
+            let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
 
-    let flush = async {
-        match flush_metrics_to_disk(&metrics, final_path).await {
-            Ok(()) => {
-                tracing::debug!("flushed metrics to disk");
-            }
-            Err(e) => {
-                // idea here is that if someone creates a directory as our final_path, then they
-                // might notice it from the logs before shutdown and remove it
-                tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
+            if !is_not_found {
+                tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
             }
+
+            (HashMap::new(), None)
         }
     };
 
-    let upload = async {
-        let res = upload_metrics(
-            client,
-            metric_collection_endpoint,
-            cancel,
-            node_id,
-            &metrics,
-            cached_metrics,
-        )
-        .await;
-        if let Err(e) = res {
-            // serialization error which should never happen
-            tracing::error!("failed to upload due to {e:#}");
+    if let Some(earlier_metric_at) = earlier_metric_at {
+        let earlier_metric_at: SystemTime = earlier_metric_at.into();
+
+        let error = reschedule(earlier_metric_at, metric_collection_interval).await;
+
+        if let Some(error) = error {
+            if error.as_secs() >= 60 {
+                tracing::info!(
+                    error_ms = error.as_millis(),
+                    "startup scheduling error due to restart"
+                )
+            }
         }
-    };
+    }
 
-    // let these run concurrently
-    let (_, _) = tokio::join!(flush, upload);
+    cached
+}
+
+async fn reschedule(
+    earlier_metric_at: SystemTime,
+    metric_collection_interval: Duration,
+) -> Option<Duration> {
+    let now = SystemTime::now();
+    match now.duration_since(earlier_metric_at) {
+        Ok(from_last_send) if from_last_send < metric_collection_interval => {
+            let sleep_for = metric_collection_interval - from_last_send;
+
+            let deadline = std::time::Instant::now() + sleep_for;
+
+            tokio::time::sleep_until(deadline.into()).await;
+
+            let now = std::time::Instant::now();
+
+            // executor threads might be busy, add extra measurements
+            Some(if now < deadline {
+                deadline - now
+            } else {
+                now - deadline
+            })
+        }
+        Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
+        Err(_) => {
+            tracing::warn!(
+                ?now,
+                ?earlier_metric_at,
+                "oldest recorded metric is in the future; first values will come out with inconsistent timestamps"
+            );
+            earlier_metric_at.duration_since(now).ok
+        }
+    }
 }
 
 async fn collect_all_metrics(cached_metrics: &Cache, ctx: &RequestContext) -> Vec<RawMetric> {
@@ -929,7 +926,7 @@ async fn calculate_synthetic_size_worker(
 ) -> anyhow::Result<()> {
     info!("starting calculate_synthetic_size_worker");
 
-    // reminder: this ticker is ready right away
+    // reminder: ticker is ready immediately
     let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
 
     let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
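
Note (not part of the patch): the rescheduling idea behind the new `reschedule` function can be summarized in a small standalone sketch. This is illustrative only; `sleep_out_remainder` and `previous_capture` are hypothetical names, and only `std` and `tokio` are assumed.

use std::time::{Duration, SystemTime};

/// Sleep out whatever is left of the collection interval since the previous
/// session's last captured metric, so uploads keep a steady cadence across
/// pageserver restarts.
async fn sleep_out_remainder(previous_capture: SystemTime, interval: Duration) {
    match SystemTime::now().duration_since(previous_capture) {
        // Less than one interval has passed since the last capture: wait out the rest.
        Ok(elapsed) if elapsed < interval => tokio::time::sleep(interval - elapsed).await,
        // A full interval (or more) has already passed: collect immediately.
        Ok(_) => {}
        // The recorded capture time is in the future (clock skew): collect immediately.
        Err(_) => {}
    }
}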