refactor(consumption_metrics): pre-split cleanup (#5325)
Cleanups in preparation for splitting consumption_metrics.rs in #5326.

Split off from #5297.
koivunej authored Sep 16, 2023
1 parent 74d99b5 commit 9cf4ae8
Showing 1 changed file with 126 additions and 129 deletions.
255 changes: 126 additions & 129 deletions pageserver/src/consumption_metrics.rs
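For orientation before the diff: the shape this cleanup settles on is a tick loop that races a cancellation token against the collection interval and then runs the flush-to-disk and upload halves of each iteration concurrently. The snippet below is a minimal, self-contained illustration of that pattern using plain tokio and tokio-util (assumed as dependencies); the function name and the placeholder bodies are hypothetical stand-ins, not the pageserver's real helpers.

use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn collection_loop(cancel: CancellationToken, interval: Duration) {
    // reminder: the first tick of a tokio interval completes immediately
    let mut ticker = tokio::time::interval(interval);

    loop {
        let _tick_at = tokio::select! {
            _ = cancel.cancelled() => return,
            tick_at = ticker.tick() => tick_at,
        };

        // placeholder stand-ins for the two halves of an iteration
        let flush = async { /* flush_metrics_to_disk(...) */ };
        let upload = async { /* upload_metrics(...) */ };

        // let these run concurrently, as in the refactored loop body
        let (_, _) = tokio::join!(flush, upload);
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(collection_loop(cancel.clone(), Duration::from_millis(100)));

    // let a few iterations run, then request shutdown
    tokio::time::sleep(Duration::from_millis(350)).await;
    cancel.cancel();
    task.await.unwrap();
}
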
@@ -1,8 +1,5 @@
//!
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
//! Cache metrics to send only the updated ones.
//!
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
@@ -264,113 +261,74 @@ pub async fn collect_metrics(

let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);

let cancel = task_mgr::shutdown_token();
let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);

let mut cached_metrics = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
ret = restore_and_reschedule => ret,
};

// define client here to reuse it for all requests
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
.build()
.expect("Failed to create http client with timeout");

let node_id = node_id.to_string();
let cancel = task_mgr::shutdown_token();

let (mut cached_metrics, oldest_metric_captured_at) =
match read_metrics_from_disk(final_path.clone()).await {
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
let oldest_metric_captured_at = found_some
.iter()
.map(|(_, (et, _))| et.recorded_at())
.copied()
.next();

let cached = found_some
.into_iter()
.collect::<HashMap<MetricsKey, (EventType, u64)>>();

(cached, oldest_metric_captured_at)
}
Err(e) => {
let root = e.root_cause();

let maybe_ioerr = root.downcast_ref::<std::io::Error>();
let is_not_found =
maybe_ioerr.is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound);

if !is_not_found {
tracing::info!(
"failed to read any previous metrics from {final_path:?}: {e:#}"
);
}
// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(metric_collection_interval);

(HashMap::new(), None)
}
loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};

if let Some(oldest_metric_captured_at) = oldest_metric_captured_at {
// FIXME: chrono methods panic
let oldest_metric_captured_at: SystemTime = oldest_metric_captured_at.into();
let now = SystemTime::now();
let error = match now.duration_since(oldest_metric_captured_at) {
Ok(from_last_send) if from_last_send < metric_collection_interval => {
let sleep_for = metric_collection_interval - from_last_send;
// these are point in time, with variable "now"
let metrics = collect_all_metrics(&cached_metrics, &ctx).await;

let deadline = std::time::Instant::now() + sleep_for;
if metrics.is_empty() {
continue;
}

tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = tokio::time::sleep_until(deadline.into()) => {},
}
let metrics = Arc::new(metrics);

let now = std::time::Instant::now();
// why not race cancellation here? because we are one of the last tasks, and if we are
// already here, better to try to flush the new values.

// executor threads might be busy, add extra measurements
Some(if now < deadline {
deadline - now
} else {
now - deadline
})
}
Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
Err(_) => {
tracing::warn!(
?now,
?oldest_metric_captured_at,
"oldest recorded metric is in future; first values will come out with inconsistent timestamps"
);
oldest_metric_captured_at.duration_since(now).ok()
let flush = async {
match flush_metrics_to_disk(&metrics, &final_path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(e) => {
// idea here is that if someone creates a directory as our final_path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
}
}
};

if let Some(error) = error {
if error.as_secs() >= 60 {
tracing::info!(
error_ms = error.as_millis(),
"startup scheduling error due to restart"
)
let upload = async {
let res = upload_metrics(
&client,
metric_collection_endpoint,
&cancel,
&node_id,
&metrics,
&mut cached_metrics,
)
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
}
}
}

// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(metric_collection_interval);

loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};

iteration(
&client,
metric_collection_endpoint,
&cancel,
&mut cached_metrics,
&node_id,
&final_path,
&ctx,
)
.await;
// let these run concurrently
let (_, _) = tokio::join!(flush, upload);

crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
@@ -380,55 +338,94 @@ pub async fn collect_metrics(
}
}

async fn iteration(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
cancel: &CancellationToken,
cached_metrics: &mut Cache,
node_id: &str,
/// Called on the first iteration in an attempt to join the metric uploading schedule from the
/// previous pageserver session. The pageserver is supposed to upload at intervals regardless of restarts.
///
/// Cancellation safe.
async fn restore_and_reschedule(
final_path: &Arc<PathBuf>,
ctx: &RequestContext,
) {
// these are point in time, with variable "now"
let metrics = collect_all_metrics(cached_metrics, ctx).await;
metric_collection_interval: Duration,
) -> Cache {
let (cached, earlier_metric_at) = match read_metrics_from_disk(final_path.clone()).await {
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
let earlier_metric_at = found_some
.iter()
.map(|(_, (et, _))| et.recorded_at())
.copied()
.next();

let cached = found_some.into_iter().collect::<Cache>();

(cached, earlier_metric_at)
}
Err(e) => {
use std::io::{Error, ErrorKind};

if metrics.is_empty() {
return;
}
let root = e.root_cause();

let metrics = Arc::new(metrics);
let maybe_ioerr = root.downcast_ref::<Error>();
let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);

let flush = async {
match flush_metrics_to_disk(&metrics, final_path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(e) => {
// idea here is that if someone creates a directory as our final_path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
if !is_not_found {
tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
}

(HashMap::new(), None)
}
};

let upload = async {
let res = upload_metrics(
client,
metric_collection_endpoint,
cancel,
node_id,
&metrics,
cached_metrics,
)
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
if let Some(earlier_metric_at) = earlier_metric_at {
let earlier_metric_at: SystemTime = earlier_metric_at.into();

let error = reschedule(earlier_metric_at, metric_collection_interval).await;

if let Some(error) = error {
if error.as_secs() >= 60 {
tracing::info!(
error_ms = error.as_millis(),
"startup scheduling error due to restart"
)
}
}
};
}

// let these run concurrently
let (_, _) = tokio::join!(flush, upload);
cached
}

async fn reschedule(
earlier_metric_at: SystemTime,
metric_collection_interval: Duration,
) -> Option<Duration> {
let now = SystemTime::now();
match now.duration_since(earlier_metric_at) {
Ok(from_last_send) if from_last_send < metric_collection_interval => {
let sleep_for = metric_collection_interval - from_last_send;

let deadline = std::time::Instant::now() + sleep_for;

tokio::time::sleep_until(deadline.into()).await;

let now = std::time::Instant::now();

// executor threads might be busy, add extra measurements
Some(if now < deadline {
deadline - now
} else {
now - deadline
})
}
Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
Err(_) => {
tracing::warn!(
?now,
?earlier_metric_at,
"oldest recorded metric is in future; first values will come out with inconsistent timestamps"
);
earlier_metric_at.duration_since(now).ok()
}
}
}

async fn collect_all_metrics(cached_metrics: &Cache, ctx: &RequestContext) -> Vec<RawMetric> {
@@ -929,7 +926,7 @@ async fn calculate_synthetic_size_worker(
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");

// reminder: this ticker is ready right away
// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;

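For intuition about the reschedule helper introduced above: it measures how long ago the previous process captured metrics and sleeps only the remainder of the interval, so the upload cadence survives a restart; once the gap already exceeds one interval, the surplus is reported as the startup scheduling error. Below is a small standalone illustration of that arithmetic with hypothetical numbers (std only), not the pageserver's actual code.

use std::time::{Duration, SystemTime};

fn main() {
    // hypothetical numbers: the previous process captured metrics 7 minutes
    // ago and the collection interval is 10 minutes
    let metric_collection_interval = Duration::from_secs(10 * 60);
    let now = SystemTime::now();
    let earlier_metric_at = now - Duration::from_secs(7 * 60);

    let from_last_send = now
        .duration_since(earlier_metric_at)
        .expect("earlier_metric_at is in the past in this example");

    if from_last_send < metric_collection_interval {
        // sleep only the remainder (about 3 minutes here) to rejoin the schedule
        let sleep_for = metric_collection_interval - from_last_send;
        println!("rejoin schedule in {sleep_for:?}");
    } else {
        // already overdue: collect immediately and report how late we are
        let late_by = from_last_send.saturating_sub(metric_collection_interval);
        println!("overdue by {late_by:?}, collect now");
    }
}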

1 comment on commit 9cf4ae8

@github-actions

2552 tests run: 2425 passed, 0 failed, 127 skipped (full report)


Flaky tests (3)

Postgres 16

  • test_partial_evict_tenant: debug
  • test_pageserver_recovery: release

Postgres 14

  • test_get_tenant_size_with_multiple_branches: debug

Code coverage (full report)

  • functions: 53.0% (7750 of 14615 functions)
  • lines: 81.0% (45247 of 55829 lines)

The comment gets automatically updated with the latest test results
9cf4ae8 at 2023-09-16T15:58:59.530Z :recycle:
