Skip to content

Commit

Permalink
fix: limit number of connections (#546)
Browse files Browse the repository at this point in the history
* fix: limit number of connections

* fix: formatting

* fix: explicitelly build reqwest client

* fix: nit

* fix: review
  • Loading branch information
atanmarko authored Aug 27, 2024
1 parent 8771b9a commit 32a6ba7
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 23 deletions.
2 changes: 1 addition & 1 deletion zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub(crate) async fn client_main(
rpc_params.rpc_url.clone(),
rpc_params.backoff,
rpc_params.max_retries,
),
)?,
));
check_previous_proof_and_checkpoint(
params.checkpoint_block_number,
Expand Down
6 changes: 4 additions & 2 deletions zero_bin/rpc/src/jerigon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ where
{
// Grab trace information
let tx_results = cached_provider
.as_provider()
.get_provider()
.await?
.raw_request::<_, Vec<ZeroTxResult>>(
"debug_traceBlockByNumber".into(),
(target_block_id, json!({"tracer": "zeroTracer"})),
Expand All @@ -39,7 +40,8 @@ where
// Grab block witness info (packed as combined trie pre-images)

let block_witness = cached_provider
.as_provider()
.get_provider()
.await?
.raw_request::<_, String>("eth_getWitness".into(), vec![target_block_id])
.await?;

Expand Down
4 changes: 2 additions & 2 deletions zero_bin/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ where
// We use that execution not to produce a new contract bytecode - instead, we
// return hashes. To look at the code use `cast disassemble <bytecode>`.
let bytes = cached_provider
.as_provider()
.get_provider().await?
.raw_request::<_, Bytes>(
"eth_call".into(),
(json!({"input": "0x60005B60010180430340816020025280610101116300000002576120205FF3"}), target_block_number),
Expand Down Expand Up @@ -216,7 +216,7 @@ where
.header
.number
.context("target block is missing field `number`")?;
let chain_id = cached_provider.as_provider().get_chain_id().await?;
let chain_id = cached_provider.get_provider().await?.get_chain_id().await?;
let prev_hashes = fetch_previous_block_hashes(cached_provider, target_block_number).await?;

let other_data = OtherBlockData {
Expand Down
5 changes: 3 additions & 2 deletions zero_bin/rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Cli {
self.config.rpc_url.clone(),
self.config.backoff,
self.config.max_retries,
)));
)?));

match self.command {
Command::Fetch {
Expand All @@ -141,7 +141,8 @@ impl Cli {
// Get transaction info.
match cached_provider
.clone()
.as_provider()
.get_provider()
.await?
.get_transaction_by_hash(tx_hash)
.await?
{
Expand Down
3 changes: 2 additions & 1 deletion zero_bin/rpc/src/native/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;

use alloy::{
Expand Down Expand Up @@ -53,7 +54,7 @@ where
.await?;

let (code_db, txn_info) =
txn::process_transactions(&block, cached_provider.as_provider()).await?;
txn::process_transactions(&block, cached_provider.get_provider().await?.deref()).await?;
let trie_pre_images = state::process_state_witness(cached_provider, block, &txn_info).await?;

Ok(BlockTrace {
Expand Down
6 changes: 4 additions & 2 deletions zero_bin/rpc/src/native/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ where
let provider = provider.clone();
async move {
let proof = provider
.as_provider()
.get_provider()
.await?
.get_proof(address, keys.into_iter().collect())
.block_id((block_number - 1).into())
.await
Expand All @@ -196,7 +197,8 @@ where
let provider = provider.clone();
async move {
let proof = provider
.as_provider()
.get_provider()
.await?
.get_proof(address, keys.into_iter().collect())
.block_id(block_number.into())
.await
Expand Down
44 changes: 35 additions & 9 deletions zero_bin/rpc/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,57 @@
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use alloy::primitives::BlockHash;
use alloy::rpc::types::{Block, BlockId, BlockTransactionsKind};
use alloy::{providers::Provider, transports::Transport};
use anyhow::Context;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Semaphore, SemaphorePermit};

const CACHE_SIZE: usize = 1024;
const MAX_NUMBER_OF_PARALLEL_REQUESTS: usize = 64;

/// Wrapper around alloy provider to cache blocks and other
/// frequently used data.
pub struct CachedProvider<ProviderT, TransportT> {
provider: ProviderT,
provider: Arc<ProviderT>,
// `Alloy` provider is using `Reqwest` http client under the hood. It has an unbounded
// connection pool. We need to limit the number of parallel connections by ourselves, so we
// use semaphore to count the number of parallel RPC requests happening at any moment with
// CachedProvider.
semaphore: Arc<Semaphore>,
blocks_by_number: Arc<Mutex<lru::LruCache<u64, Block>>>,
blocks_by_hash: Arc<Mutex<lru::LruCache<BlockHash, u64>>>,
_phantom: std::marker::PhantomData<TransportT>,
}

pub struct ProviderGuard<'a, ProviderT> {
provider: Arc<ProviderT>,
_permit: SemaphorePermit<'a>,
}

impl<'a, ProviderT> Deref for ProviderGuard<'a, ProviderT> {
type Target = Arc<ProviderT>;

fn deref(&self) -> &Self::Target {
&self.provider
}
}

impl<ProviderT> DerefMut for ProviderGuard<'_, ProviderT> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.provider
}
}

impl<ProviderT, TransportT> CachedProvider<ProviderT, TransportT>
where
ProviderT: Provider<TransportT>,
TransportT: Transport + Clone,
{
pub fn new(provider: ProviderT) -> Self {
Self {
provider,
provider: provider.into(),
semaphore: Arc::new(Semaphore::new(MAX_NUMBER_OF_PARALLEL_REQUESTS)),
blocks_by_number: Arc::new(Mutex::new(lru::LruCache::new(
std::num::NonZero::new(CACHE_SIZE).unwrap(),
))),
Expand All @@ -35,12 +62,11 @@ where
}
}

pub fn as_mut_provider(&mut self) -> &mut ProviderT {
&mut self.provider
}

pub const fn as_provider(&self) -> &ProviderT {
&self.provider
pub async fn get_provider(&self) -> Result<ProviderGuard<ProviderT>, anyhow::Error> {
Ok(ProviderGuard {
provider: self.provider.clone(),
_permit: self.semaphore.acquire().await?,
})
}

/// Retrieves block by number or hash, caching it if it's not already
Expand Down
24 changes: 20 additions & 4 deletions zero_bin/rpc/src/retry.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::time::Duration;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use alloy::transports::http::reqwest;
use alloy::{
providers::{ProviderBuilder, RootProvider},
rpc::{
Expand All @@ -14,6 +16,9 @@ use alloy::{
};
use tower::{retry::Policy, Layer, Service};

const HTTP_CLIENT_CONNECTION_POOL_IDLE_TIMEOUT: u64 = 90;
const HTTP_CLIENT_MAX_IDLE_CONNECTIONS_PER_HOST: usize = 64;

#[derive(Debug)]
pub struct RetryPolicy {
backoff: tokio::time::Duration,
Expand Down Expand Up @@ -138,11 +143,22 @@ pub fn build_http_retry_provider(
rpc_url: url::Url,
backoff: u64,
max_retries: u32,
) -> RootProvider<RetryService<alloy::transports::http::ReqwestTransport>> {
) -> Result<RootProvider<RetryService<alloy::transports::http::ReqwestTransport>>, anyhow::Error> {
let retry_policy = RetryLayer::new(RetryPolicy::new(
tokio::time::Duration::from_millis(backoff),
Duration::from_millis(backoff),
max_retries,
));
let client = ClientBuilder::default().layer(retry_policy).http(rpc_url);
ProviderBuilder::new().on_client(client)
let reqwest_client = reqwest::ClientBuilder::new()
.pool_max_idle_per_host(HTTP_CLIENT_MAX_IDLE_CONNECTIONS_PER_HOST)
.pool_idle_timeout(Duration::from_secs(
HTTP_CLIENT_CONNECTION_POOL_IDLE_TIMEOUT,
))
.build()?;

let http = alloy::transports::http::Http::with_client(reqwest_client, rpc_url);
let is_local = http.guess_local();
let client = ClientBuilder::default()
.layer(retry_policy)
.transport(http, is_local);
Ok(ProviderBuilder::new().on_client(client))
}

0 comments on commit 32a6ba7

Please sign in to comment.