Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: switch to metrics_derive #832

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ futures-util = "0.3.30"
itertools = "0.13.0"
jsonrpsee = "0.24.5"
metrics = "0.23.0"
metrics-derive = "0.1.0"
mockall = "0.13.0"
parse-display = "0.10.0"
pin-project = "1.1.5"
Expand Down
2 changes: 2 additions & 0 deletions bin/rundler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ tracing.workspace = true
tracing-appender = "0.2.3"
tracing-log = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json"] }
metrics-derive.workspace = true

272 changes: 200 additions & 72 deletions bin/rundler/src/cli/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
use std::{net::SocketAddr, time::Duration};

use itertools::Itertools;
use metrics::gauge;
use metrics::Gauge;
use metrics_derive::Metrics;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_process::Collector;
use metrics_util::layers::{PrefixLayer, Stack};
Expand Down Expand Up @@ -66,11 +67,13 @@ pub fn initialize<'a, T: TaskSpawner>(
let frequency = std::time::Duration::from_millis(sample_interval_millis);
let runtime_metrics = handle.metrics();
let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
let tokio_runtime_metrics = TokioMetrics::default();

task_spawner.spawn_critical(
"tokio metrics collector",
Box::pin(async move {
for metrics in runtime_monitor.intervals() {
collect_tokio(&runtime_metrics, metrics);
collect_tokio(&runtime_metrics, metrics, &tokio_runtime_metrics);
tokio::time::sleep(frequency).await;
}
}),
Expand All @@ -79,77 +82,202 @@ pub fn initialize<'a, T: TaskSpawner>(
Ok(())
}

const TOKIO_PREFIX: &str = "tokio_rt_";
#[allow(dead_code)]
#[derive(Metrics)]
#[metrics(scope = "tokio_rt")]
pub(crate) struct TokioMetrics {
#[metric(describe = "the total number of tokio wokers.")]
num_workers: Gauge,
#[metric(describe = "the number of blocking threads.")]
num_blocking_threads: Gauge,
#[metric(
rename = "active_tasks_count",
describe = "the number of active threads."
)]
num_alive_tasks: Gauge,
#[metric(describe = "the number of idle threads.")]
num_idle_blocking_threads: Gauge,
#[metric(describe = "the number of tasks currently scheduled in the blocking thread pool.")]
blocking_queue_depth: Gauge,
#[metric(describe = "the number of times worker threads parked.")]
total_park_count: Gauge,
#[metric(describe = "the maximum number of times any worker thread parked.")]
max_park_count: Gauge,
#[metric(describe = "the minimum number of times any worker thread parked.")]
min_park_count: Gauge,
#[metric(describe = "the average duration of a single invocation of poll on a task.")]
mean_poll_duration: Gauge,
#[metric(
describe = "the average duration of a single invocation of poll on a task on the worker with the lowest value."
)]
mean_poll_duration_worker_min: Gauge,
#[metric(
describe = "the average duration of a single invocation of poll on a task on the worker with the highest value."
)]
mean_poll_duration_worker_max: Gauge,

#[metric(
describe = "the number of times worker threads unparked but performed no work before parking again."
)]
total_noop_count: Gauge,
#[metric(
describe = "the maximum number of times any worker thread unparked but performed no work before parking again."
)]
max_noop_count: Gauge,
#[metric(
describe = "the minimum number of times any worker thread unparked but performed no work before parking again."
)]
min_noop_count: Gauge,

#[metric(describe = "the number of tasks worker threads stole from another worker thread.")]
total_steal_count: Gauge,
#[metric(
describe = "the maximum number of times any worker thread unparked but performed no work before parking again."
)]
max_steal_count: Gauge,
#[metric(
describe = "the minimum number of times any worker thread unparked but performed no work before parking again."
)]
min_steal_count: Gauge,

#[metric(
describe = "the number of times worker threads stole tasks from another worker thread."
)]
total_steal_operations: Gauge,
#[metric(
describe = "the maximum number of any worker thread stole tasks from another worker thread."
)]
max_steal_operations: Gauge,
#[metric(
describe = "the maximum number of any worker thread stole tasks from another worker thread."
)]
min_steal_operations: Gauge,

#[metric(describe = "the number of tasks scheduled from outside of the runtime.")]
num_remote_schedules: Gauge,

#[metric(describe = "the number of tasks scheduled from worker threads.")]
total_local_schedule_count: Gauge,
#[metric(describe = "the maximum number of tasks scheduled from any one worker thread.")]
max_local_schedule_count: Gauge,
#[metric(describe = "the minimum number of tasks scheduled from any one worker thread.")]
min_local_schedule_count: Gauge,

#[metric(describe = "the number of times worker threads saturated their local queues.")]
total_overflow_count: Gauge,
#[metric(describe = "the maximum number of times any one worker saturated its local queue.")]
max_overflow_count: Gauge,
#[metric(describe = "the minimum number of times any one worker saturated its local queue.")]
min_overflow_count: Gauge,

#[metric(describe = "the number of tasks that have been polled across all worker threads.")]
total_polls_count: Gauge,
#[metric(describe = "the maximum number of tasks that have been polled in any worker thread.")]
max_polls_count: Gauge,
#[metric(describe = "the minimum number of tasks that have been polled in any worker thread.")]
min_polls_count: Gauge,

#[metric(describe = "the amount of time worker threads were busy.")]
total_busy_duration: Gauge,
#[metric(describe = "the maximum amount of time a worker thread was busy.")]
max_busy_duration: Gauge,
#[metric(describe = "the minimum amount of time a worker thread was busy.")]
min_busy_duration: Gauge,

#[metric(
describe = "the number of tasks currently scheduled in the runtime's injection queue."
)]
injection_queue_depth: Gauge,
#[metric(describe = "the total number of tasks currently scheduled in workers' local queues.")]
total_local_queue_depth: Gauge,
#[metric(
describe = "the maximum number of tasks currently scheduled any worker's local queue."
)]
max_local_queue_depth: Gauge,
#[metric(
describe = "the minimum number of tasks currently scheduled any worker's local queue."
)]
min_local_queue_depth: Gauge,

#[metric(
describe = "the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
)]
budget_forced_yield_count: Gauge,
#[metric(describe = "the number of ready events processed by the runtime’s I/O driver.")]
io_driver_ready_count: Gauge,
}

macro_rules! log_rm_metric {
($tm:ident, $rm:ident, $metric_name:ident) => {
$tm.$metric_name.set($rm.$metric_name() as f64);
};
}

macro_rules! log_wm_metric {
($tm:ident, $rm:ident, $metric_name:ident) => {
$tm.$metric_name.set($rm.$metric_name as f64);
};
($tm:ident, $rm:ident, $metric_name:ident, $converter:ident) => {
$tm.$metric_name.set($rm.$metric_name.$converter() as f64);
};
}

fn collect_tokio(
runtime_metrics: &tokio::runtime::RuntimeMetrics,
worker_metrics: tokio_metrics::RuntimeMetrics,
rm: &tokio::runtime::RuntimeMetrics,
wm: tokio_metrics::RuntimeMetrics,
tm: &TokioMetrics,
) {
gauge!(format!("{}num_workers", TOKIO_PREFIX)).set(runtime_metrics.num_workers() as f64);
gauge!(format!("{}num_blocking_threads", TOKIO_PREFIX))
.set(runtime_metrics.num_blocking_threads() as f64);
gauge!(format!("{}active_tasks_count", TOKIO_PREFIX))
.set(runtime_metrics.num_alive_tasks() as f64);
gauge!(format!("{}num_idle_blocking_threads", TOKIO_PREFIX))
.set(runtime_metrics.num_idle_blocking_threads() as f64);
gauge!(format!("{}blocking_queue_depth", TOKIO_PREFIX))
.set(runtime_metrics.blocking_queue_depth() as f64);
gauge!(format!("{}total_park_count", TOKIO_PREFIX)).set(worker_metrics.total_park_count as f64);
gauge!(format!("{}max_park_count", TOKIO_PREFIX)).set(worker_metrics.max_park_count as f64);
gauge!(format!("{}min_park_count", TOKIO_PREFIX)).set(worker_metrics.min_park_count as f64);
gauge!(format!("{}mean_poll_duration", TOKIO_PREFIX))
.set(worker_metrics.mean_poll_duration.as_secs_f64());
gauge!(format!("{}mean_poll_duration_worker_min", TOKIO_PREFIX))
.set(worker_metrics.mean_poll_duration_worker_min.as_secs_f64());
gauge!(format!("{}mean_poll_duration_worker_max", TOKIO_PREFIX))
.set(worker_metrics.mean_poll_duration_worker_max.as_secs_f64());
gauge!(format!("{}total_noop_count", TOKIO_PREFIX)).set(worker_metrics.total_noop_count as f64);
gauge!(format!("{}max_noop_count", TOKIO_PREFIX)).set(worker_metrics.max_noop_count as f64);
gauge!(format!("{}min_noop_count", TOKIO_PREFIX)).set(worker_metrics.min_noop_count as f64);
gauge!(format!("{}total_steal_count", TOKIO_PREFIX))
.set(worker_metrics.total_steal_count as f64);
gauge!(format!("{}max_steal_count", TOKIO_PREFIX),).set(worker_metrics.max_steal_count as f64);
gauge!(format!("{}min_steal_count", TOKIO_PREFIX),).set(worker_metrics.min_steal_count as f64);
gauge!(format!("{}total_steal_operations", TOKIO_PREFIX))
.set(worker_metrics.total_steal_operations as f64);
gauge!(format!("{}max_steal_operations", TOKIO_PREFIX))
.set(worker_metrics.max_steal_operations as f64);
gauge!(format!("{}min_steal_operations", TOKIO_PREFIX))
.set(worker_metrics.min_steal_operations as f64);
gauge!(format!("{}num_remote_schedules", TOKIO_PREFIX))
.set(worker_metrics.num_remote_schedules as f64);
gauge!(format!("{}total_local_schedule_count", TOKIO_PREFIX))
.set(worker_metrics.total_local_schedule_count as f64);
gauge!(format!("{}max_local_schedule_count", TOKIO_PREFIX),)
.set(worker_metrics.max_local_schedule_count as f64);
gauge!(format!("{}min_local_schedule_count", TOKIO_PREFIX),)
.set(worker_metrics.min_local_schedule_count as f64);
gauge!(format!("{}total_overflow_count", TOKIO_PREFIX))
.set(worker_metrics.total_overflow_count as f64);
gauge!(format!("{}max_overflow_count", TOKIO_PREFIX))
.set(worker_metrics.max_overflow_count as f64);
gauge!(format!("{}min_overflow_count", TOKIO_PREFIX),)
.set(worker_metrics.min_overflow_count as f64);
gauge!(format!("{}total_polls_count", TOKIO_PREFIX))
.set(worker_metrics.total_polls_count as f64);
gauge!(format!("{}max_polls_count", TOKIO_PREFIX)).set(worker_metrics.max_polls_count as f64);
gauge!(format!("{}min_polls_count", TOKIO_PREFIX)).set(worker_metrics.min_polls_count as f64);
gauge!(format!("{}total_busy_duration", TOKIO_PREFIX))
.set(worker_metrics.total_busy_duration.as_secs_f64());
gauge!(format!("{}max_busy_duration", TOKIO_PREFIX))
.set(worker_metrics.max_busy_duration.as_secs_f64());
gauge!(format!("{}min_busy_duration", TOKIO_PREFIX))
.set(worker_metrics.min_busy_duration.as_secs_f64());
gauge!(format!("{}injection_queue_depth", TOKIO_PREFIX))
.set(worker_metrics.injection_queue_depth as f64);
gauge!(format!("{}total_local_queue_depth", TOKIO_PREFIX))
.set(worker_metrics.total_local_queue_depth as f64);
gauge!(format!("{}max_local_queue_depth", TOKIO_PREFIX))
.set(worker_metrics.max_local_queue_depth as f64);
gauge!(format!("{}min_local_queue_depth", TOKIO_PREFIX))
.set(worker_metrics.min_local_queue_depth as f64);
gauge!(format!("{}budget_forced_yield_count", TOKIO_PREFIX))
.set(worker_metrics.budget_forced_yield_count as f64);
gauge!(format!("{}io_driver_ready_count", TOKIO_PREFIX))
.set(worker_metrics.io_driver_ready_count as f64);
log_rm_metric!(tm, rm, num_workers);
log_rm_metric!(tm, rm, num_blocking_threads);
log_rm_metric!(tm, rm, num_alive_tasks);
log_rm_metric!(tm, rm, num_idle_blocking_threads);
log_rm_metric!(tm, rm, num_idle_blocking_threads);
log_rm_metric!(tm, rm, blocking_queue_depth);

log_wm_metric!(tm, wm, total_park_count);
log_wm_metric!(tm, wm, max_park_count);
log_wm_metric!(tm, wm, min_park_count);

log_wm_metric!(tm, wm, mean_poll_duration, as_secs_f64);
log_wm_metric!(tm, wm, mean_poll_duration_worker_min, as_secs_f64);
log_wm_metric!(tm, wm, mean_poll_duration_worker_max, as_secs_f64);

log_wm_metric!(tm, wm, total_noop_count);
log_wm_metric!(tm, wm, max_noop_count);
log_wm_metric!(tm, wm, min_noop_count);

log_wm_metric!(tm, wm, total_steal_count);
log_wm_metric!(tm, wm, max_steal_count);
log_wm_metric!(tm, wm, min_steal_count);

log_wm_metric!(tm, wm, total_steal_operations);
log_wm_metric!(tm, wm, max_steal_operations);
log_wm_metric!(tm, wm, min_steal_operations);

log_wm_metric!(tm, wm, num_remote_schedules);
log_wm_metric!(tm, wm, total_local_schedule_count);

log_wm_metric!(tm, wm, max_local_schedule_count);
log_wm_metric!(tm, wm, min_local_schedule_count);
log_wm_metric!(tm, wm, total_overflow_count);

log_wm_metric!(tm, wm, max_overflow_count);
log_wm_metric!(tm, wm, min_overflow_count);
log_wm_metric!(tm, wm, total_polls_count);

log_wm_metric!(tm, wm, max_polls_count);
log_wm_metric!(tm, wm, min_polls_count);

log_wm_metric!(tm, wm, total_busy_duration, as_secs_f64);
log_wm_metric!(tm, wm, max_busy_duration, as_secs_f64);
log_wm_metric!(tm, wm, min_busy_duration, as_secs_f64);

log_wm_metric!(tm, wm, injection_queue_depth);

log_wm_metric!(tm, wm, total_local_queue_depth);
log_wm_metric!(tm, wm, max_local_queue_depth);
log_wm_metric!(tm, wm, min_local_queue_depth);

log_wm_metric!(tm, wm, budget_forced_yield_count);
log_wm_metric!(tm, wm, io_driver_ready_count);
}
1 change: 1 addition & 0 deletions crates/builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ serde_json.workspace = true
strum.workspace = true

mockall = {workspace = true, optional = true }
metrics-derive.workspace = true

[dev-dependencies]
mockall.workspace = true
Expand Down
Loading
Loading