Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: add tokio-epoll-uring slots waiters queue depth metrics #9482

Merged
merged 14 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libs/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use once_cell::sync::Lazy;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
pub use prometheus::local::LocalHistogram;
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
Expand Down
115 changes: 112 additions & 3 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3034,13 +3034,111 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}

pub mod tokio_epoll_uring {
use metrics::{register_int_counter, UIntGauge};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use metrics::{register_histogram, register_int_counter, Histogram, LocalHistogram, UIntGauge};
use once_cell::sync::Lazy;

/// Process-wide storage for the tokio-epoll-uring thread local metrics.
///
/// The first access registers the
/// `pageserver_tokio_epoll_uring_slots_submission_queue_depth` histogram
/// (power-of-two buckets from 1 to 1024) and starts with no registered observers.
/// Registration failure panics via `expect`.
pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy<ThreadLocalMetricsStorage> =
    Lazy::new(|| ThreadLocalMetricsStorage {
        // Listed first so the histogram is registered before the observer map is built.
        slots_submission_queue_depth: register_histogram!(
            "pageserver_tokio_epoll_uring_slots_submission_queue_depth",
            "The slots waiters queue depth of each tokio_epoll_uring system",
            vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
        )
        .expect("failed to define a metric"),
        observers: Mutex::new(HashMap::new()),
    });

/// Shared state behind [`THREAD_LOCAL_METRICS_STORAGE`]: the registry of per-system
/// metrics observers plus the one histogram that all of them flush into.
pub struct ThreadLocalMetricsStorage {
/// Thread local metrics observers, keyed by the `id` passed to `register_system`.
/// Entries are inserted by `register_system` and removed by `remove_system`.
observers: Mutex<HashMap<u64, Arc<ThreadLocalMetrics>>>,
/// A histogram shared between all thread local systems
/// for collecting slots submission queue depth.
slots_submission_queue_depth: Histogram,
}

/// Each thread-local [`tokio_epoll_uring::System`] gets one of these as its
/// [`tokio_epoll_uring::metrics::PerSystemMetrics`] generic.
///
/// The System makes observations into [`Self`] and periodically, the collector
/// comes along and flushes [`Self`] into the shared storage [`THREAD_LOCAL_METRICS_STORAGE`].
///
/// [`LocalHistogram`] is `!Send`, so, we need to put it behind a [`Mutex`].
/// But except for the periodic flush, the lock is uncontended so there's no waiting
/// for cache coherence protocol to get an exclusive cache line.
pub struct ThreadLocalMetrics {
/// Local observer of thread local tokio-epoll-uring system's slots waiters queue depth.
slots_submission_queue_depth: Mutex<LocalHistogram>,
yliang412 marked this conversation as resolved.
Show resolved Hide resolved
}

impl ThreadLocalMetricsStorage {
    /// Creates the metrics observer for the thread local system `id`, records it in
    /// the observer map under that id, and returns a shared reference to it.
    pub fn register_system(&self, id: u64) -> Arc<ThreadLocalMetrics> {
        // Build the observer before taking the map lock.
        let local_histogram = self.slots_submission_queue_depth.local();
        let observer = Arc::new(ThreadLocalMetrics::new(local_histogram));
        self.observers
            .lock()
            .unwrap()
            .insert(id, Arc::clone(&observer));
        observer
    }

    /// Drops the metrics observer registered for the thread local system `id`, if any.
    /// This should be called before dropping a thread local system.
    pub fn remove_system(&self, id: u64) {
        self.observers.lock().unwrap().remove(&id);
    }

    /// Flushes every registered thread local observer into the shared storage.
    pub fn flush_thread_local_metrics(&self) {
        let observers = self.observers.lock().unwrap();
        for observer in observers.values() {
            observer.flush();
        }
    }
}

impl ThreadLocalMetrics {
pub fn new(slots_submission_queue_depth: LocalHistogram) -> Self {
ThreadLocalMetrics {
slots_submission_queue_depth: Mutex::new(slots_submission_queue_depth),
}
}

/// Flushes the thread local metrics to shared aggregator.
pub fn flush(&self) {
let Self {
slots_submission_queue_depth,
} = self;
slots_submission_queue_depth.lock().unwrap().flush();
}
}

impl tokio_epoll_uring::metrics::PerSystemMetrics for ThreadLocalMetrics {
    /// Records one sample of the slots submission queue depth in the local histogram.
    fn observe_slots_submission_queue_depth(&self, queue_depth: u64) {
        // Destructure without `..` so a newly added field is noticed at compile time.
        let Self {
            slots_submission_queue_depth: histogram,
        } = self;
        // The histogram is fed an `f64` sample.
        let sample = queue_depth as f64;
        histogram.lock().unwrap().observe(sample);
    }
}

/// Metrics collector for tokio-epoll-uring: two system gauges plus the shared
/// slots submission queue depth histogram.
pub struct Collector {
// Metric descriptors, filled in by the constructor.
descs: Vec<metrics::core::Desc>,
// Both gauges are overwritten from `tokio_epoll_uring::metrics::global()` on every `collect`.
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
// On every `collect`, the thread local observers are flushed and the shared
// histogram of this storage is collected.
thread_local_metrics_storage: &'static ThreadLocalMetricsStorage,
}

impl metrics::core::Collector for Collector {
Expand All @@ -3050,20 +3148,29 @@ pub mod tokio_epoll_uring {

/// Gathers the metric families exposed by this collector.
///
/// Refreshes the two system gauges from `tokio_epoll_uring::metrics::global()`,
/// flushes all registered thread local observers into the shared histogram, and
/// returns the gauges followed by that histogram.
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(Self::NMETRICS);
// NOTE(review): the next two `let ... {` lines look like the before/after pair of the
// `Metrics` -> `GlobalMetrics` rename; presumably only the `GlobalMetrics` line is
// meant to remain — confirm.
let tokio_epoll_uring::metrics::Metrics {
let tokio_epoll_uring::metrics::GlobalMetrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
self.systems_created.set(systems_created);
mfs.extend(self.systems_created.collect());
self.systems_destroyed.set(systems_destroyed);
mfs.extend(self.systems_destroyed.collect());

// Flush first so that observations still held by the per-system observers are
// included in the shared histogram collected below.
self.thread_local_metrics_storage
.flush_thread_local_metrics();

mfs.extend(
self.thread_local_metrics_storage
.slots_submission_queue_depth
.collect(),
);
mfs
}
}

impl Collector {
const NMETRICS: usize = 2;
const NMETRICS: usize = 3;

#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Expand Down Expand Up @@ -3095,6 +3202,7 @@ pub mod tokio_epoll_uring {
descs,
systems_created,
systems_destroyed,
thread_local_metrics_storage: &THREAD_LOCAL_METRICS_STORAGE,
}
}
}
Expand Down Expand Up @@ -3454,6 +3562,7 @@ pub fn preinitialize_metrics() {
Lazy::force(&RECONSTRUCT_TIME);
Lazy::force(&BASEBACKUP_QUERY_TIME);
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);

tenant_throttling::preinitialize_global_metrics();
}
18 changes: 13 additions & 5 deletions pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@ use tokio_epoll_uring::{System, SystemHandle};

use crate::virtual_file::on_fatal_io_error;

use crate::metrics::tokio_epoll_uring as metrics;
use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};

/// Clonable, reference-counted handle to a [`ThreadLocalStateInner`].
#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);

/// State shared by all clones of a [`ThreadLocalState`].
struct ThreadLocalStateInner {
// NOTE(review): the two `cell` lines below look like the before/after pair of one
// change (the handle type gained the `metrics::ThreadLocalMetrics` parameter);
// presumably only the second one is meant to remain — confirm.
cell: tokio::sync::OnceCell<SystemHandle>,
cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
launch_attempts: AtomicU32,
/// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
// Also used as the key when registering and removing this system's metrics
// observer in `THREAD_LOCAL_METRICS_STORAGE`.
thread_local_state_id: u64,
}

impl Drop for ThreadLocalStateInner {
    /// Removes this state's metrics observer from the shared storage, using
    /// `thread_local_state_id` as the key.
    fn drop(&mut self) {
        let system_id = self.thread_local_state_id;
        THREAD_LOCAL_METRICS_STORAGE.remove_system(system_id);
    }
}

impl ThreadLocalState {
pub fn new() -> Self {
Self(Arc::new(ThreadLocalStateInner {
Expand Down Expand Up @@ -71,7 +77,8 @@ pub async fn thread_local_system() -> Handle {
&fake_cancel,
)
.await;
let res = System::launch()
let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
let res = System::launch_with_metrics(per_system_metrics)
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {
Expand All @@ -86,6 +93,7 @@ pub async fn thread_local_system() -> Handle {
emit_launch_failure_process_stats();
});
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
Err(())
}
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
Expand Down Expand Up @@ -115,7 +123,7 @@ fn emit_launch_failure_process_stats() {
// number of threads
// rss / system memory usage generally

let tokio_epoll_uring::metrics::Metrics {
let tokio_epoll_uring::metrics::GlobalMetrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
Expand Down Expand Up @@ -182,7 +190,7 @@ fn emit_launch_failure_process_stats() {
/// Public newtype around [`ThreadLocalState`]; this is the return type of
/// `thread_local_system`, and its `Deref` impl exposes the underlying `SystemHandle`.
pub struct Handle(ThreadLocalState);

impl std::ops::Deref for Handle {
type Target = SystemHandle;
type Target = SystemHandle<metrics::ThreadLocalMetrics>;

fn deref(&self) -> &Self::Target {
self.0
Expand Down
1 change: 1 addition & 0 deletions test_runner/fixtures/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def counter(name: str) -> str:
counter("pageserver_tenant_throttling_count_accounted_finish_global"),
counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
counter("pageserver_tenant_throttling_count_global"),
*histogram("pageserver_tokio_epoll_uring_slots_submission_queue_depth"),
)

PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
Expand Down
Loading