Skip to content

Commit

Permalink
feat(metrics): add tokio runtime metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Oct 18, 2023
1 parent dc7a26c commit 8afb087
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 61 deletions.
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
rustdocflags = ["--cfg", "tokio_unstable"]
38 changes: 30 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/rundler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ serde.workspace = true
serde_json.workspace = true
sscanf = "0.4.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync"] }
tokio-metrics = "0.3.1"
tokio-rustls = "0.24.1"
tokio-util = "0.7.8"
tracing.workspace = true
Expand Down
226 changes: 226 additions & 0 deletions bin/rundler/src/cli/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// This file is part of Rundler.
//
// Rundler is free software: you can redistribute it and/or modify it under the
// terms of the GNU Lesser General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later version.
//
// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::{net::SocketAddr, time::Duration};

use itertools::Itertools;
use metrics::gauge;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_process::Collector;
use metrics_util::layers::{PrefixLayer, Stack};

pub fn initialize<'a>(
listen_addr: SocketAddr,
tags: impl IntoIterator<Item = &'a String>,
) -> anyhow::Result<()> {
let mut builder = PrometheusBuilder::new().with_http_listener(listen_addr);

let tags: Vec<(&str, &str)> = tags
.into_iter()
.filter_map(|t| t.split('=').collect_tuple())
.collect();
for (k, v) in tags {
builder = builder.add_global_label(k, v);
}

let (recorder, exporter) = builder.build()?;
tokio::spawn(exporter);
Stack::new(recorder)
.push(PrefixLayer::new("rundler"))
.install()?;

tokio::spawn(async {
let collector = Collector::default();
loop {
collector.collect();
tokio::time::sleep(Duration::from_secs(1)).await;
}
});

let handle = tokio::runtime::Handle::current();
let frequency = std::time::Duration::from_millis(500);
let runtime_metrics = handle.metrics();
let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
tokio::spawn(async move {
for metrics in runtime_monitor.intervals() {
collect_tokio(&runtime_metrics, metrics);
tokio::time::sleep(frequency).await;
}
});

Ok(())
}

const TOKIO_PREFIX: &str = "tokio_rt_";

fn collect_tokio(
runtime_metrics: &tokio::runtime::RuntimeMetrics,
worker_metrics: tokio_metrics::RuntimeMetrics,
) {
gauge!(
format!("{}num_workers", TOKIO_PREFIX),
runtime_metrics.num_workers() as f64
);
gauge!(
format!("{}num_blocking_threads", TOKIO_PREFIX),
runtime_metrics.num_blocking_threads() as f64
);
gauge!(
format!("{}active_tasks_count", TOKIO_PREFIX),
runtime_metrics.active_tasks_count() as f64
);
gauge!(
format!("{}num_idle_blocking_threads", TOKIO_PREFIX),
runtime_metrics.num_idle_blocking_threads() as f64
);
gauge!(
format!("{}blocking_queue_depth", TOKIO_PREFIX),
runtime_metrics.blocking_queue_depth() as f64
);
gauge!(
format!("{}total_park_count", TOKIO_PREFIX),
worker_metrics.total_park_count as f64
);
gauge!(
format!("{}max_park_count", TOKIO_PREFIX),
worker_metrics.max_park_count as f64
);
gauge!(
format!("{}min_park_count", TOKIO_PREFIX),
worker_metrics.min_park_count as f64
);
gauge!(
format!("{}mean_poll_duration", TOKIO_PREFIX),
worker_metrics.mean_poll_duration.as_secs_f64()
);
gauge!(
format!("{}mean_poll_duration_worker_min", TOKIO_PREFIX),
worker_metrics.mean_poll_duration_worker_min.as_secs_f64()
);
gauge!(
format!("{}mean_poll_duration_worker_max", TOKIO_PREFIX),
worker_metrics.mean_poll_duration_worker_max.as_secs_f64()
);
gauge!(
format!("{}total_noop_count", TOKIO_PREFIX),
worker_metrics.total_noop_count as f64,
);
gauge!(
format!("{}max_noop_count", TOKIO_PREFIX),
worker_metrics.max_noop_count as f64,
);
gauge!(
format!("{}min_noop_count", TOKIO_PREFIX),
worker_metrics.min_noop_count as f64,
);
gauge!(
format!("{}total_steal_count", TOKIO_PREFIX),
worker_metrics.total_steal_count as f64,
);
gauge!(
format!("{}max_steal_count", TOKIO_PREFIX),
worker_metrics.max_steal_count as f64,
);
gauge!(
format!("{}min_steal_count", TOKIO_PREFIX),
worker_metrics.min_steal_count as f64,
);
gauge!(
format!("{}total_steal_operations", TOKIO_PREFIX),
worker_metrics.total_steal_operations as f64,
);
gauge!(
format!("{}max_steal_operations", TOKIO_PREFIX),
worker_metrics.max_steal_operations as f64,
);
gauge!(
format!("{}min_steal_operations", TOKIO_PREFIX),
worker_metrics.min_steal_operations as f64,
);
gauge!(
format!("{}num_remote_schedules", TOKIO_PREFIX),
worker_metrics.num_remote_schedules as f64,
);
gauge!(
format!("{}total_local_schedule_count", TOKIO_PREFIX),
worker_metrics.total_local_schedule_count as f64,
);
gauge!(
format!("{}max_local_schedule_count", TOKIO_PREFIX),
worker_metrics.max_local_schedule_count as f64,
);
gauge!(
format!("{}min_local_schedule_count", TOKIO_PREFIX),
worker_metrics.min_local_schedule_count as f64,
);
gauge!(
format!("{}total_overflow_count", TOKIO_PREFIX),
worker_metrics.total_overflow_count as f64,
);
gauge!(
format!("{}max_overflow_count", TOKIO_PREFIX),
worker_metrics.max_overflow_count as f64,
);
gauge!(
format!("{}min_overflow_count", TOKIO_PREFIX),
worker_metrics.min_overflow_count as f64,
);
gauge!(
format!("{}total_polls_count", TOKIO_PREFIX),
worker_metrics.total_polls_count as f64,
);
gauge!(
format!("{}max_polls_count", TOKIO_PREFIX),
worker_metrics.max_polls_count as f64,
);
gauge!(
format!("{}min_polls_count", TOKIO_PREFIX),
worker_metrics.min_polls_count as f64,
);
gauge!(
format!("{}total_busy_duration", TOKIO_PREFIX),
worker_metrics.total_busy_duration.as_secs_f64(),
);
gauge!(
format!("{}max_busy_duration", TOKIO_PREFIX),
worker_metrics.max_busy_duration.as_secs_f64(),
);
gauge!(
format!("{}min_busy_duration", TOKIO_PREFIX),
worker_metrics.min_busy_duration.as_secs_f64(),
);
gauge!(
format!("{}injection_queue_depth", TOKIO_PREFIX),
worker_metrics.injection_queue_depth as f64,
);
gauge!(
format!("{}total_local_queue_depth", TOKIO_PREFIX),
worker_metrics.total_local_queue_depth as f64,
);
gauge!(
format!("{}max_local_queue_depth", TOKIO_PREFIX),
worker_metrics.max_local_queue_depth as f64,
);
gauge!(
format!("{}min_local_queue_depth", TOKIO_PREFIX),
worker_metrics.min_local_queue_depth as f64,
);
gauge!(
format!("{}budget_forced_yield_count", TOKIO_PREFIX),
worker_metrics.budget_forced_yield_count as f64,
);
gauge!(
format!("{}io_driver_ready_count", TOKIO_PREFIX),
worker_metrics.io_driver_ready_count as f64,
);
}
5 changes: 2 additions & 3 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use clap::{builder::PossibleValuesParser, Args, Parser, Subcommand};

mod builder;
mod json;
mod metrics;
mod node;
mod pool;
mod prometheus_exporter;
mod rpc;
mod tracing;

Expand All @@ -41,8 +41,7 @@ pub async fn run() -> anyhow::Result<()> {
tracing::info!("Parsed CLI options: {:#?}", opt);

let metrics_addr = format!("{}:{}", opt.metrics.host, opt.metrics.port).parse()?;
prometheus_exporter::initialize(metrics_addr, &opt.metrics.tags)
.context("metrics server should start")?;
metrics::initialize(metrics_addr, &opt.metrics.tags).context("metrics server should start")?;

match opt.command {
Command::Node(args) => node::run(args, opt.common).await?,
Expand Down
Loading

0 comments on commit 8afb087

Please sign in to comment.