Skip to content

Commit

Permalink
feat(middleware): add response extractor. (#816)
Browse files Browse the repository at this point in the history
  • Loading branch information
andysim3d authored Oct 8, 2024
1 parent f3954e2 commit 81d88be
Show file tree
Hide file tree
Showing 20 changed files with 643 additions and 250 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ serde = "1.0.210"
serde_json = "1.0.128"
rand = "0.8.5"
reqwest = { version = "0.12.8", default-features = false, features = ["rustls-tls"] }
rustls = "0.23.13"
thiserror = "1.0.64"
tokio = { version = "1.39.3", default-features = false, features = ["rt", "sync", "time"]}
tokio-util = "0.7.12"
Expand Down
8 changes: 2 additions & 6 deletions crates/pool/src/server/remote/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use alloy_primitives::{Address, B256};
use async_trait::async_trait;
use futures_util::StreamExt;
use rundler_task::{
grpc::{grpc_metrics::HttpMethodExtractor, protos::from_bytes},
metrics::MetricsLayer,
grpc::{grpc_metrics::GrpcMetricsLayer, protos::from_bytes},
GracefulShutdown, TaskSpawner,
};
use rundler_types::{
Expand Down Expand Up @@ -83,10 +82,7 @@ pub(crate) async fn remote_mempool_server_task(
.set_serving::<OpPoolServer<OpPoolImpl>>()
.await;

let metrics_layer = MetricsLayer::<HttpMethodExtractor, _>::new(
"op_pool_service".to_string(),
"http-grpc".to_string(),
);
let metrics_layer = GrpcMetricsLayer::new("op_pool_service".to_string());

if let Err(e) = Server::builder()
.layer(metrics_layer)
Expand Down
3 changes: 3 additions & 0 deletions crates/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ anyhow.workspace = true
async-trait.workspace = true
auto_impl.workspace = true
thiserror.workspace = true
futures-util.workspace = true
tower.workspace = true
tracing.workspace = true
url.workspace = true

mockall = {workspace = true, optional = true }

[features]
Expand Down
168 changes: 153 additions & 15 deletions crates/provider/src/alloy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,160 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use alloy_json_rpc::RequestPacket;
/// Method extractor
use rundler_types::task::traits::RequestExtractor;

/// Method extractor for Alloy providers
#[derive(Clone, Copy)]
pub struct AlloyMethodExtractor;

impl RequestExtractor<RequestPacket> for AlloyMethodExtractor {
fn get_method_name(req: &RequestPacket) -> String {
match req {
RequestPacket::Single(request) => request.method().to_string(),
_ => {
// can't extract method name for batch.
"batch".to_string()
use std::task::{Context, Poll};

use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::{BoxFuture, HttpError, TransportError, TransportErrorKind};
use futures_util::FutureExt;
use rundler_types::task::{
metric_recorder::MethodSessionLogger,
status_code::{HttpCode, RpcCode},
};
use tower::{Layer, Service};

/// Alloy provider metric layer.
#[derive(Default)]
pub(crate) struct AlloyMetricLayer {}

impl AlloyMetricLayer {}

impl<S> Layer<S> for AlloyMetricLayer
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError> + Sync,
{
type Service = AlloyMetricMiddleware<S>;

fn layer(&self, service: S) -> Self::Service {
AlloyMetricMiddleware::new(service)
}
}

pub struct AlloyMetricMiddleware<S> {
service: S,
}

impl<S> AlloyMetricMiddleware<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError> + Sync,
{
/// carete an alloy provider metric layer.
pub fn new(service: S) -> Self {
Self { service }
}
}

impl<S> Clone for AlloyMetricMiddleware<S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
}
}
}

impl<S> Service<RequestPacket> for AlloyMetricMiddleware<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>
+ Sync
+ Send
+ Clone
+ 'static,
S::Future: Send,
{
type Response = ResponsePacket;
type Error = TransportError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, request: RequestPacket) -> Self::Future {
let method_name = get_method_name(&request);
let method_logger = MethodSessionLogger::start(
"alloy_provider_client".to_string(),
method_name,
"rpc".to_string(),
);
let mut svc = self.service.clone();
async move {
let response = svc.call(request).await;
method_logger.done();
match &response {
Ok(resp) => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(get_rpc_status_code(resp));
}
Err(e) => match e {
alloy_json_rpc::RpcError::ErrorResp(rpc_error) => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(get_rpc_status_from_code(rpc_error.code));
}
alloy_json_rpc::RpcError::Transport(TransportErrorKind::HttpError(
HttpError { status, body: _ },
)) => {
method_logger.record_http(get_http_status_from_code(*status));
}
alloy_json_rpc::RpcError::NullResp => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(RpcCode::Success);
}
_ => {}
},
}
response
}
.boxed()
}
}

/// Get the method name from the request
fn get_method_name(req: &RequestPacket) -> String {
match req {
RequestPacket::Single(request) => request.method().to_string(),
RequestPacket::Batch(_) => {
// can't extract method name for batch.
"batch".to_string()
}
}
}

fn get_rpc_status_from_code(code: i64) -> RpcCode {
match code {
-32700 => RpcCode::ParseError,
-32600 => RpcCode::InvalidRequest,
-32601 => RpcCode::MethodNotFound,
-32602 => RpcCode::InvalidParams,
-32603 => RpcCode::InternalError,
x if (-32099..=-32000).contains(&x) => RpcCode::ServerError,
_ => RpcCode::Other,
}
}

fn get_http_status_from_code(code: u16) -> HttpCode {
match code {
x if (200..=299).contains(&x) => HttpCode::TwoHundreds,
x if (400..=499).contains(&x) => HttpCode::FourHundreds,
x if (500..=599).contains(&x) => HttpCode::FiveHundreds,
_ => HttpCode::Other,
}
}

fn get_rpc_status_code(response_packet: &ResponsePacket) -> RpcCode {
let response: &alloy_json_rpc::Response = match response_packet {
ResponsePacket::Batch(resps) => {
if resps.is_empty() {
return RpcCode::Success;
}
&resps[0]
}
ResponsePacket::Single(resp) => resp,
};
let response_code: i64 = match &response.payload {
alloy_json_rpc::ResponsePayload::Success(_) => 0,
alloy_json_rpc::ResponsePayload::Failure(error_payload) => error_payload.code,
};
get_rpc_status_from_code(response_code)
}
16 changes: 13 additions & 3 deletions crates/provider/src/alloy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

use alloy_provider::{Provider as AlloyProvider, ProviderBuilder};
use alloy_rpc_client::ClientBuilder;
use alloy_transport::layers::RetryBackoffService;
use alloy_transport_http::Http;
use anyhow::Context;
use evm::AlloyEvmProvider;
use metrics::{AlloyMetricLayer, AlloyMetricMiddleware};
use reqwest::Client;
use url::Url;

Expand All @@ -34,8 +36,16 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result<impl EvmProvider
/// Create a new alloy provider from a given RPC URL
pub fn new_alloy_provider(
rpc_url: &str,
) -> anyhow::Result<impl AlloyProvider<Http<Client>> + Clone> {
) -> anyhow::Result<
impl AlloyProvider<RetryBackoffService<AlloyMetricMiddleware<Http<Client>>>> + Clone,
> {
let url = Url::parse(rpc_url).context("invalid rpc url")?;
let client = ClientBuilder::default().http(url);
Ok(ProviderBuilder::new().on_client(client))
let metric_layer = AlloyMetricLayer::default();
let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 0);
let client = ClientBuilder::default()
.layer(retry_layer)
.layer(metric_layer)
.http(url);
let provider = ProviderBuilder::new().on_client(client);
Ok(provider)
}
1 change: 0 additions & 1 deletion crates/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub use alloy::{
},
},
evm::AlloyEvmProvider,
metrics::AlloyMethodExtractor,
new_alloy_evm_provider, new_alloy_provider,
};

Expand Down
1 change: 1 addition & 0 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ serde.workspace = true
strum.workspace = true
url.workspace = true
futures-util.workspace = true
http = "1.1.0"

[dev-dependencies]
mockall.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ pub use rundler::{RundlerApiClient, Settings as RundlerApiSettings};
mod task;
pub use task::{Args as RpcTaskArgs, RpcTask};

mod rpc_metrics;
mod types;
mod utils;
Loading

0 comments on commit 81d88be

Please sign in to comment.