Skip to content

Commit

Permalink
feat(metrics): add internal rpc metrics (#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xfourzerofour authored Feb 23, 2024
1 parent 475b47d commit a5cde2d
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 55 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions crates/builder/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use rundler_sim::{
};
use rundler_task::Task;
use rundler_types::chain::ChainSpec;
use rundler_utils::{emit::WithEntryPoint, eth, handle};
use rundler_utils::{emit::WithEntryPoint, handle};
use rusoto_core::Region;
use tokio::{
sync::{broadcast, mpsc},
Expand Down Expand Up @@ -123,7 +123,8 @@ where
async fn run(mut self: Box<Self>, shutdown_token: CancellationToken) -> anyhow::Result<()> {
info!("Mempool config: {:?}", self.args.mempool_configs);

let provider = eth::new_provider(&self.args.rpc_url, Some(self.args.eth_poll_interval))?;
let provider =
rundler_provider::new_provider(&self.args.rpc_url, Some(self.args.eth_poll_interval))?;

let mut sender_handles = vec![];
let mut bundle_sender_actions = vec![];
Expand Down Expand Up @@ -275,8 +276,10 @@ where
self.args.mempool_configs.clone(),
);

let submit_provider =
eth::new_provider(&self.args.submit_url, Some(self.args.eth_poll_interval))?;
let submit_provider = rundler_provider::new_provider(
&self.args.submit_url,
Some(self.args.eth_poll_interval),
)?;

let transaction_sender = self.args.sender_type.into_sender(
&self.args.chain_spec,
Expand Down
7 changes: 5 additions & 2 deletions crates/pool/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use rundler_sim::{
};
use rundler_task::Task;
use rundler_types::chain::ChainSpec;
use rundler_utils::{emit::WithEntryPoint, eth, handle};
use rundler_utils::{emit::WithEntryPoint, handle};
use tokio::{sync::broadcast, try_join};
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -78,7 +78,10 @@ impl Task for PoolTask {
.map(|config| config.entry_point)
.collect(),
};
let provider = eth::new_provider(&self.args.http_url, Some(self.args.http_poll_interval))?;
let provider = rundler_provider::new_provider(
&self.args.http_url,
Some(self.args.http_poll_interval),
)?;
let chain = Chain::new(provider.clone(), chain_settings);
let (update_sender, _) = broadcast::channel(self.args.chain_update_channel_capacity);
let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone());
Expand Down
3 changes: 3 additions & 0 deletions crates/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ rundler-utils = { path = "../utils" }
anyhow.workspace = true
async-trait.workspace = true
ethers.workspace = true
metrics.workspace = true
reqwest.workspace = true
serde.workspace = true
tokio.workspace = true
thiserror.workspace = true
tracing.workspace = true
parse-display.workspace = true

mockall = {workspace = true, optional = true }

Expand Down
135 changes: 135 additions & 0 deletions crates/provider/src/ethers/metrics_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// This file is part of Rundler.
//
// Rundler is free software: you can redistribute it and/or modify it under the
// terms of the GNU Lesser General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later version.
//
// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use core::fmt::Debug;
use std::time::Duration;

use async_trait::async_trait;
use ethers::providers::{HttpClientError, JsonRpcClient};
use metrics::{counter, histogram};
use parse_display::Display;
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Serialize};
use tokio::time::Instant;

#[derive(Display)]
#[display(style = "snake_case")]
enum RpcCode {
ServerError,
InternalError,
InvalidParams,
MethodNotFound,
InvalidRequest,
ParseError,
Success,
Other,
}

#[derive(Display)]
#[display(style = "snake_case")]
enum HttpCode {
TwoHundreds,
FourHundreds,
FiveHundreds,
Other,
}

#[derive(Debug)]
/// Metrics middleware struct to hold the inner http client
pub struct MetricsMiddleware<C> {
inner: C,
}

impl<C> MetricsMiddleware<C>
where
C: JsonRpcClient<Error = HttpClientError>,
{
/// Constructor for middleware
pub fn new(inner: C) -> Self {
Self { inner }
}

fn instrument_request<R>(
&self,
method: &str,
duration: Duration,
request: &Result<R, C::Error>,
) {
let method_str = method.to_string();

let mut http_code = StatusCode::OK.as_u16() as u64;
let mut rpc_code = 0;

if let Err(error) = request {
match error {
HttpClientError::ReqwestError(req_err) => {
http_code = req_err.status().unwrap_or_default().as_u16() as u64;
}
HttpClientError::JsonRpcError(rpc_err) => {
rpc_code = rpc_err.code;
}
_ => {}
}
}

let http = match http_code {
x if (500..=599).contains(&x) => HttpCode::FiveHundreds,
x if (400..=499).contains(&x) => HttpCode::FourHundreds,
x if (200..=299).contains(&x) => HttpCode::TwoHundreds,
_ => HttpCode::Other,
};

let rpc = match rpc_code {
x if x == -32000 => RpcCode::ParseError,
x if x == -32600 => RpcCode::InvalidRequest,
x if x == -32601 => RpcCode::MethodNotFound,
x if x == -32602 => RpcCode::InvalidParams,
x if x == -32603 => RpcCode::InternalError,
x if (-32099..=-32000).contains(&x) => RpcCode::ServerError,
x if x >= 0 => RpcCode::Success,
_ => RpcCode::Other,
};

counter!(
"internal_http_response_code",
&[("method", method_str.clone()), ("status", http.to_string())]
)
.increment(1);

counter!(
"internal_rpc_response_code",
&[("method", method_str.clone()), ("status", rpc.to_string())]
)
.increment(1);

histogram!("internal_rpc_method_response_time", "method" => method_str).record(duration);
}
}

#[async_trait]
impl<C: JsonRpcClient<Error = HttpClientError>> JsonRpcClient for MetricsMiddleware<C> {
type Error = HttpClientError;

async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send,
{
let start_time = Instant::now();
let result: Result<R, C::Error> = self.inner.request(method, params).await;
let duration = start_time.elapsed();
self.instrument_request(method, duration, &result);

result
}
}
4 changes: 2 additions & 2 deletions crates/provider/src/ethers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
mod entry_point;
pub use entry_point::EntryPointImpl as EthersEntryPoint;

mod provider;
mod metrics_middleware;
pub(crate) mod provider;
40 changes: 37 additions & 3 deletions crates/provider/src/ethers/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::{fmt::Debug, sync::Arc};
use std::{fmt::Debug, sync::Arc, time::Duration};

use anyhow::Context;
use ethers::{
abi::{AbiDecode, AbiEncode},
contract::ContractError,
prelude::ContractError as EthersContractError,
providers::{
JsonRpcClient, Middleware, Provider as EthersProvider,
ProviderError as EthersProviderError, RawCall,
Http, HttpRateLimitRetryPolicy, JsonRpcClient, Middleware, Provider as EthersProvider,
ProviderError as EthersProviderError, RawCall, RetryClient, RetryClientBuilder,
},
types::{
spoof, transaction::eip2718::TypedTransaction, Address, Block, BlockId, BlockNumber, Bytes,
Expand All @@ -29,6 +29,7 @@ use ethers::{
H256, U256, U64,
},
};
use reqwest::Url;
use rundler_types::{
contracts::{
gas_price_oracle::GasPriceOracle, i_aggregator::IAggregator, i_entry_point::IEntryPoint,
Expand All @@ -38,6 +39,7 @@ use rundler_types::{
};
use serde::{de::DeserializeOwned, Serialize};

use super::metrics_middleware::MetricsMiddleware;
use crate::{AggregatorOut, AggregatorSimOut, Provider, ProviderError, ProviderResult};

const ARBITRUM_NITRO_NODE_INTERFACE_ADDRESS: Address = H160([
Expand Down Expand Up @@ -321,3 +323,35 @@ fn get_revert_data<D: AbiDecode>(error: ProviderError) -> Result<D, ProviderErro
None => Err(error),
}
}

/// Construct a new Ethers provider from a URL and a poll interval.
///
/// Creates a provider with a retry client that retries 10 times, with an initial backoff of 500ms.
pub fn new_provider(
url: &str,
poll_interval: Option<Duration>,
) -> anyhow::Result<Arc<EthersProvider<RetryClient<MetricsMiddleware<Http>>>>> {
let parsed_url = Url::parse(url).context("provider url should be valid")?;

let http_client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(1))
.build()
.context("failed to build reqwest client")?;
let http = MetricsMiddleware::new(Http::new_with_client(parsed_url, http_client));

let client = RetryClientBuilder::default()
// these retries are if the server returns a 429
.rate_limit_retries(10)
// these retries are if the connection is dubious
.timeout_retries(3)
.initial_backoff(Duration::from_millis(500))
.build(http, Box::<HttpRateLimitRetryPolicy>::default());

let mut provider = EthersProvider::new(client);

if let Some(poll_interval) = poll_interval {
provider = provider.interval(poll_interval);
}

Ok(Arc::new(provider))
}
2 changes: 1 addition & 1 deletion crates/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! A provider is a type that provides access to blockchain data and functions
mod ethers;
pub use ethers::EthersEntryPoint;
pub use ethers::{provider::new_provider, EthersEntryPoint};

mod traits;
pub use traits::{
Expand Down
9 changes: 4 additions & 5 deletions crates/rpc/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};

use anyhow::bail;
use async_trait::async_trait;
use ethers::providers::{Http, Provider, RetryClient};
use ethers::providers::{JsonRpcClient, Provider};
use jsonrpsee::{
server::{middleware::ProxyGetRequestLayer, ServerBuilder},
RpcModule,
Expand All @@ -29,7 +29,6 @@ use rundler_task::{
Task,
};
use rundler_types::chain::ChainSpec;
use rundler_utils::eth;
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand Down Expand Up @@ -88,7 +87,7 @@ where
let addr: SocketAddr = format_socket_addr(&self.args.host, self.args.port).parse()?;
tracing::info!("Starting rpc server on {}", addr);

let provider = eth::new_provider(&self.args.rpc_url, None)?;
let provider = rundler_provider::new_provider(&self.args.rpc_url, None)?;
let ep = EthersEntryPoint::new(self.args.chain_spec.entry_point_address, provider.clone());

let mut module = RpcModule::new(());
Expand Down Expand Up @@ -148,9 +147,9 @@ where
Box::new(self)
}

fn attach_namespaces<E: EntryPoint + Clone>(
fn attach_namespaces<E: EntryPoint + Clone, C: JsonRpcClient + 'static>(
&self,
provider: Arc<Provider<RetryClient<Http>>>,
provider: Arc<Provider<C>>,
entry_point: E,
module: &mut RpcModule<()>,
) -> anyhow::Result<()> {
Expand Down
39 changes: 1 addition & 38 deletions crates/utils/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,12 @@

//! Utilities for working with an Ethereum-like chain via Ethers.
use std::{sync::Arc, time::Duration};

use anyhow::Context;
use ethers::{
abi::{AbiDecode, RawLog},
contract::ContractError,
providers::{
Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient, RetryClientBuilder,
},
providers::Middleware,
types::{Address, Bytes, Log},
};
use url::Url;

/// Gets the revert data from a contract error if it is a revert error,
/// otherwise returns the original error.
Expand All @@ -51,37 +45,6 @@ pub fn parse_revert_message(revert_data: &[u8]) -> Option<String> {
.map(|err| err.reason)
}

/// Construct a new Ethers provider from a URL and a poll interval.
///
/// Creates a provider with a retry client that retries 10 times, with an initial backoff of 500ms.
pub fn new_provider(
url: &str,
poll_interval: Option<Duration>,
) -> anyhow::Result<Arc<Provider<RetryClient<Http>>>> {
let parsed_url = Url::parse(url).context("provider url should be valid")?;

let http_client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(1))
.build()
.context("failed to build reqwest client")?;
let http = Http::new_with_client(parsed_url, http_client);

let client = RetryClientBuilder::default()
// these retries are if the server returns a 429
.rate_limit_retries(10)
// these retries are if the connection is dubious
.timeout_retries(3)
.initial_backoff(Duration::from_millis(500))
.build(http, Box::<HttpRateLimitRetryPolicy>::default());

let mut provider = Provider::new(client);
if let Some(poll_interval) = poll_interval {
provider = provider.interval(poll_interval);
}

Ok(Arc::new(provider))
}

/// Converts an ethers `Log` into an ethabi `RawLog`.
pub fn log_to_raw_log(log: Log) -> RawLog {
let Log { topics, data, .. } = log;
Expand Down

0 comments on commit a5cde2d

Please sign in to comment.