From a5cde2d80a2c61a28db2ac011c1e73d9f071d52b Mon Sep 17 00:00:00 2001 From: JP <36560907+0xfourzerofour@users.noreply.github.com> Date: Thu, 22 Feb 2024 21:46:03 -0500 Subject: [PATCH] feat(metrics): add internal rpc metrics (#618) --- Cargo.lock | 3 + crates/builder/src/task.rs | 11 +- crates/pool/src/task.rs | 7 +- crates/provider/Cargo.toml | 3 + .../provider/src/ethers/metrics_middleware.rs | 135 ++++++++++++++++++ crates/provider/src/ethers/mod.rs | 4 +- crates/provider/src/ethers/provider.rs | 40 +++++- crates/provider/src/lib.rs | 2 +- crates/rpc/src/task.rs | 9 +- crates/utils/src/eth.rs | 39 +---- 10 files changed, 198 insertions(+), 55 deletions(-) create mode 100644 crates/provider/src/ethers/metrics_middleware.rs diff --git a/Cargo.lock b/Cargo.lock index fedbd8eb2..7f20537b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4303,7 +4303,10 @@ dependencies = [ "anyhow", "async-trait", "ethers", + "metrics 0.22.1", "mockall", + "parse-display", + "reqwest", "rundler-types", "rundler-utils", "serde", diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index e9eb1c242..a69dc3b73 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -29,7 +29,7 @@ use rundler_sim::{ }; use rundler_task::Task; use rundler_types::chain::ChainSpec; -use rundler_utils::{emit::WithEntryPoint, eth, handle}; +use rundler_utils::{emit::WithEntryPoint, handle}; use rusoto_core::Region; use tokio::{ sync::{broadcast, mpsc}, @@ -123,7 +123,8 @@ where async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { info!("Mempool config: {:?}", self.args.mempool_configs); - let provider = eth::new_provider(&self.args.rpc_url, Some(self.args.eth_poll_interval))?; + let provider = + rundler_provider::new_provider(&self.args.rpc_url, Some(self.args.eth_poll_interval))?; let mut sender_handles = vec![]; let mut bundle_sender_actions = vec![]; @@ -275,8 +276,10 @@ where self.args.mempool_configs.clone(), ); - let submit_provider = - eth::new_provider(&self.args.submit_url, Some(self.args.eth_poll_interval))?; + let submit_provider = rundler_provider::new_provider( + &self.args.submit_url, + Some(self.args.eth_poll_interval), + )?; let transaction_sender = self.args.sender_type.into_sender( &self.args.chain_spec, diff --git a/crates/pool/src/task.rs b/crates/pool/src/task.rs index a4f6179a6..7a556d13c 100644 --- a/crates/pool/src/task.rs +++ b/crates/pool/src/task.rs @@ -22,7 +22,7 @@ use rundler_sim::{ }; use rundler_task::Task; use rundler_types::chain::ChainSpec; -use rundler_utils::{emit::WithEntryPoint, eth, handle}; +use rundler_utils::{emit::WithEntryPoint, handle}; use tokio::{sync::broadcast, try_join}; use tokio_util::sync::CancellationToken; @@ -78,7 +78,10 @@ impl Task for PoolTask { .map(|config| config.entry_point) .collect(), }; - let provider = eth::new_provider(&self.args.http_url, Some(self.args.http_poll_interval))?; + let provider = rundler_provider::new_provider( + &self.args.http_url, + Some(self.args.http_poll_interval), + )?; let chain = Chain::new(provider.clone(), chain_settings); let (update_sender, _) = broadcast::channel(self.args.chain_update_channel_capacity); let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone()); diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index f8ff285ea..efcf5032b 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -13,10 +13,13 @@ rundler-utils = { path = "../utils" } anyhow.workspace = true async-trait.workspace = true ethers.workspace = true +metrics.workspace = true +reqwest.workspace = true serde.workspace = true tokio.workspace = true thiserror.workspace = true tracing.workspace = true +parse-display.workspace = true mockall = {workspace = true, optional = true } diff --git a/crates/provider/src/ethers/metrics_middleware.rs b/crates/provider/src/ethers/metrics_middleware.rs new file mode 100644 index 000000000..30777c820 --- /dev/null +++ b/crates/provider/src/ethers/metrics_middleware.rs @@ -0,0 +1,135 @@ +// This file is part of Rundler. +// +// Rundler is free software: you can redistribute it and/or modify it under the +// terms of the GNU Lesser General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later version. +// +// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +// See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with Rundler. +// If not, see https://www.gnu.org/licenses/. + +use core::fmt::Debug; +use std::time::Duration; + +use async_trait::async_trait; +use ethers::providers::{HttpClientError, JsonRpcClient}; +use metrics::{counter, histogram}; +use parse_display::Display; +use reqwest::StatusCode; +use serde::{de::DeserializeOwned, Serialize}; +use tokio::time::Instant; + +#[derive(Display)] +#[display(style = "snake_case")] +enum RpcCode { + ServerError, + InternalError, + InvalidParams, + MethodNotFound, + InvalidRequest, + ParseError, + Success, + Other, +} + +#[derive(Display)] +#[display(style = "snake_case")] +enum HttpCode { + TwoHundreds, + FourHundreds, + FiveHundreds, + Other, +} + +#[derive(Debug)] +/// Metrics middleware struct to hold the inner http client +pub struct MetricsMiddleware { + inner: C, +} + +impl MetricsMiddleware +where + C: JsonRpcClient, +{ + /// Constructor for middleware + pub fn new(inner: C) -> Self { + Self { inner } + } + + fn instrument_request( + &self, + method: &str, + duration: Duration, + request: &Result, + ) { + let method_str = method.to_string(); + + let mut http_code = StatusCode::OK.as_u16() as u64; + let mut rpc_code = 0; + + if let Err(error) = request { + match error { + HttpClientError::ReqwestError(req_err) => { + http_code = req_err.status().unwrap_or_default().as_u16() as u64; + } + HttpClientError::JsonRpcError(rpc_err) => { + rpc_code = rpc_err.code; + } + _ => {} + } + } + + let http = match http_code { + x if (500..=599).contains(&x) => HttpCode::FiveHundreds, + x if (400..=499).contains(&x) => HttpCode::FourHundreds, + x if (200..=299).contains(&x) => HttpCode::TwoHundreds, + _ => HttpCode::Other, + }; + + let rpc = match rpc_code { + x if x == -32000 => RpcCode::ParseError, + x if x == -32600 => RpcCode::InvalidRequest, + x if x == -32601 => RpcCode::MethodNotFound, + x if x == -32602 => RpcCode::InvalidParams, + x if x == -32603 => RpcCode::InternalError, + x if (-32099..=-32000).contains(&x) => RpcCode::ServerError, + x if x >= 0 => RpcCode::Success, + _ => RpcCode::Other, + }; + + counter!( + "internal_http_response_code", + &[("method", method_str.clone()), ("status", http.to_string())] + ) + .increment(1); + + counter!( + "internal_rpc_response_code", + &[("method", method_str.clone()), ("status", rpc.to_string())] + ) + .increment(1); + + histogram!("internal_rpc_method_response_time", "method" => method_str).record(duration); + } +} + +#[async_trait] +impl> JsonRpcClient for MetricsMiddleware { + type Error = HttpClientError; + + async fn request(&self, method: &str, params: T) -> Result + where + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send, + { + let start_time = Instant::now(); + let result: Result = self.inner.request(method, params).await; + let duration = start_time.elapsed(); + self.instrument_request(method, duration, &result); + + result + } +} diff --git a/crates/provider/src/ethers/mod.rs b/crates/provider/src/ethers/mod.rs index 97733c59c..072d4bc8f 100644 --- a/crates/provider/src/ethers/mod.rs +++ b/crates/provider/src/ethers/mod.rs @@ -15,5 +15,5 @@ mod entry_point; pub use entry_point::EntryPointImpl as EthersEntryPoint; - -mod provider; +mod metrics_middleware; +pub(crate) mod provider; diff --git a/crates/provider/src/ethers/provider.rs b/crates/provider/src/ethers/provider.rs index ac22598e4..3b2c3acc9 100644 --- a/crates/provider/src/ethers/provider.rs +++ b/crates/provider/src/ethers/provider.rs @@ -11,7 +11,7 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, sync::Arc, time::Duration}; use anyhow::Context; use ethers::{ @@ -19,8 +19,8 @@ use ethers::{ contract::ContractError, prelude::ContractError as EthersContractError, providers::{ - JsonRpcClient, Middleware, Provider as EthersProvider, - ProviderError as EthersProviderError, RawCall, + Http, HttpRateLimitRetryPolicy, JsonRpcClient, Middleware, Provider as EthersProvider, + ProviderError as EthersProviderError, RawCall, RetryClient, RetryClientBuilder, }, types::{ spoof, transaction::eip2718::TypedTransaction, Address, Block, BlockId, BlockNumber, Bytes, @@ -29,6 +29,7 @@ use ethers::{ H256, U256, U64, }, }; +use reqwest::Url; use rundler_types::{ contracts::{ gas_price_oracle::GasPriceOracle, i_aggregator::IAggregator, i_entry_point::IEntryPoint, @@ -38,6 +39,7 @@ use rundler_types::{ }; use serde::{de::DeserializeOwned, Serialize}; +use super::metrics_middleware::MetricsMiddleware; use crate::{AggregatorOut, AggregatorSimOut, Provider, ProviderError, ProviderResult}; const ARBITRUM_NITRO_NODE_INTERFACE_ADDRESS: Address = H160([ @@ -321,3 +323,35 @@ fn get_revert_data(error: ProviderError) -> Result Err(error), } } + +/// Construct a new Ethers provider from a URL and a poll interval. +/// +/// Creates a provider with a retry client that retries 10 times, with an initial backoff of 500ms. +pub fn new_provider( + url: &str, + poll_interval: Option, +) -> anyhow::Result>>>> { + let parsed_url = Url::parse(url).context("provider url should be valid")?; + + let http_client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(1)) + .build() + .context("failed to build reqwest client")?; + let http = MetricsMiddleware::new(Http::new_with_client(parsed_url, http_client)); + + let client = RetryClientBuilder::default() + // these retries are if the server returns a 429 + .rate_limit_retries(10) + // these retries are if the connection is dubious + .timeout_retries(3) + .initial_backoff(Duration::from_millis(500)) + .build(http, Box::::default()); + + let mut provider = EthersProvider::new(client); + + if let Some(poll_interval) = poll_interval { + provider = provider.interval(poll_interval); + } + + Ok(Arc::new(provider)) +} diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index f6f45df15..1f2415d8f 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -22,7 +22,7 @@ //! A provider is a type that provides access to blockchain data and functions mod ethers; -pub use ethers::EthersEntryPoint; +pub use ethers::{provider::new_provider, EthersEntryPoint}; mod traits; pub use traits::{ diff --git a/crates/rpc/src/task.rs b/crates/rpc/src/task.rs index 20f65340f..54b05e6d4 100644 --- a/crates/rpc/src/task.rs +++ b/crates/rpc/src/task.rs @@ -15,7 +15,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::bail; use async_trait::async_trait; -use ethers::providers::{Http, Provider, RetryClient}; +use ethers::providers::{JsonRpcClient, Provider}; use jsonrpsee::{ server::{middleware::ProxyGetRequestLayer, ServerBuilder}, RpcModule, @@ -29,7 +29,6 @@ use rundler_task::{ Task, }; use rundler_types::chain::ChainSpec; -use rundler_utils::eth; use tokio_util::sync::CancellationToken; use tracing::info; @@ -88,7 +87,7 @@ where let addr: SocketAddr = format_socket_addr(&self.args.host, self.args.port).parse()?; tracing::info!("Starting rpc server on {}", addr); - let provider = eth::new_provider(&self.args.rpc_url, None)?; + let provider = rundler_provider::new_provider(&self.args.rpc_url, None)?; let ep = EthersEntryPoint::new(self.args.chain_spec.entry_point_address, provider.clone()); let mut module = RpcModule::new(()); @@ -148,9 +147,9 @@ where Box::new(self) } - fn attach_namespaces( + fn attach_namespaces( &self, - provider: Arc>>, + provider: Arc>, entry_point: E, module: &mut RpcModule<()>, ) -> anyhow::Result<()> { diff --git a/crates/utils/src/eth.rs b/crates/utils/src/eth.rs index 236816b67..22e821990 100644 --- a/crates/utils/src/eth.rs +++ b/crates/utils/src/eth.rs @@ -13,18 +13,12 @@ //! Utilities for working with an Ethereum-like chain via Ethers. -use std::{sync::Arc, time::Duration}; - -use anyhow::Context; use ethers::{ abi::{AbiDecode, RawLog}, contract::ContractError, - providers::{ - Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient, RetryClientBuilder, - }, + providers::Middleware, types::{Address, Bytes, Log}, }; -use url::Url; /// Gets the revert data from a contract error if it is a revert error, /// otherwise returns the original error. @@ -51,37 +45,6 @@ pub fn parse_revert_message(revert_data: &[u8]) -> Option { .map(|err| err.reason) } -/// Construct a new Ethers provider from a URL and a poll interval. -/// -/// Creates a provider with a retry client that retries 10 times, with an initial backoff of 500ms. -pub fn new_provider( - url: &str, - poll_interval: Option, -) -> anyhow::Result>>> { - let parsed_url = Url::parse(url).context("provider url should be valid")?; - - let http_client = reqwest::Client::builder() - .connect_timeout(Duration::from_secs(1)) - .build() - .context("failed to build reqwest client")?; - let http = Http::new_with_client(parsed_url, http_client); - - let client = RetryClientBuilder::default() - // these retries are if the server returns a 429 - .rate_limit_retries(10) - // these retries are if the connection is dubious - .timeout_retries(3) - .initial_backoff(Duration::from_millis(500)) - .build(http, Box::::default()); - - let mut provider = Provider::new(client); - if let Some(poll_interval) = poll_interval { - provider = provider.interval(poll_interval); - } - - Ok(Arc::new(provider)) -} - /// Converts an ethers `Log` into an ethabi `RawLog`. pub fn log_to_raw_log(log: Log) -> RawLog { let Log { topics, data, .. } = log;