Skip to content

Commit

Permalink
fix: impl timeout layer.
Browse files Browse the repository at this point in the history
  • Loading branch information
andysim3d committed Dec 5, 2024
1 parent d283ce1 commit a2128e2
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async-trait.workspace = true
auto_impl.workspace = true
const-hex.workspace = true
futures-util.workspace = true
pin-project.workspace = true
reqwest.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions crates/provider/src/alloy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ where
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(RpcCode::Success);
}
// for timeout error
alloy_json_rpc::RpcError::LocalUsageError(_) => {
method_logger.record_http(HttpCode::FourHundreds);
method_logger.record_rpc(RpcCode::ClientSideTimeout);
}
_ => {}
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/provider/src/alloy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::time::Duration;

use alloy_provider::{Provider as AlloyProvider, ProviderBuilder};
use alloy_rpc_client::ClientBuilder;
use alloy_transport::layers::RetryBackoffService;
use alloy_transport_http::Http;
use anyhow::Context;
use evm::AlloyEvmProvider;
use metrics::{AlloyMetricLayer, AlloyMetricMiddleware};
use provider_timeout::{ProviderTimeout, ProviderTimeoutLayer};
use reqwest::Client;
use url::Url;

Expand All @@ -28,6 +31,7 @@ pub use da::new_alloy_da_gas_oracle;
pub(crate) mod entry_point;
pub(crate) mod evm;
pub(crate) mod metrics;
mod provider_timeout;

/// Create a new alloy evm provider from a given RPC URL
pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result<impl EvmProvider + Clone> {
Expand All @@ -39,15 +43,19 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result<impl EvmProvider
pub fn new_alloy_provider(
rpc_url: &str,
) -> anyhow::Result<
impl AlloyProvider<RetryBackoffService<AlloyMetricMiddleware<Http<Client>>>> + Clone,
impl AlloyProvider<RetryBackoffService<AlloyMetricMiddleware<ProviderTimeout<Http<Client>>>>>
+ Clone,
> {
let url = Url::parse(rpc_url).context("invalid rpc url")?;
let metric_layer = AlloyMetricLayer::default();
// TODO: make this configurable: use a large number for CUPS for now
let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 1_000_000);
// add a timeout layer here.
let timeout_layer = ProviderTimeoutLayer::new(Duration::from_secs(10));
let client = ClientBuilder::default()
.layer(retry_layer)
.layer(metric_layer)
.layer(timeout_layer)
.http(url);
let provider = ProviderBuilder::new().on_client(client);
Ok(provider)
Expand Down
137 changes: 137 additions & 0 deletions crates/provider/src/alloy/provider_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//! Middleware that applies a timeout to requests.
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::TransportError;
use pin_project::pin_project;
use tokio::time::Sleep;
use tower::{Layer, Service};

/// Applies a timeout to requests via the supplied inner service.
#[derive(Debug, Clone)]
pub(crate) struct ProviderTimeoutLayer {
timeout: Duration,
}

impl ProviderTimeoutLayer {
/// Create a timeout from a duration
pub(crate) fn new(timeout: Duration) -> Self {
ProviderTimeoutLayer { timeout }
}
}

impl<S> Layer<S> for ProviderTimeoutLayer
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError> + Sync,
{
type Service = ProviderTimeout<S>;

fn layer(&self, service: S) -> Self::Service {
ProviderTimeout::new(service, self.timeout)
}
}

/// Applies a timeout to requests.
#[derive(Debug)]
pub struct ProviderTimeout<S> {
service: S,
timeout: Duration,
}

// ===== impl Timeout =====

impl<S> ProviderTimeout<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError> + Sync,
{
/// Creates a new [`Timeout`]
pub const fn new(service: S, timeout: Duration) -> Self {
ProviderTimeout { service, timeout }
}
}

impl<S> Clone for ProviderTimeout<S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
service: self.service.clone(),
timeout: self.timeout,
}
}
}
impl<S> Service<RequestPacket> for ProviderTimeout<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>
+ Sync
+ Send
+ Clone
+ 'static,
S::Future: Send,
{
type Response = S::Response;
type Error = TransportError;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.service.poll_ready(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)),
}
}

fn call(&mut self, request: RequestPacket) -> Self::Future {
let response = self.service.call(request);
let sleep = tokio::time::sleep(self.timeout);
ResponseFuture::new(response, sleep)
}
}

#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
response: T,
#[pin]
sleep: Sleep,
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(response: T, sleep: Sleep) -> Self {
ResponseFuture { response, sleep }
}
}

impl<F, T> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, TransportError>>,
{
type Output = Result<T, TransportError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

// First, try polling the future
match this.response.poll(cx) {
Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)),
Poll::Pending => {}
}
// Now check the sleep
match this.sleep.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(TransportError::local_usage_str(
"provider request timeout from client side",
))),
}
}
}
1 change: 1 addition & 0 deletions crates/types/src/task/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum RpcCode {
Other,
InvalidParams,
DeadlineExceed,
ClientSideTimeout,
MethodNotFound,
AlreadyExist,
PermissionDenied,
Expand Down

0 comments on commit a2128e2

Please sign in to comment.