From f911795509cb2cb57fbb36a01e9a0318a745fc01 Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Tue, 8 Oct 2024 20:11:35 -0400 Subject: [PATCH 1/4] feat(metric): switch to metrics-derive library. --- Cargo.lock | 15 ++ bin/rundler/src/cli/metrics.rs | 140 +++++++++++++++++- crates/builder/Cargo.toml | 1 + crates/builder/src/bundle_sender.rs | 171 +++++++++------------- crates/builder/src/signer/mod.rs | 13 +- crates/builder/src/transaction_tracker.rs | 74 +++++----- crates/pool/Cargo.toml | 1 + crates/pool/src/chain.rs | 53 ++++--- crates/pool/src/mempool/pool.rs | 84 +++++------ crates/pool/src/mempool/uo_pool.rs | 98 +++++++------ crates/rpc/Cargo.toml | 1 + crates/rpc/src/utils.rs | 12 +- 12 files changed, 404 insertions(+), 259 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 78fe8c62f..c7fa38e64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3268,6 +3268,18 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "metrics-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3dbdd96ed57d565ec744cba02862d707acf373c5772d152abae6ec5c4e24f6c" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.77", +] + [[package]] name = "metrics-exporter-prometheus" version = "0.15.3" @@ -4449,6 +4461,7 @@ dependencies = [ "jsonrpsee", "linked-hash-map", "metrics", + "metrics-derive", "mockall", "num-traits", "parse-display", @@ -4500,6 +4513,7 @@ dependencies = [ "futures-util", "itertools 0.13.0", "metrics", + "metrics-derive", "mockall", "parking_lot", "prost", @@ -4572,6 +4586,7 @@ dependencies = [ "http 1.1.0", "jsonrpsee", "metrics", + "metrics-derive", "mockall", "rundler-contracts", "rundler-provider", diff --git a/bin/rundler/src/cli/metrics.rs b/bin/rundler/src/cli/metrics.rs index 025eda778..460b3e322 100644 --- a/bin/rundler/src/cli/metrics.rs +++ b/bin/rundler/src/cli/metrics.rs @@ -79,7 +79,145 @@ pub fn initialize<'a, T: TaskSpawner>( Ok(()) } -const TOKIO_PREFIX: &str = 
"tokio_rt_"; +#[allow(dead_code)] +#[derive(Metrics)] +#[metrics(scope = "rundler_tokio_rt")] +struct TokioMetrics { + #[metric(describe = "the total number of tokio wokers.")] + num_workers: Gauge, + #[metric(describe = "the number of blocking threads.")] + num_blocking_threads: Gauge, + #[metric( + rename = "active_tasks_count", + describe = "the number of active threads." + )] + num_alive_tasks: Gauge, + #[metric(describe = "the number of idle threads.")] + num_idle_blocking_threads: Gauge, + #[metric(describe = "the number of tasks currently scheduled in the blocking thread pool.")] + blocking_queue_depth: Gauge, + #[metric(describe = "the number of times worker threads parked.")] + total_park_count: Gauge, + #[metric(describe = "the maximum number of times any worker thread parked.")] + max_park_count: Gauge, + #[metric(describe = "the minimum number of times any worker thread parked.")] + min_park_count: Gauge, + #[metric(describe = "the average duration of a single invocation of poll on a task.")] + mean_poll_duration: Gauge, + #[metric( + describe = "the average duration of a single invocation of poll on a task on the worker with the lowest value." + )] + mean_poll_duration_worker_min: Gauge, + #[metric( + describe = "the average duration of a single invocation of poll on a task on the worker with the highest value." + )] + mean_poll_duration_worker_max: Gauge, + + #[metric( + describe = "the number of times worker threads unparked but performed no work before parking again." + )] + total_noop_count: Gauge, + #[metric( + describe = "the maximum number of times any worker thread unparked but performed no work before parking again." + )] + max_noop_count: Gauge, + #[metric( + describe = "the minimum number of times any worker thread unparked but performed no work before parking again." 
+ )] + min_noop_count: Gauge, + + #[metric(describe = "the number of tasks worker threads stole from another worker thread.")] + total_steal_count: Gauge, + #[metric( + describe = "the maximum number of times any worker thread unparked but performed no work before parking again." + )] + max_steal_count: Gauge, + #[metric( + describe = "the minimum number of times any worker thread unparked but performed no work before parking again." + )] + min_steal_count: Gauge, + + #[metric( + describe = "the number of times worker threads stole tasks from another worker thread." + )] + total_steal_operations: Gauge, + #[metric( + describe = "the maximum number of any worker thread stole tasks from another worker thread." + )] + max_steal_operations: Gauge, + #[metric( + describe = "the maximum number of any worker thread stole tasks from another worker thread." + )] + min_steal_operations: Gauge, + + #[metric(describe = "the number of tasks scheduled from outside of the runtime.")] + num_remote_schedules: Gauge, + + #[metric(describe = "the number of tasks scheduled from worker threads.")] + total_local_schedule_count: Gauge, + #[metric(describe = "the maximum number of tasks scheduled from any one worker thread.")] + max_local_schedule_count: Gauge, + #[metric(describe = "the minimum number of tasks scheduled from any one worker thread.")] + min_local_schedule_count: Gauge, + + #[metric(describe = "the number of times worker threads saturated their local queues.")] + total_overflow_count: Gauge, + #[metric(describe = "the maximum number of times any one worker saturated its local queue.")] + max_overflow_count: Gauge, + #[metric(describe = "the minimum number of times any one worker saturated its local queue.")] + min_overflow_count: Gauge, + + #[metric(describe = "the number of tasks that have been polled across all worker threads.")] + total_polls_count: Gauge, + #[metric(describe = "the maximum number of tasks that have been polled in any worker thread.")] + 
max_polls_count: Gauge, + #[metric(describe = "the minimum number of tasks that have been polled in any worker thread.")] + min_polls_count: Gauge, + + #[metric(describe = "the amount of time worker threads were busy.")] + total_busy_duration: Gauge, + #[metric(describe = "the maximum amount of time a worker thread was busy.")] + max_busy_duration: Gauge, + #[metric(describe = "the minimum amount of time a worker thread was busy.")] + min_busy_duration: Gauge, + + #[metric( + describe = "the number of tasks currently scheduled in the runtime's injection queue." + )] + injection_queue_depth: Gauge, + #[metric(describe = "the total number of tasks currently scheduled in workers' local queues.")] + total_local_queue_depth: Gauge, + #[metric( + describe = "the maximum number of tasks currently scheduled any worker's local queue." + )] + max_local_queue_depth: Gauge, + #[metric( + describe = "the minimum number of tasks currently scheduled any worker's local queue." + )] + min_local_queue_depth: Gauge, + + #[metric( + describe = "the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets." + )] + budget_forced_yield_count: Gauge, + #[metric(describe = "the number of ready events processed by the runtime’s I/O driver.")] + io_driver_ready_count: Gauge, +} + +macro_rules! log_rm_metric { + ($tm:ident, $rm:ident, $metric_name:ident) => { + $tm.$metric_name.set($rm.$metric_name() as f64); + }; +} + +macro_rules! 
log_wm_metric { + ($tm:ident, $wm:ident, $metric_name:ident) => { + $tm.$metric_name.set($wm.$metric_name as f64); + }; + ($tm:ident, $wm:ident, $metric_name:ident, $converter:ident) => { + $tm.$metric_name.set($wm.$metric_name.$converter() as f64); + }; +} fn collect_tokio( runtime_metrics: &tokio::runtime::RuntimeMetrics, diff --git a/crates/builder/Cargo.toml b/crates/builder/Cargo.toml index 7c3397867..dea86378d 100644 --- a/crates/builder/Cargo.toml +++ b/crates/builder/Cargo.toml @@ -51,6 +51,7 @@ serde_json.workspace = true strum.workspace = true mockall = {workspace = true, optional = true } +metrics-derive = "0.1.0" [dev-dependencies] mockall.workspace = true diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 0193d595d..d3a26beba 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -18,6 +18,8 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use futures::Stream; use futures_util::StreamExt; +use metrics::Counter; +use metrics_derive::Metrics; #[cfg(test)] use mockall::automock; use rundler_provider::{BundleHandler, EntryPoint, TransactionRequest}; @@ -70,7 +72,7 @@ pub(crate) struct BundleSenderImpl { pool: C, settings: Settings, event_sender: broadcast::Sender>, - metrics: BuilderMetrics, + metrics: BuilderMetric, _uo_type: PhantomData, } @@ -154,7 +156,7 @@ where loop { if let Err(e) = self.step_state(&mut state).await { error!("Error in bundle sender loop: {e:#?}"); - self.metrics.increment_state_machine_errors(); + self.metrics.state_machine_errors.increment(1_u64); state.reset(); } } @@ -192,10 +194,10 @@ where pool, settings, event_sender, - metrics: BuilderMetrics { - builder_index, - entry_point: *entry_point.address(), - }, + metrics: BuilderMetric::new_with_labels(&[ + ("entry_point", entry_point.address().to_string()), + ("builder_index", builder_index.to_string()), + ]), entry_point, _uo_type: PhantomData, } @@ -276,7 +278,7 @@ where "Abandoning 
bundle after {} fee increases, no operations available after fee increase", inner.fee_increase_count ); - self.metrics.increment_bundle_txns_abandoned(); + self.metrics.bundle_txns_abandoned.increment(1_u64); // abandon the bundle by starting a new bundle process // If the node we are using still has the transaction in the mempool, its @@ -309,7 +311,7 @@ where } Err(error) => { error!("Bundle send error {error:?}"); - self.metrics.increment_bundle_txns_failed(); + self.metrics.bundle_txns_failed.increment(1_u64); state.bundle_error(error); } } @@ -335,6 +337,7 @@ where .. } => { info!("Bundle transaction mined"); + self.metrics.process_bundle_txn_success(gas_limit, gas_used); self.emit(BuilderEvent::transaction_mined( self.builder_index, @@ -350,7 +353,7 @@ where self.builder_index, nonce, )); - self.metrics.increment_bundle_txns_dropped(); + self.metrics.bundle_txns_dropped.increment(1_u64); // try again, increasing fees state.update(InnerState::Building(inner.to_building())); } @@ -360,7 +363,7 @@ where self.builder_index, nonce, )); - self.metrics.increment_bundle_txns_nonce_used(); + self.metrics.bundle_txns_nonce_used.increment(1_u64); state.reset(); } } @@ -373,7 +376,7 @@ where self.settings.max_blocks_to_wait_for_mine, inner.fee_increase_count + 1 ); - self.metrics.increment_bundle_txn_fee_increases(); + self.metrics.bundle_txn_fee_increases.increment(1_u64); state.update(InnerState::Building(inner.to_building())) } @@ -404,7 +407,7 @@ where match cancel_res { Ok(Some(_)) => { info!("Cancellation transaction sent, waiting for confirmation"); - self.metrics.increment_cancellation_txns_sent(); + self.metrics.cancellation_txns_sent.increment(1_u64); state.update(InnerState::CancelPending(inner.to_cancel_pending( state.block_number() + self.settings.max_blocks_to_wait_for_mine, @@ -412,7 +415,7 @@ where } Ok(None) => { info!("Soft cancellation or no transaction to cancel, starting new bundle attempt"); - self.metrics.increment_soft_cancellations(); + 
self.metrics.soft_cancellations.increment(1_u64); state.reset(); } Err(TransactionTrackerError::ReplacementUnderpriced) => { @@ -420,7 +423,7 @@ where if inner.fee_increase_count >= self.settings.max_cancellation_fee_increases { // abandon the cancellation warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); - self.metrics.increment_cancellations_abandoned(); + self.metrics.cancellations_abandoned.increment(1_u64); state.reset(); } else { // Increase fees again @@ -438,7 +441,7 @@ where } Err(e) => { error!("Failed to cancel transaction, moving back to building state: {e:#?}"); - self.metrics.increment_cancellation_txns_failed(); + self.metrics.cancellation_txns_failed.increment(1_u64); state.reset(); } } @@ -463,9 +466,11 @@ where // mined let fee = gas_used.zip(gas_price).map(|(used, price)| used * price); info!("Cancellation transaction mined. Price (wei) {fee:?}"); - self.metrics.increment_cancellation_txns_mined(); + self.metrics.cancellation_txns_mined.increment(1_u64); if let Some(fee) = fee { - self.metrics.increment_cancellation_txns_total_fee(fee); + self.metrics + .cancellation_txns_total_fee + .increment(fee as u64); }; } TrackerUpdate::LatestTxDropped { .. 
} => { @@ -484,7 +489,7 @@ where if inner.fee_increase_count >= self.settings.max_cancellation_fee_increases { // abandon the cancellation warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); - self.metrics.increment_cancellations_abandoned(); + self.metrics.cancellations_abandoned.increment(1_u64); state.reset(); } else { // start replacement, don't wait for trigger @@ -544,7 +549,7 @@ where op_hashes, } = bundle_tx; - self.metrics.increment_bundle_txns_sent(); + self.metrics.bundle_txns_sent.increment(1_u64); let send_result = state .transaction_tracker @@ -568,17 +573,17 @@ where Ok(SendBundleAttemptResult::Success) } Err(TransactionTrackerError::NonceTooLow) => { - self.metrics.increment_bundle_txn_nonce_too_low(); + self.metrics.bundle_txn_nonce_too_low.increment(1_u64); warn!("Bundle attempt nonce too low"); Ok(SendBundleAttemptResult::NonceTooLow) } Err(TransactionTrackerError::ReplacementUnderpriced) => { - self.metrics.increment_bundle_txn_replacement_underpriced(); + self.metrics.bundle_replacement_underpriced.increment(1_u64); warn!("Bundle attempt replacement transaction underpriced"); Ok(SendBundleAttemptResult::ReplacementUnderpriced) } Err(TransactionTrackerError::ConditionNotMet) => { - self.metrics.increment_bundle_txn_condition_not_met(); + self.metrics.bundle_txn_condition_not_met.increment(1_u64); warn!("Bundle attempt condition not met"); Ok(SendBundleAttemptResult::ConditionNotMet) } @@ -1127,90 +1132,60 @@ impl BundleSenderTrigger { } } -#[derive(Debug, Clone)] -struct BuilderMetrics { - builder_index: u64, - entry_point: Address, +#[derive(Metrics)] +#[metrics(scope = "builder")] +struct BuilderMetric { + #[metric(describe = "the number of bundle transactions already sent.")] + bundle_txns_sent: Counter, + #[metric(describe = "the number of bundle transactions successed.")] + bundle_txns_success: Counter, + #[metric(describe = "the number of bundle gas limit.")] + 
bundle_gas_limit: Counter, + #[metric(describe = "the number of bundle gas used.")] + bundle_gas_used: Counter, + #[metric(describe = "the number of dropped bundle transactions.")] + bundle_txns_dropped: Counter, + #[metric(describe = "the number of anabdoned bundle transactions.")] + bundle_txns_abandoned: Counter, + #[metric(describe = "the number of failed bundle transactions.")] + bundle_txns_failed: Counter, + #[metric(describe = "the number of bundle transactin nonce used event.")] + bundle_txns_nonce_used: Counter, + #[metric(describe = "the number of bundle transactions fee increase event.")] + bundle_txn_fee_increases: Counter, + #[metric(describe = "the number of bundle transactions underprice replaced event.")] + bundle_replacement_underpriced: Counter, + #[metric(describe = "the number of bundle transactions nonce too low event.")] + bundle_txn_nonce_too_low: Counter, + #[metric(describe = "the number of bundle transactions condition not met event.")] + bundle_txn_condition_not_met: Counter, + #[metric(describe = "the number of cancellation bundle transactions sent event.")] + cancellation_txns_sent: Counter, + #[metric(describe = "the number of cancellation bundle transactions mined event.")] + cancellation_txns_mined: Counter, + #[metric(describe = "the total fee of cancellation bundle transactions.")] + cancellation_txns_total_fee: Counter, + #[metric(describe = "the number of cancellation bundle transactions abandon event.")] + cancellations_abandoned: Counter, + #[metric(describe = "the number of soft cancellation bundle transactions event.")] + soft_cancellations: Counter, + #[metric(describe = "the number of cancellation bundle transactions failed event.")] + cancellation_txns_failed: Counter, + #[metric(describe = "the number of state machine errors.")] + state_machine_errors: Counter, } -impl BuilderMetrics { - fn increment_bundle_txns_sent(&self) { - metrics::counter!("builder_bundle_txns_sent", "entry_point" => self.entry_point.to_string(), 
"builder_index" => self.builder_index.to_string()) - .increment(1); - } - +impl BuilderMetric { fn process_bundle_txn_success(&self, gas_limit: Option, gas_used: Option) { - metrics::counter!("builder_bundle_txns_success", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - + self.bundle_txns_success.increment(1_u64); if let Some(limit) = gas_limit { - metrics::counter!("builder_bundle_gas_limit", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(limit); + self.bundle_gas_limit.increment(limit); } if let Some(used) = gas_used { - metrics::counter!("builder_bundle_gas_used", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(used.try_into().unwrap_or(u64::MAX)); + self.bundle_gas_used + .increment(used.try_into().unwrap_or(u64::MAX)); } } - - fn increment_bundle_txns_dropped(&self) { - metrics::counter!("builder_bundle_txns_dropped", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - // used when we decide to stop trying a transaction - fn increment_bundle_txns_abandoned(&self) { - metrics::counter!("builder_bundle_txns_abandoned", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - // used when sending a transaction fails - fn increment_bundle_txns_failed(&self) { - metrics::counter!("builder_bundle_txns_failed", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_bundle_txns_nonce_used(&self) { - metrics::counter!("builder_bundle_txns_nonce_used", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_bundle_txn_fee_increases(&self) { - metrics::counter!("builder_bundle_fee_increases", "entry_point" 
=> self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_bundle_txn_replacement_underpriced(&self) { - metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_bundle_txn_nonce_too_low(&self) { - metrics::counter!("builder_bundle_nonce_too_low", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_bundle_txn_condition_not_met(&self) { - metrics::counter!("builder_bundle_condition_not_met", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_cancellation_txns_sent(&self) { - metrics::counter!("builder_cancellation_txns_sent", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_cancellation_txns_mined(&self) { - metrics::counter!("builder_cancellation_txns_mined", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_cancellation_txns_total_fee(&self, fee: u128) { - metrics::counter!("builder_cancellation_txns_total_fee", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(fee.try_into().unwrap_or(u64::MAX)); - } - - fn increment_cancellations_abandoned(&self) { - metrics::counter!("builder_cancellations_abandoned", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_soft_cancellations(&self) { - metrics::counter!("builder_soft_cancellations", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_cancellation_txns_failed(&self) { - 
metrics::counter!("builder_cancellation_txns_failed", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } - - fn increment_state_machine_errors(&self) { - metrics::counter!("builder_state_machine_errors", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); - } } #[cfg(test)] diff --git a/crates/builder/src/signer/mod.rs b/crates/builder/src/signer/mod.rs index ebe3be0a1..ef861250e 100644 --- a/crates/builder/src/signer/mod.rs +++ b/crates/builder/src/signer/mod.rs @@ -21,6 +21,8 @@ use alloy_signer::{Signature, Signer as _}; use anyhow::{bail, Context}; pub(crate) use aws::*; pub(crate) use local::*; +use metrics::Gauge; +use metrics_derive::Metrics; use num_traits::cast::ToPrimitive; use rundler_provider::{EvmProvider, TransactionRequest}; @@ -63,14 +65,21 @@ pub(crate) trait Signer: Send + Sync { } } +#[derive(Metrics)] +#[metrics(scope = "bundle_builder")] +struct BuilderMetric { + #[metric(describe = "the balance of bundler builder.")] + account_balance: Gauge, +} + pub(crate) async fn monitor_account_balance(addr: Address, provider: P) { loop { match provider.get_balance(addr, None).await { Ok(balance) => { let eth_balance = balance.to_f64().unwrap_or_default() / 1e18; tracing::info!("account {addr:?} balance: {}", eth_balance); - metrics::gauge!("bundle_builder_account_balance", "addr" => format!("{addr:?}")) - .set(eth_balance); + let metric = BuilderMetric::new_with_labels(&[("addr", format!("{addr:?}"))]); + metric.account_balance.set(eth_balance); } Err(err) => { tracing::error!("Get account {addr:?} balance error {err:?}"); diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index 4c1214e8f..49d39d2ae 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -14,6 +14,8 @@ use alloy_primitives::{Address, B256}; use anyhow::{bail, Context}; 
use async_trait::async_trait; +use metrics::Gauge; +use metrics_derive::Metrics; #[cfg(test)] use mockall::automock; use rundler_provider::{EvmProvider, TransactionRequest}; @@ -119,11 +121,11 @@ pub(crate) struct TransactionTrackerImpl { provider: P, sender: T, settings: Settings, - builder_index: u64, nonce: u64, transactions: Vec, has_abandoned: bool, attempt_count: u64, + metrics: TransactionTrackerMetrics, } #[derive(Clone, Copy, Debug)] @@ -157,11 +159,14 @@ where provider, sender, settings, - builder_index, nonce, transactions: vec![], has_abandoned: false, attempt_count: 0, + metrics: TransactionTrackerMetrics::new_with_labels(&[( + "builder_index", + builder_index.to_string(), + )]), }) } @@ -203,16 +208,27 @@ where } fn update_metrics(&self) { - TransactionTrackerMetrics::set_num_pending_transactions( - self.builder_index, - self.transactions.len(), - ); - TransactionTrackerMetrics::set_nonce(self.builder_index, self.nonce); - TransactionTrackerMetrics::set_attempt_count(self.builder_index, self.attempt_count); + self.metrics + .num_pending_transactions + .set(self.transactions.len() as f64); + self.metrics.nonce.set(self.nonce as f64); + self.metrics.attempt_count.set(self.attempt_count as f64); + if let Some(tx) = self.transactions.last() { - TransactionTrackerMetrics::set_current_fees(self.builder_index, Some(tx.gas_fees)); + self.metrics + .current_max_fee_per_gas + .set(tx.gas_fees.max_fee_per_gas as f64); + self.metrics + .max_priority_fee_per_gas + .set(tx.gas_fees.max_priority_fee_per_gas as f64); } else { - TransactionTrackerMetrics::set_current_fees(self.builder_index, None); + let fee = GasFees::default(); + self.metrics + .current_max_fee_per_gas + .set(fee.max_fee_per_gas as f64); + self.metrics + .max_priority_fee_per_gas + .set(fee.max_priority_fee_per_gas as f64); } } @@ -474,31 +490,19 @@ impl From for TransactionTrackerError { } } -struct TransactionTrackerMetrics {} - -impl TransactionTrackerMetrics { - fn 
set_num_pending_transactions(builder_index: u64, num_pending_transactions: usize) { - metrics::gauge!("builder_tracker_num_pending_transactions", "builder_index" => builder_index.to_string()) - .set(num_pending_transactions as f64); - } - - fn set_nonce(builder_index: u64, nonce: u64) { - metrics::gauge!("builder_tracker_nonce", "builder_index" => builder_index.to_string()) - .set(nonce as f64); - } - - fn set_attempt_count(builder_index: u64, attempt_count: u64) { - metrics::gauge!("builder_tracker_attempt_count", "builder_index" => builder_index.to_string()).set(attempt_count as f64); - } - - fn set_current_fees(builder_index: u64, current_fees: Option) { - let fees = current_fees.unwrap_or_default(); - - metrics::gauge!("builder_tracker_current_max_fee_per_gas", "builder_index" => builder_index.to_string()) - .set(fees.max_fee_per_gas as f64); - metrics::gauge!("builder_tracker_current_max_priority_fee_per_gas", "builder_index" => builder_index.to_string()) - .set(fees.max_priority_fee_per_gas as f64); - } +#[derive(Metrics)] +#[metrics(scope = "builder_tracker_")] +struct TransactionTrackerMetrics { + #[metric(describe = "the number of pending transactions.")] + num_pending_transactions: Gauge, + #[metric(describe = "the nonce of transaction.")] + nonce: Gauge, + #[metric(describe = "the number of pending transactions.")] + attempt_count: Gauge, + #[metric(describe = "the maximum fee per gas of current transaction.")] + current_max_fee_per_gas: Gauge, + #[metric(describe = "the maximum priority fee per gas of current transaction.")] + max_priority_fee_per_gas: Gauge, } #[cfg(test)] diff --git a/crates/pool/Cargo.toml b/crates/pool/Cargo.toml index 2fcbdf048..a91e01f5a 100644 --- a/crates/pool/Cargo.toml +++ b/crates/pool/Cargo.toml @@ -40,6 +40,7 @@ tracing.workspace = true url.workspace = true mockall = {workspace = true, optional = true } +metrics-derive = "0.1.0" [dev-dependencies] mockall.workspace = true diff --git a/crates/pool/src/chain.rs 
b/crates/pool/src/chain.rs index 5185fa207..9068ad99f 100644 --- a/crates/pool/src/chain.rs +++ b/crates/pool/src/chain.rs @@ -21,6 +21,8 @@ use alloy_primitives::{Address, B256, U256}; use alloy_sol_types::SolEvent; use anyhow::{ensure, Context}; use futures::future; +use metrics::{Counter, Gauge}; +use metrics_derive::Metrics; use rundler_contracts::{ v0_6::IEntryPoint::{ Deposited as DepositedV06, UserOperationEvent as UserOperationEventV06, @@ -57,8 +59,10 @@ pub(crate) struct Chain { blocks: VecDeque, /// Semaphore to limit the number of concurrent `eth_getLogs` calls. load_ops_semaphore: Semaphore, - /// Filter template + /// Filter template. filter_template: Filter, + /// Metrics of chain events. + metrics: ChainMetrics, } #[derive(Default, Debug, Eq, PartialEq)] @@ -168,6 +172,7 @@ impl Chain

{ blocks: VecDeque::new(), load_ops_semaphore: Semaphore::new(MAX_LOAD_OPS_CONCURRENCY), filter_template, + metrics: ChainMetrics::default(), } } @@ -206,7 +211,7 @@ impl Chain

{ for i in 0..=self.settings.max_sync_retries { if i > 0 { - ChainMetrics::increment_sync_retries(); + self.metrics.sync_retries.increment(1_u64); } let update = self.sync_to_block(block.clone()).await; @@ -224,7 +229,7 @@ impl Chain

{ "Failed to update chain at block {:?} after {} retries. Abandoning sync.", block_hash, self.settings.max_sync_retries ); - ChainMetrics::increment_sync_abandoned(); + self.metrics.sync_abandoned.increment(1_u64); } } @@ -326,10 +331,10 @@ impl Chain

{ self.blocks.pop_front(); } - ChainMetrics::set_block_height(current_block_number); + self.metrics.block_height.set(current_block_number as f64); if reorg_depth > 0 { - ChainMetrics::increment_reorgs_detected(); - ChainMetrics::increment_total_reorg_depth(reorg_depth); + self.metrics.reorgs_detected.increment(1_u64); + self.metrics.total_reorg_depth.increment(reorg_depth); } self.new_update( @@ -698,28 +703,20 @@ impl ChainUpdate { } } -struct ChainMetrics {} - -impl ChainMetrics { - fn set_block_height(block_height: u64) { - metrics::gauge!("op_pool_chain_block_height").set(block_height as f64); - } - - fn increment_reorgs_detected() { - metrics::counter!("op_pool_chain_reorgs_detected").increment(1); - } - - fn increment_total_reorg_depth(depth: u64) { - metrics::counter!("op_pool_chain_total_reorg_depth").increment(depth); - } - - fn increment_sync_retries() { - metrics::counter!("op_pool_chain_sync_retries").increment(1); - } - - fn increment_sync_abandoned() { - metrics::counter!("op_pool_chain_sync_abandoned").increment(1); - } +#[derive(Metrics)] +#[metrics(scope = "op_pool_chain")] + +struct ChainMetrics { + #[metric(describe = "the height of block.")] + block_height: Gauge, + #[metric(describe = "the number of reorg event detected.")] + reorgs_detected: Counter, + #[metric(describe = "the total number of reorg depth.")] + total_reorg_depth: Counter, + #[metric(describe = "the number of removed entities.")] + sync_retries: Counter, + #[metric(describe = "the number of sync abanded.")] + sync_abandoned: Counter, } #[cfg(test)] diff --git a/crates/pool/src/mempool/pool.rs b/crates/pool/src/mempool/pool.rs index 074ef4052..803d8369d 100644 --- a/crates/pool/src/mempool/pool.rs +++ b/crates/pool/src/mempool/pool.rs @@ -20,6 +20,8 @@ use std::{ use alloy_primitives::{Address, B256}; use anyhow::Context; +use metrics::{Gauge, Histogram}; +use metrics_derive::Metrics; use rundler_types::{ pool::{MempoolError, PoolOperation}, Entity, EntityType, GasFees, 
Timestamp, UserOperation, UserOperationId, UserOperationVariant, @@ -85,10 +87,13 @@ pub(crate) struct PoolInner { prev_sys_block_time: Duration, /// The number of the previous block prev_block_number: u64, + /// The metrics of pool. + metrics: PoolMetrics, } impl PoolInner { pub(crate) fn new(config: PoolInnerConfig) -> Self { + let entry_point = config.entry_point.to_string(); Self { config, by_hash: HashMap::new(), @@ -103,6 +108,7 @@ impl PoolInner { cache_size: SizeTracker::default(), prev_sys_block_time: Duration::default(), prev_block_number: 0, + metrics: PoolMetrics::new_with_labels(&[("entry_point", entry_point)]), } } @@ -200,7 +206,7 @@ impl PoolInner { self.remove_operation_by_hash(*hash); } - PoolMetrics::set_num_candidates(num_candidates, self.config.entry_point); + self.metrics.num_candidates.set(num_candidates as f64); self.prev_block_number = block_number; self.prev_sys_block_time = sys_block_time; @@ -295,7 +301,12 @@ impl PoolInner { // Time to mine will also fail because UO1's hash was removed from the pool. 
if let Some(time_to_mine) = self.time_to_mine.get(&mined_op.hash) { - PoolMetrics::record_time_to_mine(time_to_mine, mined_op.entry_point); + self.metrics + .time_to_mine + .record(time_to_mine.candidate_for_time.as_secs_f64()); + self.metrics + .blocks_to_mine + .record(time_to_mine.candidate_for_blocks as f64); } else { warn!("Could not find time to mine for {:?}", mined_op.hash); } @@ -524,16 +535,13 @@ impl PoolInner { } fn update_metrics(&self) { - PoolMetrics::set_pool_metrics( - self.by_hash.len(), - self.pool_size.0, - self.config.entry_point, - ); - PoolMetrics::set_cache_metrics( - self.mined_hashes_with_block_numbers.len(), - self.cache_size.0, - self.config.entry_point, - ); + self.metrics.num_ops_in_pool.set(self.by_hash.len() as f64); + self.metrics.size_bytes.set(self.pool_size.0 as f64); + + self.metrics + .num_ops_in_cache + .set(self.mined_hashes_with_block_numbers.len() as f64); + self.metrics.cache_size_bytes.set(self.cache_size.0 as f64); } } @@ -600,41 +608,23 @@ impl TimeToMineInfo { } } -struct PoolMetrics {} - -impl PoolMetrics { - fn set_pool_metrics(num_ops: usize, size_bytes: isize, entry_point: Address) { - metrics::gauge!("op_pool_num_ops_in_pool", "entry_point" => entry_point.to_string()) - .set(num_ops as f64); - metrics::gauge!("op_pool_size_bytes", "entry_point" => entry_point.to_string()) - .set(size_bytes as f64); - } - - fn set_cache_metrics(num_ops: usize, size_bytes: isize, entry_point: Address) { - metrics::gauge!("op_pool_num_ops_in_cache", "entry_point" => entry_point.to_string()) - .set(num_ops as f64); - metrics::gauge!("op_pool_cache_size_bytes", "entry_point" => entry_point.to_string()) - .set(size_bytes as f64); - } - - // Set the number of candidates in the pool, only changes on block boundaries - fn set_num_candidates(num_candidates: usize, entry_point: Address) { - metrics::gauge!("op_pool_num_candidates", "entry_point" => entry_point.to_string()) - .set(num_candidates as f64); - } - - fn 
record_time_to_mine(time_to_mine: &TimeToMineInfo, entry_point: Address) { - metrics::histogram!( - "op_pool_time_to_mine", - "entry_point" => entry_point.to_string() - ) - .record(time_to_mine.candidate_for_time.as_secs_f64()); - metrics::histogram!( - "op_pool_blocks_to_mine", - "entry_point" => entry_point.to_string() - ) - .record(time_to_mine.candidate_for_blocks as f64); - } +#[derive(Metrics)] +#[metrics(scope = "op_pool")] +struct PoolMetrics { + #[metric(describe = "the number of ops in mempool.")] + num_ops_in_pool: Gauge, + #[metric(describe = "the size of mempool in bytes.")] + size_bytes: Gauge, + #[metric(describe = "the number of ops in mempool cache(mined but not persistent).")] + num_ops_in_cache: Gauge, + #[metric(describe = "the size of mempool cache in bytes.")] + cache_size_bytes: Gauge, + #[metric(describe = "the number of candidates.")] + num_candidates: Gauge, + #[metric(describe = "the duration of a bundle mined.")] + time_to_mine: Histogram, + #[metric(describe = "the duration of a blcoked mined.")] + blocks_to_mine: Histogram, } #[cfg(test)] diff --git a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index b797e241f..a6e5978a0 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -15,6 +15,8 @@ use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use alloy_primitives::{utils::format_units, Address, B256, U256}; use itertools::Itertools; +use metrics::{Counter, Gauge}; +use metrics_derive::Metrics; use parking_lot::RwLock; use rundler_provider::EntryPoint; use rundler_sim::{Prechecker, Simulator}; @@ -52,6 +54,8 @@ pub(crate) struct UoPool>, prechecker: P, simulator: S, + ep_specific_metrics: UoPoolMetricsEPSpecific, + metrics: UoPoolMetrics, _uo_type: PhantomData, } @@ -78,6 +82,7 @@ where paymaster: PaymasterTracker, reputation: Arc, ) -> Self { + let entry_point = config.entry_point.to_string(); Self { state: RwLock::new(UoPoolState { pool: 
PoolInner::new(config.clone().into()), @@ -92,6 +97,11 @@ where prechecker, simulator, config, + ep_specific_metrics: UoPoolMetricsEPSpecific::new_with_labels(&[( + "entry_point", + entry_point, + )]), + metrics: UoPoolMetrics::default(), _uo_type: PhantomData, } } @@ -117,8 +127,10 @@ where reason: OpRemovalReason::EntityThrottled { entity }, }) } - UoPoolMetrics::increment_removed_operations(count, self.config.entry_point); - UoPoolMetrics::increment_removed_entities(self.config.entry_point); + self.ep_specific_metrics + .removed_operations + .increment(count as u64); + self.ep_specific_metrics.removed_entities.increment(1_u64); } fn remove_entity(&self, entity: Entity) { @@ -131,8 +143,10 @@ where reason: OpRemovalReason::EntityRemoved { entity }, }) } - UoPoolMetrics::increment_removed_operations(count, self.config.entry_point); - UoPoolMetrics::increment_removed_entities(self.config.entry_point); + self.ep_specific_metrics + .removed_operations + .increment(count as u64); + self.ep_specific_metrics.removed_entities.increment(1_u64); } } @@ -260,11 +274,11 @@ where update.latest_block_hash, ); } - UoPoolMetrics::update_ops_seen( - mined_op_count as isize - unmined_op_count as isize, - self.config.entry_point, - ); - UoPoolMetrics::increment_unmined_operations(unmined_op_count, self.config.entry_point); + let ops_seen: f64 = (mined_op_count as isize - unmined_op_count as isize) as f64; + self.ep_specific_metrics.ops_seen.increment(ops_seen); + self.ep_specific_metrics + .unmined_operations + .increment(unmined_op_count); let mut state = self.state.write(); state @@ -323,20 +337,22 @@ where Ok(s) => s.parse::().unwrap_or_default(), Err(_) => 0.0, }; - UoPoolMetrics::current_max_fee_gwei(max_fee); + self.metrics.current_max_fee_gwei.set(max_fee); let max_priority_fee = match format_units(bundle_fees.max_priority_fee_per_gas, "gwei") { Ok(s) => s.parse::().unwrap_or_default(), Err(_) => 0.0, }; - UoPoolMetrics::current_max_priority_fee_gwei(max_priority_fee); + 
self.metrics + .current_max_priority_fee_gwei + .set(max_priority_fee); let base_fee_f64 = match format_units(base_fee, "gwei") { Ok(s) => s.parse::().unwrap_or_default(), Err(_) => 0.0, }; - UoPoolMetrics::current_base_fee(base_fee_f64); + self.metrics.current_base_fee.set(base_fee_f64); // cache for the next update { @@ -538,7 +554,7 @@ where } fn remove_operations(&self, hashes: &[B256]) { - let mut count = 0; + let mut count: u64 = 0; let mut removed_hashes = vec![]; { let mut state = self.state.write(); @@ -557,7 +573,7 @@ where reason: OpRemovalReason::Requested, }) } - UoPoolMetrics::increment_removed_operations(count, self.config.entry_point); + self.ep_specific_metrics.removed_operations.increment(count); } fn remove_op_by_id(&self, id: &UserOperationId) -> MempoolResult> { @@ -691,40 +707,28 @@ where } } -struct UoPoolMetrics {} - -impl UoPoolMetrics { - fn update_ops_seen(num_ops: isize, entry_point: Address) { - metrics::gauge!("op_pool_ops_seen", "entry_point" => entry_point.to_string()) - .increment(num_ops as f64); - } - - fn increment_unmined_operations(num_ops: usize, entry_point: Address) { - metrics::counter!("op_pool_unmined_operations", "entry_point" => entry_point.to_string()) - .increment(num_ops as u64); - } - - fn increment_removed_operations(num_ops: usize, entry_point: Address) { - metrics::counter!("op_pool_removed_operations", "entry_point" => entry_point.to_string()) - .increment(num_ops as u64); - } - - fn increment_removed_entities(entry_point: Address) { - metrics::counter!("op_pool_removed_entities", "entry_point" => entry_point.to_string()) - .increment(1); - } - - fn current_max_fee_gwei(fee: f64) { - metrics::gauge!("op_pool_current_max_fee_gwei").set(fee); - } - - fn current_max_priority_fee_gwei(fee: f64) { - metrics::gauge!("op_pool_current_max_priority_fee_gwei").set(fee); - } +#[derive(Metrics)] +#[metrics(scope = "op_pool")] +struct UoPoolMetricsEPSpecific { + #[metric(describe = "the number of ops seen.")] + ops_seen: 
Gauge, + #[metric(describe = "the number of unmined ops.")] + unmined_operations: Counter, + #[metric(describe = "the number of removed ops.")] + removed_operations: Counter, + #[metric(describe = "the number of removed entities.")] + removed_entities: Counter, +} - fn current_base_fee(fee: f64) { - metrics::gauge!("op_pool_current_base_fee").set(fee); - } +#[derive(Metrics)] +#[metrics(scope = "op_pool")] +struct UoPoolMetrics { + #[metric(describe = "the maximum fee in Gwei.")] + current_max_fee_gwei: Gauge, + #[metric(describe = "the maximum priority fee in Gwei.")] + current_max_priority_fee_gwei: Gauge, + #[metric(describe = "the base fee of current block.")] + current_base_fee: Gauge, } #[cfg(test)] diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 3057ebece..eca3555f7 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -33,6 +33,7 @@ strum.workspace = true url.workspace = true futures-util.workspace = true http = "1.1.0" +metrics-derive = "0.1.0" [dev-dependencies] mockall.workspace = true diff --git a/crates/rpc/src/utils.rs b/crates/rpc/src/utils.rs index 92c2f9ed7..c6b67ee5f 100644 --- a/crates/rpc/src/utils.rs +++ b/crates/rpc/src/utils.rs @@ -18,9 +18,18 @@ use jsonrpsee::{ core::RpcResult, types::{error::INTERNAL_ERROR_CODE, ErrorObjectOwned}, }; +use metrics::Counter; +use metrics_derive::Metrics; use crate::{error::rpc_err, eth::EthRpcError}; +#[derive(Metrics)] +#[metrics(scope = "rpc")] +struct RPCMetric { + #[metric(describe = "the count of rpc panic.")] + panic_count: Counter, +} + pub(crate) async fn safe_call_rpc_handler(rpc_name: &'static str, f: F) -> RpcResult where F: Future> + Send, @@ -30,7 +39,8 @@ where match f.catch_unwind().await { Ok(r) => r.map_err(Into::into), Err(_) => { - metrics::counter!("rpc_panic_count", "rpc_name" => rpc_name).increment(1); + let rpc_metric = RPCMetric::new_with_labels(&[("rpc_name", rpc_name)]); + rpc_metric.panic_count.increment(1_u64); tracing::error!("PANIC in RPC handler: 
{}", rpc_name); Err(EthRpcError::Internal(anyhow::anyhow!("internal error: panic, see logs")).into()) } From 1e400a9a8fb3cd8d5e9898b85f58cb3bccc2def6 Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Tue, 8 Oct 2024 20:49:26 -0400 Subject: [PATCH 2/4] feat(metric): use metrics_derive. --- Cargo.lock | 1 + bin/rundler/Cargo.toml | 1 + bin/rundler/src/cli/metrics.rs | 142 +++++++++++++++------------------ 3 files changed, 68 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7fa38e64..c87f8f51c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4414,6 +4414,7 @@ dependencies = [ "go-parse-duration", "itertools 0.13.0", "metrics", + "metrics-derive", "metrics-exporter-prometheus", "metrics-process", "metrics-util", diff --git a/bin/rundler/Cargo.toml b/bin/rundler/Cargo.toml index a99106ec3..efb3099e1 100644 --- a/bin/rundler/Cargo.toml +++ b/bin/rundler/Cargo.toml @@ -47,3 +47,4 @@ tracing.workspace = true tracing-appender = "0.2.3" tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json"] } +metrics-derive = "0.1.0" diff --git a/bin/rundler/src/cli/metrics.rs b/bin/rundler/src/cli/metrics.rs index 460b3e322..735b79d4a 100644 --- a/bin/rundler/src/cli/metrics.rs +++ b/bin/rundler/src/cli/metrics.rs @@ -14,7 +14,8 @@ use std::{net::SocketAddr, time::Duration}; use itertools::Itertools; -use metrics::gauge; +use metrics::Gauge; +use metrics_derive::Metrics; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_process::Collector; use metrics_util::layers::{PrefixLayer, Stack}; @@ -66,11 +67,13 @@ pub fn initialize<'a, T: TaskSpawner>( let frequency = std::time::Duration::from_millis(sample_interval_millis); let runtime_metrics = handle.metrics(); let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle); + let tokio_runtime_metrics = TokioMetrics::default(); + task_spawner.spawn_critical( "tokio metrics collector", Box::pin(async move { for metrics in 
runtime_monitor.intervals() { - collect_tokio(&runtime_metrics, metrics); + collect_tokio(&runtime_metrics, metrics, &tokio_runtime_metrics); tokio::time::sleep(frequency).await; } }), @@ -82,7 +85,7 @@ pub fn initialize<'a, T: TaskSpawner>( #[allow(dead_code)] #[derive(Metrics)] #[metrics(scope = "rundler_tokio_rt")] -struct TokioMetrics { +pub(crate) struct TokioMetrics { #[metric(describe = "the total number of tokio wokers.")] num_workers: Gauge, #[metric(describe = "the number of blocking threads.")] @@ -211,83 +214,70 @@ macro_rules! log_rm_metric { } macro_rules! log_wm_metric { - ($tm:ident, $wm:ident, $metric_name:ident) => { - $tm.$metric_name.set($wm.$metric_name as f64); + ($tm:ident, $rm:ident, $metric_name:ident) => { + $tm.$metric_name.set($rm.$metric_name as f64); }; - ($tm:ident, $wm:ident, $metric_name:ident, $converter:ident) => { - $tm.$metric_name.set($wm.$metric_name.$converter() as f64); + ($tm:ident, $rm:ident, $metric_name:ident, $converter:ident) => { + $tm.$metric_name.set($rm.$metric_name.$converter() as f64); }; } fn collect_tokio( - runtime_metrics: &tokio::runtime::RuntimeMetrics, - worker_metrics: tokio_metrics::RuntimeMetrics, + rm: &tokio::runtime::RuntimeMetrics, + wm: tokio_metrics::RuntimeMetrics, + tm: &TokioMetrics, ) { - gauge!(format!("{}num_workers", TOKIO_PREFIX)).set(runtime_metrics.num_workers() as f64); - gauge!(format!("{}num_blocking_threads", TOKIO_PREFIX)) - .set(runtime_metrics.num_blocking_threads() as f64); - gauge!(format!("{}active_tasks_count", TOKIO_PREFIX)) - .set(runtime_metrics.num_alive_tasks() as f64); - gauge!(format!("{}num_idle_blocking_threads", TOKIO_PREFIX)) - .set(runtime_metrics.num_idle_blocking_threads() as f64); - gauge!(format!("{}blocking_queue_depth", TOKIO_PREFIX)) - .set(runtime_metrics.blocking_queue_depth() as f64); - gauge!(format!("{}total_park_count", TOKIO_PREFIX)).set(worker_metrics.total_park_count as f64); - gauge!(format!("{}max_park_count", 
TOKIO_PREFIX)).set(worker_metrics.max_park_count as f64); - gauge!(format!("{}min_park_count", TOKIO_PREFIX)).set(worker_metrics.min_park_count as f64); - gauge!(format!("{}mean_poll_duration", TOKIO_PREFIX)) - .set(worker_metrics.mean_poll_duration.as_secs_f64()); - gauge!(format!("{}mean_poll_duration_worker_min", TOKIO_PREFIX)) - .set(worker_metrics.mean_poll_duration_worker_min.as_secs_f64()); - gauge!(format!("{}mean_poll_duration_worker_max", TOKIO_PREFIX)) - .set(worker_metrics.mean_poll_duration_worker_max.as_secs_f64()); - gauge!(format!("{}total_noop_count", TOKIO_PREFIX)).set(worker_metrics.total_noop_count as f64); - gauge!(format!("{}max_noop_count", TOKIO_PREFIX)).set(worker_metrics.max_noop_count as f64); - gauge!(format!("{}min_noop_count", TOKIO_PREFIX)).set(worker_metrics.min_noop_count as f64); - gauge!(format!("{}total_steal_count", TOKIO_PREFIX)) - .set(worker_metrics.total_steal_count as f64); - gauge!(format!("{}max_steal_count", TOKIO_PREFIX),).set(worker_metrics.max_steal_count as f64); - gauge!(format!("{}min_steal_count", TOKIO_PREFIX),).set(worker_metrics.min_steal_count as f64); - gauge!(format!("{}total_steal_operations", TOKIO_PREFIX)) - .set(worker_metrics.total_steal_operations as f64); - gauge!(format!("{}max_steal_operations", TOKIO_PREFIX)) - .set(worker_metrics.max_steal_operations as f64); - gauge!(format!("{}min_steal_operations", TOKIO_PREFIX)) - .set(worker_metrics.min_steal_operations as f64); - gauge!(format!("{}num_remote_schedules", TOKIO_PREFIX)) - .set(worker_metrics.num_remote_schedules as f64); - gauge!(format!("{}total_local_schedule_count", TOKIO_PREFIX)) - .set(worker_metrics.total_local_schedule_count as f64); - gauge!(format!("{}max_local_schedule_count", TOKIO_PREFIX),) - .set(worker_metrics.max_local_schedule_count as f64); - gauge!(format!("{}min_local_schedule_count", TOKIO_PREFIX),) - .set(worker_metrics.min_local_schedule_count as f64); - gauge!(format!("{}total_overflow_count", TOKIO_PREFIX)) - 
.set(worker_metrics.total_overflow_count as f64); - gauge!(format!("{}max_overflow_count", TOKIO_PREFIX)) - .set(worker_metrics.max_overflow_count as f64); - gauge!(format!("{}min_overflow_count", TOKIO_PREFIX),) - .set(worker_metrics.min_overflow_count as f64); - gauge!(format!("{}total_polls_count", TOKIO_PREFIX)) - .set(worker_metrics.total_polls_count as f64); - gauge!(format!("{}max_polls_count", TOKIO_PREFIX)).set(worker_metrics.max_polls_count as f64); - gauge!(format!("{}min_polls_count", TOKIO_PREFIX)).set(worker_metrics.min_polls_count as f64); - gauge!(format!("{}total_busy_duration", TOKIO_PREFIX)) - .set(worker_metrics.total_busy_duration.as_secs_f64()); - gauge!(format!("{}max_busy_duration", TOKIO_PREFIX)) - .set(worker_metrics.max_busy_duration.as_secs_f64()); - gauge!(format!("{}min_busy_duration", TOKIO_PREFIX)) - .set(worker_metrics.min_busy_duration.as_secs_f64()); - gauge!(format!("{}injection_queue_depth", TOKIO_PREFIX)) - .set(worker_metrics.injection_queue_depth as f64); - gauge!(format!("{}total_local_queue_depth", TOKIO_PREFIX)) - .set(worker_metrics.total_local_queue_depth as f64); - gauge!(format!("{}max_local_queue_depth", TOKIO_PREFIX)) - .set(worker_metrics.max_local_queue_depth as f64); - gauge!(format!("{}min_local_queue_depth", TOKIO_PREFIX)) - .set(worker_metrics.min_local_queue_depth as f64); - gauge!(format!("{}budget_forced_yield_count", TOKIO_PREFIX)) - .set(worker_metrics.budget_forced_yield_count as f64); - gauge!(format!("{}io_driver_ready_count", TOKIO_PREFIX)) - .set(worker_metrics.io_driver_ready_count as f64); + log_rm_metric!(tm, rm, num_workers); + log_rm_metric!(tm, rm, num_blocking_threads); + log_rm_metric!(tm, rm, num_alive_tasks); + log_rm_metric!(tm, rm, num_idle_blocking_threads); + log_rm_metric!(tm, rm, num_idle_blocking_threads); + log_rm_metric!(tm, rm, blocking_queue_depth); + + log_wm_metric!(tm, wm, total_park_count); + log_wm_metric!(tm, wm, max_park_count); + log_wm_metric!(tm, wm, min_park_count); + + 
log_wm_metric!(tm, wm, mean_poll_duration, as_secs_f64); + log_wm_metric!(tm, wm, mean_poll_duration_worker_min, as_secs_f64); + log_wm_metric!(tm, wm, mean_poll_duration_worker_max, as_secs_f64); + + log_wm_metric!(tm, wm, total_noop_count); + log_wm_metric!(tm, wm, max_noop_count); + log_wm_metric!(tm, wm, min_noop_count); + + log_wm_metric!(tm, wm, total_steal_count); + log_wm_metric!(tm, wm, max_steal_count); + log_wm_metric!(tm, wm, min_steal_count); + + log_wm_metric!(tm, wm, total_steal_operations); + log_wm_metric!(tm, wm, max_steal_operations); + log_wm_metric!(tm, wm, min_steal_operations); + + log_wm_metric!(tm, wm, num_remote_schedules); + log_wm_metric!(tm, wm, total_local_schedule_count); + + log_wm_metric!(tm, wm, max_local_schedule_count); + log_wm_metric!(tm, wm, min_local_schedule_count); + log_wm_metric!(tm, wm, total_overflow_count); + + log_wm_metric!(tm, wm, max_overflow_count); + log_wm_metric!(tm, wm, min_overflow_count); + log_wm_metric!(tm, wm, total_polls_count); + + log_wm_metric!(tm, wm, max_polls_count); + log_wm_metric!(tm, wm, min_polls_count); + + log_wm_metric!(tm, wm, total_busy_duration, as_secs_f64); + log_wm_metric!(tm, wm, max_busy_duration, as_secs_f64); + log_wm_metric!(tm, wm, min_busy_duration, as_secs_f64); + + log_wm_metric!(tm, wm, injection_queue_depth); + + log_wm_metric!(tm, wm, total_local_queue_depth); + log_wm_metric!(tm, wm, max_local_queue_depth); + log_wm_metric!(tm, wm, min_local_queue_depth); + + log_wm_metric!(tm, wm, budget_forced_yield_count); + log_wm_metric!(tm, wm, io_driver_ready_count); } From 298870f9527c129c247f5f88a5a5b20efa62e998 Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Tue, 8 Oct 2024 11:02:06 -0400 Subject: [PATCH 3/4] feat(metric): switch to metrics_derive. 
--- Cargo.lock | 1 + crates/types/Cargo.toml | 1 + crates/types/src/task/metric_recorder.rs | 123 +++++++++++------------ 3 files changed, 59 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c87f8f51c..a62d6aec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4673,6 +4673,7 @@ dependencies = [ "constcat", "futures-util", "metrics", + "metrics-derive", "mockall", "num_enum", "parse-display", diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 1344efd43..5b2eee4e2 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -30,6 +30,7 @@ thiserror.workspace = true metrics.workspace = true mockall = {workspace = true, optional = true } +metrics-derive = "0.1.0" [dev-dependencies] rundler-types = { workspace = true, features = ["test-utils"] } diff --git a/crates/types/src/task/metric_recorder.rs b/crates/types/src/task/metric_recorder.rs index 81815c782..d29c481e6 100644 --- a/crates/types/src/task/metric_recorder.rs +++ b/crates/types/src/task/metric_recorder.rs @@ -10,9 +10,10 @@ // // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. 
-use std::time::{Duration, Instant};
+use std::time::Instant;
 
-use metrics::{counter, gauge, histogram};
+use metrics::{Counter, Gauge, Histogram};
+use metrics_derive::Metrics;
 
 use super::status_code::{HttpCode, RpcCode};
 
@@ -22,6 +23,30 @@ pub struct MethodSessionLogger {
     service_name: String,
     method_name: String,
     protocol: String,
+    method_metric: MethodMetrics,
+}
+
+#[derive(Metrics)]
+#[metrics(scope = "rundler_runtime")]
+pub(crate) struct MethodMetrics {
+    #[metric(describe = "the total number of requests.")]
+    num_requests: Counter,
+
+    #[metric(describe = "the number of currently open requests.")]
+    open_requests: Gauge,
+
+    #[metric(describe = "the request latency in milliseconds.")]
+    request_latency: Histogram,
+}
+
+#[derive(Metrics)]
+#[metrics(scope = "rundler_runtime")]
+pub(crate) struct MethodStatusMetrics {
+    #[metric(describe = "the number of http responses, labeled by status code.")]
+    http_response_status: Counter,
+
+    #[metric(describe = "the number of rpc responses, labeled by status code.")]
+    rpc_response_status: Counter,
 }
 
 impl MethodSessionLogger {
@@ -29,86 +54,52 @@ impl MethodSessionLogger {
     pub fn new(service_name: String, method_name: String, protocol: String) -> Self {
         Self {
             start_time: Instant::now(),
-            method_name,
-            service_name,
-            protocol,
+            method_name: method_name.clone(),
+            service_name: service_name.clone(),
+            protocol: protocol.clone(),
+            method_metric: MethodMetrics::new_with_labels(&[
+                ("method_name", method_name),
+                ("service_name", service_name),
+                ("protocol", protocol),
+            ]),
         }
     }
 
     /// start the session. time will be initialized.
     pub fn start(service_name: String, method_name: String, protocol: String) -> Self {
-        MethodMetrics::increment_num_requests(&method_name, &service_name, &protocol);
-        MethodMetrics::increment_open_requests(&method_name, &service_name, &protocol);
-        Self::new(service_name, method_name, protocol)
+        let logger = Self::new(service_name, method_name, protocol);
+        logger.method_metric.num_requests.increment(1);
+        logger.method_metric.open_requests.increment(1);
+        logger
     }
 
     /// record a rpc status code.
pub fn record_rpc(&self, rpc_code: RpcCode) { - MethodMetrics::increment_rpc_response_code(&self.method_name, &self.service_name, rpc_code); + let method_status_metric = MethodStatusMetrics::new_with_labels(&[ + ("method_name", self.method_name.clone()), + ("service_name", self.service_name.clone()), + ("protocol", self.protocol.clone()), + ("status_code", rpc_code.to_string()), + ]); + method_status_metric.rpc_response_status.increment(1); } /// record a http status code. pub fn record_http(&self, http_code: HttpCode) { - MethodMetrics::increment_http_response_code( - &self.method_name, - &self.service_name, - http_code, - ); + let method_status_metric = MethodStatusMetrics::new_with_labels(&[ + ("method_name", self.method_name.clone()), + ("service_name", self.service_name.clone()), + ("protocol", self.protocol.clone()), + ("status_code", http_code.to_string()), + ]); + method_status_metric.http_response_status.increment(1); } /// end of the session. Record the session duration. pub fn done(&self) { - MethodMetrics::record_request_latency( - &self.method_name, - &self.service_name, - &self.protocol, - self.start_time.elapsed(), - ); - MethodMetrics::decrement_open_requests( - &self.method_name, - &self.service_name, - &self.protocol, - ); - } -} - -struct MethodMetrics {} - -impl MethodMetrics { - pub(crate) fn increment_num_requests(method_name: &str, service_name: &str, protocol: &str) { - counter!("num_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).increment(1) - } - - pub(crate) fn increment_open_requests(method_name: &str, service_name: &str, protocol: &str) { - gauge!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).increment(1_f64) - } - - pub(crate) fn decrement_open_requests(method_name: &str, service_name: &str, protocol: &str) { - gauge!("open_requests", "method_name" => 
method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).decrement(1_f64) - } - - pub(crate) fn increment_http_response_code( - method_name: &str, - service_name: &str, - http_status_code: HttpCode, - ) { - counter!("http_response_status", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => "http", "response_code" => http_status_code.to_string()).increment(1) - } - - pub(crate) fn increment_rpc_response_code( - method_name: &str, - service_name: &str, - rpc_status_code: RpcCode, - ) { - counter!("rpc_response_status", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => "rpc", "response_code" => rpc_status_code.to_string()).increment(1) - } - - pub(crate) fn record_request_latency( - method_name: &str, - service_name: &str, - protocol: &str, - latency: Duration, - ) { - histogram!("request_latency", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocol" => protocol.to_string()).record(latency.as_millis() as f64) + self.method_metric.open_requests.decrement(1); + self.method_metric + .request_latency + .record(self.start_time.elapsed().as_millis() as f64); } } From 9ab1dfd483995def4494de6d00115274d03c28df Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Fri, 11 Oct 2024 13:39:18 -0400 Subject: [PATCH 4/4] feat(metrics-derive): minor fix of metric description, lint. 
--- Cargo.toml | 1 + bin/rundler/Cargo.toml | 3 +- bin/rundler/src/cli/metrics.rs | 2 +- crates/builder/Cargo.toml | 2 +- crates/builder/src/bundle_sender.rs | 72 +++++++++++------------ crates/builder/src/signer/mod.rs | 4 +- crates/builder/src/transaction_tracker.rs | 2 +- crates/pool/Cargo.toml | 2 +- crates/pool/src/chain.rs | 15 +++-- crates/pool/src/mempool/pool.rs | 6 +- crates/pool/src/mempool/uo_pool.rs | 10 ++-- crates/rpc/Cargo.toml | 2 +- crates/rpc/src/utils.rs | 2 +- crates/types/Cargo.toml | 2 +- crates/types/src/task/metric_recorder.rs | 28 +++++---- 15 files changed, 78 insertions(+), 75 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0436ebc7e..7d3ca9840 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ futures-util = "0.3.30" itertools = "0.13.0" jsonrpsee = "0.24.5" metrics = "0.23.0" +metrics-derive = "0.1.0" mockall = "0.13.0" parse-display = "0.10.0" pin-project = "1.1.5" diff --git a/bin/rundler/Cargo.toml b/bin/rundler/Cargo.toml index efb3099e1..57fece234 100644 --- a/bin/rundler/Cargo.toml +++ b/bin/rundler/Cargo.toml @@ -47,4 +47,5 @@ tracing.workspace = true tracing-appender = "0.2.3" tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json"] } -metrics-derive = "0.1.0" +metrics-derive.workspace = true + diff --git a/bin/rundler/src/cli/metrics.rs b/bin/rundler/src/cli/metrics.rs index 735b79d4a..8c3f29ed0 100644 --- a/bin/rundler/src/cli/metrics.rs +++ b/bin/rundler/src/cli/metrics.rs @@ -84,7 +84,7 @@ pub fn initialize<'a, T: TaskSpawner>( #[allow(dead_code)] #[derive(Metrics)] -#[metrics(scope = "rundler_tokio_rt")] +#[metrics(scope = "tokio_rt")] pub(crate) struct TokioMetrics { #[metric(describe = "the total number of tokio wokers.")] num_workers: Gauge, diff --git a/crates/builder/Cargo.toml b/crates/builder/Cargo.toml index dea86378d..07b4a6020 100644 --- a/crates/builder/Cargo.toml +++ b/crates/builder/Cargo.toml @@ -51,7 +51,7 @@ serde_json.workspace = true 
strum.workspace = true mockall = {workspace = true, optional = true } -metrics-derive = "0.1.0" +metrics-derive.workspace = true [dev-dependencies] mockall.workspace = true diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index d3a26beba..b29c90300 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -156,7 +156,7 @@ where loop { if let Err(e) = self.step_state(&mut state).await { error!("Error in bundle sender loop: {e:#?}"); - self.metrics.state_machine_errors.increment(1_u64); + self.metrics.state_machine_errors.increment(1); state.reset(); } } @@ -278,7 +278,7 @@ where "Abandoning bundle after {} fee increases, no operations available after fee increase", inner.fee_increase_count ); - self.metrics.bundle_txns_abandoned.increment(1_u64); + self.metrics.bundle_txns_abandoned.increment(1); // abandon the bundle by starting a new bundle process // If the node we are using still has the transaction in the mempool, its @@ -311,7 +311,7 @@ where } Err(error) => { error!("Bundle send error {error:?}"); - self.metrics.bundle_txns_failed.increment(1_u64); + self.metrics.bundle_txns_failed.increment(1); state.bundle_error(error); } } @@ -353,7 +353,7 @@ where self.builder_index, nonce, )); - self.metrics.bundle_txns_dropped.increment(1_u64); + self.metrics.bundle_txns_dropped.increment(1); // try again, increasing fees state.update(InnerState::Building(inner.to_building())); } @@ -363,7 +363,7 @@ where self.builder_index, nonce, )); - self.metrics.bundle_txns_nonce_used.increment(1_u64); + self.metrics.bundle_txns_nonce_used.increment(1); state.reset(); } } @@ -376,7 +376,7 @@ where self.settings.max_blocks_to_wait_for_mine, inner.fee_increase_count + 1 ); - self.metrics.bundle_txn_fee_increases.increment(1_u64); + self.metrics.bundle_txn_fee_increases.increment(1); state.update(InnerState::Building(inner.to_building())) } @@ -407,7 +407,7 @@ where match cancel_res { Ok(Some(_)) => { 
info!("Cancellation transaction sent, waiting for confirmation"); - self.metrics.cancellation_txns_sent.increment(1_u64); + self.metrics.cancellation_txns_sent.increment(1); state.update(InnerState::CancelPending(inner.to_cancel_pending( state.block_number() + self.settings.max_blocks_to_wait_for_mine, @@ -415,7 +415,7 @@ where } Ok(None) => { info!("Soft cancellation or no transaction to cancel, starting new bundle attempt"); - self.metrics.soft_cancellations.increment(1_u64); + self.metrics.soft_cancellations.increment(1); state.reset(); } Err(TransactionTrackerError::ReplacementUnderpriced) => { @@ -423,7 +423,7 @@ where if inner.fee_increase_count >= self.settings.max_cancellation_fee_increases { // abandon the cancellation warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); - self.metrics.cancellations_abandoned.increment(1_u64); + self.metrics.cancellations_abandoned.increment(1); state.reset(); } else { // Increase fees again @@ -441,7 +441,7 @@ where } Err(e) => { error!("Failed to cancel transaction, moving back to building state: {e:#?}"); - self.metrics.cancellation_txns_failed.increment(1_u64); + self.metrics.cancellation_txns_failed.increment(1); state.reset(); } } @@ -466,7 +466,7 @@ where // mined let fee = gas_used.zip(gas_price).map(|(used, price)| used * price); info!("Cancellation transaction mined. 
Price (wei) {fee:?}"); - self.metrics.cancellation_txns_mined.increment(1_u64); + self.metrics.cancellation_txns_mined.increment(1); if let Some(fee) = fee { self.metrics .cancellation_txns_total_fee @@ -489,7 +489,7 @@ where if inner.fee_increase_count >= self.settings.max_cancellation_fee_increases { // abandon the cancellation warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); - self.metrics.cancellations_abandoned.increment(1_u64); + self.metrics.cancellations_abandoned.increment(1); state.reset(); } else { // start replacement, don't wait for trigger @@ -549,7 +549,7 @@ where op_hashes, } = bundle_tx; - self.metrics.bundle_txns_sent.increment(1_u64); + self.metrics.bundle_txns_sent.increment(1); let send_result = state .transaction_tracker @@ -573,17 +573,17 @@ where Ok(SendBundleAttemptResult::Success) } Err(TransactionTrackerError::NonceTooLow) => { - self.metrics.bundle_txn_nonce_too_low.increment(1_u64); + self.metrics.bundle_txn_nonce_too_low.increment(1); warn!("Bundle attempt nonce too low"); Ok(SendBundleAttemptResult::NonceTooLow) } Err(TransactionTrackerError::ReplacementUnderpriced) => { - self.metrics.bundle_replacement_underpriced.increment(1_u64); + self.metrics.bundle_replacement_underpriced.increment(1); warn!("Bundle attempt replacement transaction underpriced"); Ok(SendBundleAttemptResult::ReplacementUnderpriced) } Err(TransactionTrackerError::ConditionNotMet) => { - self.metrics.bundle_txn_condition_not_met.increment(1_u64); + self.metrics.bundle_txn_condition_not_met.increment(1); warn!("Bundle attempt condition not met"); Ok(SendBundleAttemptResult::ConditionNotMet) } @@ -894,7 +894,7 @@ impl BuildingState { // Finalize an underpriced round. // - // This will clear out the number of fee increases and increment the number of underpriced rounds. + // This will clear out the count of fee increases and increment the count of underpriced rounds. 
// Use this when we are in an underpriced state, but there are no longer any UOs available to bundle. fn underpriced_round(self) -> Self { let mut underpriced_info = self @@ -1135,49 +1135,49 @@ impl BundleSenderTrigger { #[derive(Metrics)] #[metrics(scope = "builder")] struct BuilderMetric { - #[metric(describe = "the number of bundle transactions already sent.")] + #[metric(describe = "the count of bundle transactions already sent.")] bundle_txns_sent: Counter, - #[metric(describe = "the number of bundle transactions successed.")] + #[metric(describe = "the count of bundle transactions succeeded.")] bundle_txns_success: Counter, - #[metric(describe = "the number of bundle gas limit.")] + #[metric(describe = "the count of bundle gas limit.")] bundle_gas_limit: Counter, - #[metric(describe = "the number of bundle gas used.")] + #[metric(describe = "the count of bundle gas used.")] bundle_gas_used: Counter, - #[metric(describe = "the number of dropped bundle transactions.")] + #[metric(describe = "the count of dropped bundle transactions.")] bundle_txns_dropped: Counter, - #[metric(describe = "the number of anabdoned bundle transactions.")] + #[metric(describe = "the count of abandoned bundle transactions.")] bundle_txns_abandoned: Counter, - #[metric(describe = "the number of failed bundle transactions.")] + #[metric(describe = "the count of failed bundle transactions.")] bundle_txns_failed: Counter, - #[metric(describe = "the number of bundle transactin nonce used event.")] + #[metric(describe = "the count of bundle transaction nonce used events.")] bundle_txns_nonce_used: Counter, - #[metric(describe = "the number of bundle transactions fee increase event.")] + #[metric(describe = "the count of bundle transactions fee increase events.")] bundle_txn_fee_increases: Counter, - #[metric(describe = "the number of bundle transactions underprice replaced event.")] + #[metric(describe = "the count of bundle transactions underprice replaced events.")] 
bundle_replacement_underpriced: Counter, - #[metric(describe = "the number of bundle transactions nonce too low event.")] + #[metric(describe = "the count of bundle transactions nonce too low events.")] bundle_txn_nonce_too_low: Counter, - #[metric(describe = "the number of bundle transactions condition not met event.")] + #[metric(describe = "the count of bundle transactions condition not met events.")] bundle_txn_condition_not_met: Counter, - #[metric(describe = "the number of cancellation bundle transactions sent event.")] + #[metric(describe = "the count of cancellation bundle transactions sent events.")] cancellation_txns_sent: Counter, - #[metric(describe = "the number of cancellation bundle transactions mined event.")] + #[metric(describe = "the count of cancellation bundle transactions mined events.")] cancellation_txns_mined: Counter, #[metric(describe = "the total fee of cancellation bundle transactions.")] cancellation_txns_total_fee: Counter, - #[metric(describe = "the number of cancellation bundle transactions abandon event.")] + #[metric(describe = "the count of cancellation bundle transactions abandon events.")] cancellations_abandoned: Counter, - #[metric(describe = "the number of soft cancellation bundle transactions event.")] + #[metric(describe = "the count of soft cancellation bundle transactions events.")] soft_cancellations: Counter, - #[metric(describe = "the number of cancellation bundle transactions failed event.")] + #[metric(describe = "the count of cancellation bundle transactions failed events.")] cancellation_txns_failed: Counter, - #[metric(describe = "the number of state machine errors.")] + #[metric(describe = "the count of state machine errors.")] state_machine_errors: Counter, } impl BuilderMetric { fn process_bundle_txn_success(&self, gas_limit: Option, gas_used: Option) { - self.bundle_txns_success.increment(1_u64); + self.bundle_txns_success.increment(1); if let Some(limit) = gas_limit { self.bundle_gas_limit.increment(limit); 
} diff --git a/crates/builder/src/signer/mod.rs b/crates/builder/src/signer/mod.rs index ef861250e..a049a58cc 100644 --- a/crates/builder/src/signer/mod.rs +++ b/crates/builder/src/signer/mod.rs @@ -73,13 +73,13 @@ struct BuilderMetric { } pub(crate) async fn monitor_account_balance(addr: Address, provider: P) { + let metric = BuilderMetric::new_with_labels(&[("addr", format!("{addr:?}"))]); loop { match provider.get_balance(addr, None).await { Ok(balance) => { let eth_balance = balance.to_f64().unwrap_or_default() / 1e18; - tracing::info!("account {addr:?} balance: {}", eth_balance); - let metric = BuilderMetric::new_with_labels(&[("addr", format!("{addr:?}"))]); metric.account_balance.set(eth_balance); + tracing::info!("account {addr:?} balance: {}", eth_balance); } Err(err) => { tracing::error!("Get account {addr:?} balance error {err:?}"); diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index 49d39d2ae..49cc2cc56 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -495,7 +495,7 @@ impl From for TransactionTrackerError { struct TransactionTrackerMetrics { #[metric(describe = "the number of pending transactions.")] num_pending_transactions: Gauge, - #[metric(describe = "the nonce of transaction.")] + #[metric(describe = "the current account's nonce.")] nonce: Gauge, #[metric(describe = "the number of pending transactions.")] attempt_count: Gauge, diff --git a/crates/pool/Cargo.toml b/crates/pool/Cargo.toml index a91e01f5a..faabb4fad 100644 --- a/crates/pool/Cargo.toml +++ b/crates/pool/Cargo.toml @@ -40,7 +40,7 @@ tracing.workspace = true url.workspace = true mockall = {workspace = true, optional = true } -metrics-derive = "0.1.0" +metrics-derive.workspace = true [dev-dependencies] mockall.workspace = true diff --git a/crates/pool/src/chain.rs b/crates/pool/src/chain.rs index 9068ad99f..24ba9d0a3 100644 --- a/crates/pool/src/chain.rs +++ 
b/crates/pool/src/chain.rs @@ -211,7 +211,7 @@ impl Chain

{ for i in 0..=self.settings.max_sync_retries { if i > 0 { - self.metrics.sync_retries.increment(1_u64); + self.metrics.sync_retries.increment(1); } let update = self.sync_to_block(block.clone()).await; @@ -229,7 +229,7 @@ impl Chain

{ "Failed to update chain at block {:?} after {} retries. Abandoning sync.", block_hash, self.settings.max_sync_retries ); - self.metrics.sync_abandoned.increment(1_u64); + self.metrics.sync_abandoned.increment(1); } } @@ -333,7 +333,7 @@ impl Chain

{ self.metrics.block_height.set(current_block_number as f64); if reorg_depth > 0 { - self.metrics.reorgs_detected.increment(1_u64); + self.metrics.reorgs_detected.increment(1); self.metrics.total_reorg_depth.increment(reorg_depth); } @@ -705,17 +705,16 @@ impl ChainUpdate { #[derive(Metrics)] #[metrics(scope = "op_pool_chain")] - struct ChainMetrics { #[metric(describe = "the height of block.")] block_height: Gauge, - #[metric(describe = "the number of reorg event detected.")] + #[metric(describe = "the count of reorg events detected.")] reorgs_detected: Counter, - #[metric(describe = "the total number of reorg depth.")] + #[metric(describe = "the total depth of reorgs.")] total_reorg_depth: Counter, - #[metric(describe = "the number of removed entities.")] + #[metric(describe = "the count of sync retries.")] sync_retries: Counter, - #[metric(describe = "the number of sync abanded.")] + #[metric(describe = "the count of abandoned syncs.")] sync_abandoned: Counter, } diff --git a/crates/pool/src/mempool/pool.rs b/crates/pool/src/mempool/pool.rs index 803d8369d..e976b2093 100644 --- a/crates/pool/src/mempool/pool.rs +++ b/crates/pool/src/mempool/pool.rs @@ -615,15 +615,15 @@ struct PoolMetrics { num_ops_in_pool: Gauge, #[metric(describe = "the size of mempool in bytes.")] size_bytes: Gauge, - #[metric(describe = "the number of ops in mempool cache(mined but not persistent).")] + #[metric(describe = "the number of ops in mempool cache (mined but not persistent).")] num_ops_in_cache: Gauge, #[metric(describe = "the size of mempool cache in bytes.")] cache_size_bytes: Gauge, #[metric(describe = "the number of candidates.")] num_candidates: Gauge, - #[metric(describe = "the duration of a bundle mined.")] + #[metric(describe = "the distribution of time to mine a bundle.")] time_to_mine: Histogram, - #[metric(describe = "the duration of a blcoked mined.")] + #[metric(describe = "the distribution of the number of blocks to mine a bundle.")] blocks_to_mine: Histogram, } diff --git 
a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index a6e5978a0..f077b241f 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -130,7 +130,7 @@ where self.ep_specific_metrics .removed_operations .increment(count as u64); - self.ep_specific_metrics.removed_entities.increment(1_u64); + self.ep_specific_metrics.removed_entities.increment(1); } fn remove_entity(&self, entity: Entity) { @@ -146,7 +146,7 @@ where self.ep_specific_metrics .removed_operations .increment(count as u64); - self.ep_specific_metrics.removed_entities.increment(1_u64); + self.ep_specific_metrics.removed_entities.increment(1); } } @@ -712,11 +712,11 @@ where struct UoPoolMetricsEPSpecific { #[metric(describe = "the number of ops seen.")] ops_seen: Gauge, - #[metric(describe = "the number of unmined ops.")] + #[metric(describe = "the count of unmined ops.")] unmined_operations: Counter, - #[metric(describe = "the number of removed ops.")] + #[metric(describe = "the count of removed ops.")] removed_operations: Counter, - #[metric(describe = "the number of removed entities.")] + #[metric(describe = "the count of removed entities.")] removed_entities: Counter, } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index eca3555f7..afe10c717 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -22,6 +22,7 @@ anyhow.workspace = true async-trait.workspace = true jsonrpsee = { workspace = true , features = ["client", "macros", "server"] } metrics.workspace = true +metrics-derive.workspace = true thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true @@ -33,7 +34,6 @@ strum.workspace = true url.workspace = true futures-util.workspace = true http = "1.1.0" -metrics-derive = "0.1.0" [dev-dependencies] mockall.workspace = true diff --git a/crates/rpc/src/utils.rs b/crates/rpc/src/utils.rs index c6b67ee5f..24895aba3 100644 --- a/crates/rpc/src/utils.rs +++ b/crates/rpc/src/utils.rs @@ -40,7 +40,7 @@ 
where Ok(r) => r.map_err(Into::into), Err(_) => { let rpc_metric = RPCMetric::new_with_labels(&[("rpc_name", rpc_name)]); - rpc_metric.panic_count.increment(1_u64); + rpc_metric.panic_count.increment(1); tracing::error!("PANIC in RPC handler: {}", rpc_name); Err(EthRpcError::Internal(anyhow::anyhow!("internal error: panic, see logs")).into()) } diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 5b2eee4e2..5a7e979fe 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -30,7 +30,7 @@ thiserror.workspace = true metrics.workspace = true mockall = {workspace = true, optional = true } -metrics-derive = "0.1.0" +metrics-derive.workspace = true [dev-dependencies] rundler-types = { workspace = true, features = ["test-utils"] } diff --git a/crates/types/src/task/metric_recorder.rs b/crates/types/src/task/metric_recorder.rs index d29c481e6..4fcd25232 100644 --- a/crates/types/src/task/metric_recorder.rs +++ b/crates/types/src/task/metric_recorder.rs @@ -27,25 +27,25 @@ pub struct MethodSessionLogger { } #[derive(Metrics)] -#[metrics(scope = "rundler_runtime")] +#[metrics(scope = "rpc_stats")] pub(crate) struct MethodMetrics { - #[metric(describe = "total num of requests.")] + #[metric(describe = "total count of requests.")] num_requests: Counter, - #[metric(describe = "opening requests.")] + #[metric(describe = "the number of opening requests.")] open_requests: Gauge, - #[metric(describe = "opening requests.")] + #[metric(describe = "the distribution of request latency.")] request_latency: Histogram, } #[derive(Metrics)] -#[metrics(scope = "rundler_runtime")] +#[metrics(scope = "rpc_stats")] pub(crate) struct MethodStatusMetrics { - #[metric(describe = "opening requests.")] + #[metric(describe = "the count of http response status.")] http_response_status: Counter, - #[metric(describe = "opening requests.")] + #[metric(describe = "the count of rpc response status.")] rpc_response_status: Counter, } @@ -75,24 +75,26 @@ impl MethodSessionLogger 
{ /// record a rpc status code. pub fn record_rpc(&self, rpc_code: RpcCode) { - let method_status_metric = MethodStatusMetrics::new_with_labels(&[ + MethodStatusMetrics::new_with_labels(&[ ("method_name", self.method_name.clone()), ("service_name", self.service_name.clone()), ("protocol", self.protocol.clone()), ("status_code", rpc_code.to_string()), - ]); - method_status_metric.rpc_response_status.increment(1); + ]) + .rpc_response_status + .increment(1); } /// record a http status code. pub fn record_http(&self, http_code: HttpCode) { - let method_status_metric = MethodStatusMetrics::new_with_labels(&[ + MethodStatusMetrics::new_with_labels(&[ ("method_name", self.method_name.clone()), ("service_name", self.service_name.clone()), ("protocol", self.protocol.clone()), ("status_code", http_code.to_string()), - ]); - method_status_metric.http_response_status.increment(1); + ]) + .http_response_status + .increment(1); } /// end of the session. Record the session duration.