From a6af3b2e8fe7f16ab88189d66b54732569c17401 Mon Sep 17 00:00:00 2001 From: Dan Coombs Date: Wed, 18 Dec 2024 19:39:11 -0600 Subject: [PATCH 1/4] fix(pool): fix pool candidates metrics to align with builder (#943) --- crates/pool/src/mempool/pool.rs | 58 +++++++++++------------------- crates/pool/src/mempool/uo_pool.rs | 39 ++++++++------------ crates/sim/src/lib.rs | 4 +-- crates/sim/src/precheck.rs | 42 +++++++++++++--------- 4 files changed, 61 insertions(+), 82 deletions(-) diff --git a/crates/pool/src/mempool/pool.rs b/crates/pool/src/mempool/pool.rs index 84d9a09bb..c285c2ccf 100644 --- a/crates/pool/src/mempool/pool.rs +++ b/crates/pool/src/mempool/pool.rs @@ -12,7 +12,7 @@ // If not, see https://www.gnu.org/licenses/. use std::{ - cmp::{self, Ordering}, + cmp::Ordering, collections::{hash_map::Entry, BTreeSet, HashMap, HashSet}, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -24,11 +24,12 @@ use metrics::{Gauge, Histogram}; use metrics_derive::Metrics; use parking_lot::RwLock; use rundler_provider::DAGasOracleSync; +use rundler_sim::FeeUpdate; use rundler_types::{ chain::ChainSpec, da::DAGasBlockData, pool::{MempoolError, PoolOperation}, - Entity, EntityType, GasFees, Timestamp, UserOperation, UserOperationId, UserOperationVariant, + Entity, EntityType, Timestamp, UserOperation, UserOperationId, UserOperationVariant, }; use rundler_utils::{emit::WithEntryPoint, math}; use tokio::sync::broadcast; @@ -207,7 +208,6 @@ where )); let hash = self.add_operation_internal(pool_op)?; - self.update_metrics(); Ok(hash) } @@ -233,8 +233,7 @@ where block_number: u64, block_timestamp: Timestamp, block_da_data: Option<&DAGasBlockData>, - candidate_gas_fees: GasFees, - base_fee: u128, + gas_fees: FeeUpdate, ) { let sys_block_time = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -242,7 +241,6 @@ where let block_delta_time = sys_block_time.saturating_sub(self.prev_sys_block_time); let block_delta_height = block_number.saturating_sub(self.prev_block_number); - let candidate_gas_price = base_fee + candidate_gas_fees.max_priority_fee_per_gas; let mut expired = Vec::new(); let mut num_candidates = 0; let mut events = vec![]; @@ -266,7 +264,7 @@ where let required_da_gas = da_gas_oracle.calc_da_gas_sync( &op.po.da_gas_data, block_da_data, - op.uo().gas_price(base_fee), + op.uo().gas_price(gas_fees.base_fee), ); let required_pvg = op.uo().required_pre_verification_gas( @@ -298,11 +296,9 @@ where } } - let uo_gas_price = cmp::min( - op.uo().max_fee_per_gas(), - op.uo().max_priority_fee_per_gas() + base_fee, - ); - if candidate_gas_price > uo_gas_price { + if op.uo().max_fee_per_gas() < gas_fees.uo_fees.max_fee_per_gas + || op.uo().max_priority_fee_per_gas() < gas_fees.uo_fees.max_priority_fee_per_gas + { // don't mark as ineligible, but also not a candidate continue; } @@ -324,6 +320,7 @@ where self.metrics.num_candidates.set(num_candidates as f64); self.prev_block_number = block_number; self.prev_sys_block_time = sys_block_time; + self.update_metrics(); } pub(crate) fn address_count(&self, address: &Address) -> usize { @@ -343,9 +340,7 @@ where } pub(crate) fn remove_operation_by_hash(&mut self, hash: B256) -> Option> { - let ret = self.remove_operation_internal(hash, None); - self.update_metrics(); - ret + self.remove_operation_internal(hash, None) } // STO-040 @@ -428,10 +423,7 @@ where .uo() .hash(mined_op.entry_point, self.config.chain_spec.id); - let ret = self.remove_operation_internal(hash, Some(block_number)); - - self.update_metrics(); - ret + self.remove_operation_internal(hash, Some(block_number)) } pub(crate) fn unmine_operation(&mut self, mined_op: &MinedOp) -> Option> { @@ -440,10 +432,9 @@ where self.mined_hashes_with_block_numbers .remove(&(block_number, hash)); - if let Err(error) = self.put_back_unmined_operation(op.clone()) { + if let Err(error) = self.add_operation_internal(op.clone()) { info!("Could not put back unmined operation: {error}"); }; - self.update_metrics(); Some(op.po.clone()) } @@ -479,7 +470,6 @@ where for &hash in &to_remove { self.remove_operation_internal(hash, None); } - self.update_metrics(); to_remove } @@ -495,7 +485,6 @@ where for &hash in &to_remove { self.remove_operation_internal(hash, None); } - self.update_metrics(); to_remove } @@ -510,7 +499,6 @@ where } self.mined_hashes_with_block_numbers.remove(&(bn, hash)); } - self.update_metrics(); } pub(crate) fn clear(&mut self) { @@ -546,10 +534,6 @@ where Ok(removed) } - fn put_back_unmined_operation(&mut self, op: Arc) -> MempoolResult { - self.add_operation_internal(op) - } - fn add_operation_internal( &mut self, pool_op: Arc, @@ -592,6 +576,7 @@ where Err(MempoolError::DiscardedOnInsert)?; } + self.update_metrics(); Ok(hash) } @@ -619,6 +604,7 @@ where } self.pool_size -= op.mem_size(); + self.update_metrics(); Some(op.po.clone()) } @@ -1201,7 +1187,7 @@ mod tests { po1.valid_time_range.valid_until = Timestamp::from(1); let hash = pool.add_operation(po1.clone(), 0).unwrap(); - pool.do_maintenance(0, Timestamp::from(2), None, GasFees::default(), 0); + pool.do_maintenance(0, Timestamp::from(2), None, FeeUpdate::default()); assert_eq!(None, pool.get_operation_by_hash(hash)); } @@ -1221,7 +1207,7 @@ mod tests { po3.valid_time_range.valid_until = 9.into(); let hash3 = pool.add_operation(po3.clone(), 0).unwrap(); - pool.do_maintenance(0, Timestamp::from(10), None, GasFees::default(), 0); + pool.do_maintenance(0, Timestamp::from(10), None, FeeUpdate::default()); assert_eq!(None, pool.get_operation_by_hash(hash1)); assert!(pool.get_operation_by_hash(hash2).is_some()); @@ -1271,8 +1257,7 @@ mod tests { 0, 0.into(), Some(&DAGasBlockData::default()), - GasFees::default(), - 0, + FeeUpdate::default(), ); assert_eq!(pool.best_operations().collect::>().len(), 1); // UO is now eligible @@ -1307,8 +1292,7 @@ mod tests { 0, 0.into(), Some(&DAGasBlockData::default()), - GasFees::default(), - 0, + FeeUpdate::default(), ); assert_eq!(pool.best_operations().collect::>().len(), 0); @@ -1343,8 +1327,7 @@ mod tests { 0, 0.into(), Some(&DAGasBlockData::default()), - GasFees::default(), - 0, + FeeUpdate::default(), ); assert_eq!(pool.best_operations().collect::>().len(), 1); @@ -1382,8 +1365,7 @@ mod tests { 0, 0.into(), Some(&DAGasBlockData::default()), - GasFees::default(), - base_fee, + FeeUpdate::default(), ); assert_eq!(pool.best_operations().collect::>().len(), 0); diff --git a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index b0d8f28e5..7278a68e6 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -23,13 +23,13 @@ use parking_lot::RwLock; use rundler_provider::{ DAGasOracleSync, EvmProvider, ProvidersWithEntryPointT, SimulationProvider, StateOverride, }; -use rundler_sim::{Prechecker, Simulator}; +use rundler_sim::{FeeUpdate, Prechecker, Simulator}; use rundler_types::{ pool::{ MempoolError, PaymasterMetadata, PoolOperation, Reputation, ReputationStatus, StakeStatus, }, - Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, GasFees, UserOperation, - UserOperationId, UserOperationVariant, + Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, UserOperation, UserOperationId, + UserOperationVariant, }; use rundler_utils::emit::WithEntryPoint; use tokio::sync::broadcast; @@ -67,8 +67,7 @@ struct UoPoolState { throttled_ops: HashSet, block_number: u64, block_hash: B256, - gas_fees: GasFees, - base_fee: u128, + gas_fees: FeeUpdate, } impl UoPool @@ -95,8 +94,7 @@ where throttled_ops: HashSet::new(), block_number: 0, block_hash: B256::ZERO, - gas_fees: GasFees::default(), - base_fee: 0, + gas_fees: FeeUpdate::default(), }), reputation, paymaster, @@ -347,15 +345,15 @@ where // update required bundle fees and update metrics match self.pool_providers.prechecker().update_fees().await { - Ok((bundle_fees, base_fee)) => { - let max_fee = match format_units(bundle_fees.max_fee_per_gas, "gwei") { + Ok(fees) => { + let max_fee = match format_units(fees.bundle_fees.max_fee_per_gas, "gwei") { Ok(s) => s.parse::().unwrap_or_default(), Err(_) => 0.0, }; self.metrics.current_max_fee_gwei.set(max_fee); let max_priority_fee = - match format_units(bundle_fees.max_priority_fee_per_gas, "gwei") { + match format_units(fees.bundle_fees.max_priority_fee_per_gas, "gwei") { Ok(s) => s.parse::().unwrap_or_default(), Err(_) => 0.0, }; @@ -363,7 +361,7 @@ where .current_max_priority_fee_gwei .set(max_priority_fee); - let base_fee_f64 = match format_units(base_fee, "gwei") { + let base_fee_f64 = match format_units(fees.base_fee, "gwei") { Ok(s) => s.parse::().unwrap_or_default(), Err(_) => 0.0, }; @@ -374,8 +372,7 @@ where let mut state = self.state.write(); state.block_number = update.latest_block_number; state.block_hash = update.latest_block_hash; - state.gas_fees = bundle_fees; - state.base_fee = base_fee; + state.gas_fees = fees; } } Err(e) => { @@ -441,13 +438,11 @@ where // pool maintenance let gas_fees = state.gas_fees; - let base_fee = state.base_fee; state.pool.do_maintenance( update.latest_block_number, update.latest_block_timestamp, da_block_data.as_ref(), gas_fees, - base_fee, ); } let maintenance_time = start.elapsed(); @@ -923,7 +918,7 @@ mod tests { da::DAGasUOData, pool::{PrecheckViolation, SimulationViolation}, v0_6::UserOperation, - EntityInfo, EntityInfos, EntityType, EntryPointVersion, GasFees, + EntityInfo, EntityInfos, EntityType, EntryPointVersion, UserOperation as UserOperationTrait, ValidTimeRange, }; @@ -1886,15 +1881,9 @@ mod tests { args.allowlist.clone().unwrap_or_default(), )); - prechecker.expect_update_fees().returning(|| { - Ok(( - GasFees { - max_fee_per_gas: 0, - max_priority_fee_per_gas: 0, - }, - 0, - )) - }); + prechecker + .expect_update_fees() + .returning(|| Ok(FeeUpdate::default())); for op in ops { prechecker.expect_check().returning(move |_, _| { diff --git a/crates/sim/src/lib.rs b/crates/sim/src/lib.rs index e7419d9d3..41a5a736c 100644 --- a/crates/sim/src/lib.rs +++ b/crates/sim/src/lib.rs @@ -49,8 +49,8 @@ mod precheck; #[cfg(feature = "test-utils")] pub use precheck::MockPrechecker; pub use precheck::{ - PrecheckError, PrecheckReturn, Prechecker, PrecheckerImpl, Settings as PrecheckSettings, - MIN_CALL_GAS_LIMIT, + FeeUpdate, PrecheckError, PrecheckReturn, Prechecker, PrecheckerImpl, + Settings as PrecheckSettings, MIN_CALL_GAS_LIMIT, }; /// Simulation and violation checking diff --git a/crates/sim/src/precheck.rs b/crates/sim/src/precheck.rs index 538501dbf..33972e465 100644 --- a/crates/sim/src/precheck.rs +++ b/crates/sim/src/precheck.rs @@ -43,6 +43,17 @@ pub struct PrecheckReturn { pub required_pre_verification_gas: u128, } +/// Updated fees from the fee estimator +#[derive(Copy, Clone, Debug, Default)] +pub struct FeeUpdate { + /// Bundle fees + pub bundle_fees: GasFees, + /// User operation fees + pub uo_fees: GasFees, + /// Current base fee + pub base_fee: u128, +} + /// Trait for checking if a user operation is valid before simulation /// according to the spec rules. #[cfg_attr(feature = "test-utils", automock(type UO = rundler_types::v0_6::UserOperation;))] @@ -61,7 +72,7 @@ pub trait Prechecker: Send + Sync { /// Update and return the bundle fees. /// /// This MUST be called at block boundaries before checking any operations. - async fn update_fees(&self) -> anyhow::Result<(GasFees, u128)>; + async fn update_fees(&self) -> anyhow::Result; } /// Precheck error @@ -145,13 +156,7 @@ struct AsyncData { #[derive(Copy, Clone, Debug)] struct AsyncDataCache { - fees: Option, -} - -#[derive(Copy, Clone, Debug)] -struct FeeCache { - bundle_fees: GasFees, - base_fee: u128, + fees: Option, } #[async_trait::async_trait] @@ -183,16 +188,19 @@ where }) } - async fn update_fees(&self) -> anyhow::Result<(GasFees, u128)> { + async fn update_fees(&self) -> anyhow::Result { let (bundle_fees, base_fee) = self.fee_estimator.required_bundle_fees(None).await?; - - let mut cache = self.cache.write().unwrap(); - cache.fees = Some(FeeCache { + let uo_fees = self.fee_estimator.required_op_fees(bundle_fees); + let fee_update = FeeUpdate { bundle_fees, + uo_fees, base_fee, - }); + }; + + let mut cache = self.cache.write().unwrap(); + cache.fees = Some(fee_update); - Ok((bundle_fees, base_fee)) + Ok(fee_update) } } @@ -358,7 +366,7 @@ where op: &UO, block: BlockHashOrNumber, ) -> anyhow::Result { - let (_, base_fee) = self.get_fees().await?; + let FeeUpdate { base_fee, .. } = self.get_fees().await?; let ( factory_exists, @@ -424,9 +432,9 @@ where .context("precheck should get sender balance") } - async fn get_fees(&self) -> anyhow::Result<(GasFees, u128)> { + async fn get_fees(&self) -> anyhow::Result { if let Some(fees) = self.cache.read().unwrap().fees { - return Ok((fees.bundle_fees, fees.base_fee)); + return Ok(fees); } self.update_fees().await } From 02d9d77816dbb47c5c7e9446b5f2a8b378ecca2e Mon Sep 17 00:00:00 2001 From: Andy Date: Thu, 19 Dec 2024 17:27:31 -0500 Subject: [PATCH 2/4] fix: impl timeout layer. (#926) --- Cargo.lock | 30 +++- Cargo.toml | 1 + bin/rundler/src/cli/builder.rs | 3 + bin/rundler/src/cli/mod.rs | 9 ++ crates/builder/src/sender/mod.rs | 9 +- crates/builder/src/task.rs | 12 +- crates/provider/Cargo.toml | 2 + crates/provider/src/alloy/metrics.rs | 5 + crates/provider/src/alloy/mod.rs | 61 +++++++- crates/provider/src/alloy/provider_timeout.rs | 137 ++++++++++++++++++ crates/types/src/task/status_code.rs | 1 + 11 files changed, 258 insertions(+), 12 deletions(-) create mode 100644 crates/provider/src/alloy/provider_timeout.rs diff --git a/Cargo.lock b/Cargo.lock index 3ffb61fec..f34fdb9b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -756,6 +756,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + [[package]] name = "async-channel" version = "1.9.0" @@ -1641,6 +1647,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "chunked_transfer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" + [[package]] name = "clang-sys" version = "1.8.1" @@ -2717,7 +2729,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -3236,7 +3248,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -4677,6 +4689,7 @@ dependencies = [ "const-hex", "futures-util", "mockall", + "pin-project", "reqwest", "reth-tasks", "rundler-bindings-fastlz", @@ -4685,6 +4698,7 @@ dependencies = [ "rundler-types", "rundler-utils", "thiserror", + "tiny_http", "tokio", "tower 0.4.13", "tracing", @@ -5655,6 +5669,18 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tiny_http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" +dependencies = [ + "ascii", + "chunked_transfer", + "httpdate", + "log", +] + [[package]] name = "tinyvec" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index e44885b73..67d22489a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,3 +90,4 @@ tower = { version = "0.4.13", features = ["timeout"] } tracing = "0.1.40" strum = { version = "0.26.3", features = ["derive"] } url = "2.5.2" +tiny_http = "0.12.0" diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 6376361ed..0e5f485a1 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -336,6 +336,8 @@ impl BuilderArgs { let da_gas_tracking_enabled = super::lint_da_gas_tracking(common.da_gas_tracking_enabled, &chain_spec); + let provider_client_timeout_seconds = common.provider_client_timeout_seconds; + Ok(BuilderTaskArgs { entry_points, chain_spec, @@ -358,6 +360,7 @@ impl BuilderArgs { max_replacement_underpriced_blocks: self.max_replacement_underpriced_blocks, remote_address, da_gas_tracking_enabled, + provider_client_timeout_seconds, }) } diff --git a/bin/rundler/src/cli/mod.rs b/bin/rundler/src/cli/mod.rs index 577ee8ad7..959c37773 100644 --- a/bin/rundler/src/cli/mod.rs +++ b/bin/rundler/src/cli/mod.rs @@ -346,6 +346,14 @@ pub struct CommonArgs { default_value = "false" )] pub da_gas_tracking_enabled: bool, + + #[arg( + long = "provider_client_timeout_seconds", + name = "provider_client_timeout_seconds", + env = "PROVIDER_CLIENT_TIMEOUT_SECONDS", + default_value = "10" + )] + pub provider_client_timeout_seconds: u64, } const SIMULATION_GAS_OVERHEAD: u64 = 100_000; @@ -594,6 +602,7 @@ pub fn construct_providers( ) -> anyhow::Result { let provider = Arc::new(rundler_provider::new_alloy_provider( args.node_http.as_ref().context("must provide node_http")?, + args.provider_client_timeout_seconds, )?); let (da_gas_oracle, da_gas_oracle_sync) = rundler_provider::new_alloy_da_gas_oracle(chain_spec, provider.clone()); diff --git a/crates/builder/src/sender/mod.rs b/crates/builder/src/sender/mod.rs index 78ddd9eb7..97c8f55bd 100644 --- a/crates/builder/src/sender/mod.rs +++ b/crates/builder/src/sender/mod.rs @@ -171,12 +171,17 @@ impl TransactionSenderArgs { self, rpc_url: &str, signer: S, + provider_client_timeout_seconds: u64, ) -> std::result::Result, SenderConstructorErrors> { - let provider = rundler_provider::new_alloy_evm_provider(rpc_url)?; + let provider = + rundler_provider::new_alloy_evm_provider(rpc_url, provider_client_timeout_seconds)?; let sender = match self { Self::Raw(args) => { - let submitter = rundler_provider::new_alloy_evm_provider(&args.submit_url)?; + let submitter = rundler_provider::new_alloy_evm_provider( + &args.submit_url, + provider_client_timeout_seconds, + )?; if args.use_submit_for_status { TransactionSenderEnum::Raw(RawTransactionSender::new( diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index b0d6e5230..8f8c22ce2 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -89,6 +89,8 @@ pub struct Args { pub entry_points: Vec, /// Enable DA tracking pub da_gas_tracking_enabled: bool, + /// Provider client timeout + pub provider_client_timeout_seconds: u64, } /// Builder settings for an entrypoint @@ -355,11 +357,11 @@ where da_gas_tracking_enabled: self.args.da_gas_tracking_enabled, }; - let transaction_sender = self - .args - .sender_args - .clone() - .into_sender(&self.args.rpc_url, signer)?; + let transaction_sender = self.args.sender_args.clone().into_sender( + &self.args.rpc_url, + signer, + self.args.provider_client_timeout_seconds, + )?; let tracker_settings = transaction_tracker::Settings { replacement_fee_percent_increase: self.args.replacement_fee_percent_increase, diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 2e73669d6..d9339cbd1 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -33,6 +33,7 @@ async-trait.workspace = true auto_impl.workspace = true const-hex.workspace = true futures-util.workspace = true +pin-project.workspace = true reqwest.workspace = true thiserror.workspace = true tokio.workspace = true @@ -50,4 +51,5 @@ alloy-node-bindings = "0.4.2" alloy-provider = { workspace = true, features = ["debug-api", "anvil-node"] } alloy-sol-macro.workspace = true rundler-provider = { workspace = true, features = ["test-utils"] } +tiny_http.workspace = true tokio.workspace = true diff --git a/crates/provider/src/alloy/metrics.rs b/crates/provider/src/alloy/metrics.rs index 18650c353..5c293021e 100644 --- a/crates/provider/src/alloy/metrics.rs +++ b/crates/provider/src/alloy/metrics.rs @@ -126,6 +126,11 @@ where method_logger.record_http(HttpCode::TwoHundreds); method_logger.record_rpc(RpcCode::Success); } + // for timeout error + alloy_json_rpc::RpcError::LocalUsageError(_) => { + method_logger.record_http(HttpCode::FourHundreds); + method_logger.record_rpc(RpcCode::ClientSideTimeout); + } _ => {} } } diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 49f08fac7..1256b9cca 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -11,6 +11,8 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. +use std::time::Duration; + use alloy_provider::{Provider as AlloyProvider, ProviderBuilder}; use alloy_rpc_client::ClientBuilder; use alloy_transport::layers::RetryBackoffService; @@ -18,6 +20,7 @@ use alloy_transport_http::Http; use anyhow::Context; use evm::AlloyEvmProvider; use metrics::{AlloyMetricLayer, AlloyMetricMiddleware}; +use provider_timeout::{ProviderTimeout, ProviderTimeoutLayer}; use reqwest::Client; use url::Url; @@ -28,27 +31,79 @@ pub use da::new_alloy_da_gas_oracle; pub(crate) mod entry_point; pub(crate) mod evm; pub(crate) mod metrics; +mod provider_timeout; /// Create a new alloy evm provider from a given RPC URL -pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result { - let provider = new_alloy_provider(rpc_url)?; +pub fn new_alloy_evm_provider( + rpc_url: &str, + provider_client_timeout_seconds: u64, +) -> anyhow::Result { + let provider = new_alloy_provider(rpc_url, provider_client_timeout_seconds)?; Ok(AlloyEvmProvider::new(provider)) } /// Create a new alloy provider from a given RPC URL pub fn new_alloy_provider( rpc_url: &str, + provider_client_timeout_seconds: u64, ) -> anyhow::Result< - impl AlloyProvider>>> + Clone, + impl AlloyProvider>>>> + + Clone, > { let url = Url::parse(rpc_url).context("invalid rpc url")?; let metric_layer = AlloyMetricLayer::default(); // TODO: make this configurable: use a large number for CUPS for now let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 1_000_000); + // add a timeout layer here. + let timeout_layer = + ProviderTimeoutLayer::new(Duration::from_secs(provider_client_timeout_seconds)); let client = ClientBuilder::default() .layer(retry_layer) .layer(metric_layer) + .layer(timeout_layer) .http(url); let provider = ProviderBuilder::new().on_client(client); Ok(provider) } + +#[cfg(test)] +mod tests { + use std::{ + thread::{self, sleep}, + time::Duration, + }; + + use alloy_provider::Provider; + use tiny_http::{Response, Server}; + + use crate::new_alloy_provider; + fn setup() { + let server = Server::http("0.0.0.0:8000").unwrap(); + for request in server.incoming_requests() { + sleep(Duration::from_secs(10)); + let _ = request.respond(Response::from_string( + "{\"jsonrpc\": \"2.0\", \"id\": 1, \"result\": \"0x146b6d7\"}", + )); + } + } + #[tokio::test] + async fn test_timeout() { + thread::spawn(move || { + setup(); + }); + { + // Wait 11 seconds and get result + let provider = new_alloy_provider("http://localhost:8000", 15) + .expect("can not initialize provider"); + let x = provider.get_block_number().await; + assert!(x.is_ok()); + } + { + // Wait 9 seconds and timeout form client side + let provider = new_alloy_provider("http://localhost:8000", 5) + .expect("can not initialize provider"); + let x = provider.get_block_number().await; + assert!(x.is_err()); + } + } +} diff --git a/crates/provider/src/alloy/provider_timeout.rs b/crates/provider/src/alloy/provider_timeout.rs new file mode 100644 index 000000000..2d9db2e3e --- /dev/null +++ b/crates/provider/src/alloy/provider_timeout.rs @@ -0,0 +1,137 @@ +//! Middleware that applies a timeout to requests. +//! +//! If the response does not complete within the specified timeout, the response +//! will be aborted. + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::TransportError; +use pin_project::pin_project; +use tokio::time::Sleep; +use tower::{Layer, Service}; + +/// Applies a timeout to requests via the supplied inner service. +#[derive(Debug, Clone)] +pub(crate) struct ProviderTimeoutLayer { + timeout: Duration, +} + +impl ProviderTimeoutLayer { + /// Create a timeout from a duration + pub(crate) fn new(timeout: Duration) -> Self { + ProviderTimeoutLayer { timeout } + } +} + +impl Layer for ProviderTimeoutLayer +where + S: Service + Sync, +{ + type Service = ProviderTimeout; + + fn layer(&self, service: S) -> Self::Service { + ProviderTimeout::new(service, self.timeout) + } +} + +/// Applies a timeout to requests. +#[derive(Debug)] +pub struct ProviderTimeout { + service: S, + timeout: Duration, +} + +// ===== impl Timeout ===== + +impl ProviderTimeout +where + S: Service + Sync, +{ + /// Creates a new [`Timeout`] + pub const fn new(service: S, timeout: Duration) -> Self { + ProviderTimeout { service, timeout } + } +} + +impl Clone for ProviderTimeout +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + service: self.service.clone(), + timeout: self.timeout, + } + } +} +impl Service for ProviderTimeout +where + S: Service + + Sync + + Send + + Clone + + 'static, + S::Future: Send, +{ + type Response = S::Response; + type Error = TransportError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.service.poll_ready(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), + } + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let response = self.service.call(request); + let sleep = tokio::time::sleep(self.timeout); + ResponseFuture::new(response, sleep) + } +} + +#[pin_project] +#[derive(Debug)] +pub struct ResponseFuture { + #[pin] + response: T, + #[pin] + sleep: Sleep, +} + +impl ResponseFuture { + pub(crate) fn new(response: T, sleep: Sleep) -> Self { + ResponseFuture { response, sleep } + } +} + +impl Future for ResponseFuture +where + F: Future>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + // First, try polling the future + match this.response.poll(cx) { + Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)), + Poll::Pending => {} + } + // Now check the sleep + match this.sleep.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => Poll::Ready(Err(TransportError::local_usage_str( + "provider request timeout from client side", + ))), + } + } +} diff --git a/crates/types/src/task/status_code.rs b/crates/types/src/task/status_code.rs index 610e0ccdd..fe9471764 100644 --- a/crates/types/src/task/status_code.rs +++ b/crates/types/src/task/status_code.rs @@ -24,6 +24,7 @@ pub enum RpcCode { Other, InvalidParams, DeadlineExceed, + ClientSideTimeout, MethodNotFound, AlreadyExist, PermissionDenied, From a77876c0ae921816c9bca482cb71a745a6448428 Mon Sep 17 00:00:00 2001 From: Andy Date: Fri, 20 Dec 2024 11:46:39 -0500 Subject: [PATCH 3/4] fix: ignore flaky tests (#947) --- crates/provider/src/alloy/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 1256b9cca..36bf77125 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -78,14 +78,15 @@ mod tests { use crate::new_alloy_provider; fn setup() { - let server = Server::http("0.0.0.0:8000").unwrap(); + let server = Server::http("0.0.0.0:9009").unwrap(); for request in server.incoming_requests() { - sleep(Duration::from_secs(10)); + sleep(Duration::from_secs(5)); let _ = request.respond(Response::from_string( "{\"jsonrpc\": \"2.0\", \"id\": 1, \"result\": \"0x146b6d7\"}", )); } } + #[ignore = "this test is flaky with github action, should only run locally"] #[tokio::test] async fn test_timeout() { thread::spawn(move || { @@ -93,14 +94,14 @@ mod tests { }); { // Wait 11 seconds and get result - let provider = new_alloy_provider("http://localhost:8000", 15) + let provider = new_alloy_provider("http://localhost:9009", 15) .expect("can not initialize provider"); let x = provider.get_block_number().await; assert!(x.is_ok()); } { // Wait 9 seconds and timeout form client side - let provider = new_alloy_provider("http://localhost:8000", 5) + let provider = new_alloy_provider("http://localhost:9009", 1) .expect("can not initialize provider"); let x = provider.get_block_number().await; assert!(x.is_err()); From d9729c3ff1818f5b900ebcc11704aad4c98f05bc Mon Sep 17 00:00:00 2001 From: dancoombs Date: Fri, 20 Dec 2024 15:59:45 -0600 Subject: [PATCH 4/4] fix(builder): fix entity removal in bundle building --- crates/builder/src/bundle_proposer.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index b43d33f96..62931f406 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -1287,6 +1287,11 @@ impl ProposalContext { fn reject_aggregator(&mut self, address: Address) { self.groups_by_aggregator.remove(&Some(address)); + if let Some(group) = self.groups_by_aggregator.remove(&Some(address)) { + for op in group.ops_with_simulations { + self.rejected_ops.push((op.op, op.simulation.entity_infos)); + } + } } fn reject_paymaster(&mut self, address: Address) -> Vec
{ @@ -1310,6 +1315,8 @@ impl ProposalContext { for op in mem::take(&mut group.ops_with_simulations) { if !filter(&op.op) { group.ops_with_simulations.push(op); + } else { + self.rejected_ops.push((op.op, op.simulation.entity_infos)); } } if group.ops_with_simulations.is_empty() { @@ -1998,7 +2005,7 @@ mod tests { }, ] ); - assert_eq!(bundle.rejected_ops, vec![]); + assert_eq!(bundle.rejected_ops, vec![op1, op2, op4, op5]); assert_eq!( bundle.ops_per_aggregator, vec![UserOpsPerAggregator {