fix: limit number of connections #546

Merged · 5 commits · Aug 27, 2024
2 changes: 1 addition & 1 deletion zero_bin/leader/src/client.rs
@@ -45,7 +45,7 @@ pub(crate) async fn client_main(
             rpc_params.rpc_url.clone(),
             rpc_params.backoff,
             rpc_params.max_retries,
-        ),
+        )?,
     ));
     check_previous_proof_and_checkpoint(
         params.checkpoint_block_number,
6 changes: 4 additions & 2 deletions zero_bin/rpc/src/jerigon.rs
@@ -29,7 +29,8 @@ where
 {
     // Grab trace information
     let tx_results = cached_provider
-        .as_provider()
+        .get_provider()
+        .await?
         .raw_request::<_, Vec<ZeroTxResult>>(
             "debug_traceBlockByNumber".into(),
             (target_block_id, json!({"tracer": "zeroTracer"})),
@@ -39,7 +40,8 @@
     // Grab block witness info (packed as combined trie pre-images)

     let block_witness = cached_provider
-        .as_provider()
+        .get_provider()
+        .await?
         .raw_request::<_, String>("eth_getWitness".into(), vec![target_block_id])
         .await?;
4 changes: 2 additions & 2 deletions zero_bin/rpc/src/lib.rs
@@ -154,7 +154,7 @@ where
     // We use that execution not to produce a new contract bytecode - instead, we
     // return hashes. To look at the code use `cast disassemble <bytecode>`.
     let bytes = cached_provider
-        .as_provider()
+        .get_provider().await?
         .raw_request::<_, Bytes>(
             "eth_call".into(),
             (json!({"input": "0x60005B60010180430340816020025280610101116300000002576120205FF3"}), target_block_number),
@@ -216,7 +216,7 @@
         .header
         .number
         .context("target block is missing field `number`")?;
-    let chain_id = cached_provider.as_provider().get_chain_id().await?;
+    let chain_id = cached_provider.get_provider().await?.get_chain_id().await?;
     let prev_hashes = fetch_previous_block_hashes(cached_provider, target_block_number).await?;

     let other_data = OtherBlockData {
5 changes: 3 additions & 2 deletions zero_bin/rpc/src/main.rs
@@ -117,7 +117,7 @@ impl Cli {
             self.config.rpc_url.clone(),
             self.config.backoff,
             self.config.max_retries,
-        )));
+        )?));

         match self.command {
             Command::Fetch {
@@ -141,7 +141,8 @@
                 // Get transaction info.
                 match cached_provider
                     .clone()
-                    .as_provider()
+                    .get_provider()
+                    .await?
                     .get_transaction_by_hash(tx_hash)
                     .await?
                 {
3 changes: 2 additions & 1 deletion zero_bin/rpc/src/native/mod.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::ops::Deref;
 use std::sync::Arc;

 use alloy::{
@@ -53,7 +54,7 @@ where
         .await?;

     let (code_db, txn_info) =
-        txn::process_transactions(&block, cached_provider.as_provider()).await?;
+        txn::process_transactions(&block, cached_provider.get_provider().await?.deref()).await?;
     let trie_pre_images = state::process_state_witness(cached_provider, block, &txn_info).await?;

     Ok(BlockTrace {
6 changes: 4 additions & 2 deletions zero_bin/rpc/src/native/state.rs
@@ -182,7 +182,8 @@ where
             let provider = provider.clone();
             async move {
                 let proof = provider
-                    .as_provider()
+                    .get_provider()
+                    .await?
                     .get_proof(address, keys.into_iter().collect())
                     .block_id((block_number - 1).into())
                     .await
@@ -196,7 +197,8 @@
             let provider = provider.clone();
             async move {
                 let proof = provider
-                    .as_provider()
+                    .get_provider()
+                    .await?
                     .get_proof(address, keys.into_iter().collect())
                     .block_id(block_number.into())
                     .await
44 changes: 35 additions & 9 deletions zero_bin/rpc/src/provider.rs
@@ -1,30 +1,57 @@
+use std::ops::{Deref, DerefMut};
 use std::sync::Arc;

 use alloy::primitives::BlockHash;
 use alloy::rpc::types::{Block, BlockId, BlockTransactionsKind};
 use alloy::{providers::Provider, transports::Transport};
 use anyhow::Context;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, Semaphore, SemaphorePermit};

 const CACHE_SIZE: usize = 1024;
+const MAX_NUMBER_OF_PARALLEL_REQUESTS: usize = 64;

 /// Wrapper around alloy provider to cache blocks and other
 /// frequently used data.
 pub struct CachedProvider<ProviderT, TransportT> {
-    provider: ProviderT,
+    provider: Arc<ProviderT>,
+    // The `alloy` provider uses a `reqwest` HTTP client under the hood, which has an
+    // unbounded connection pool. We need to limit the number of parallel connections
+    // ourselves, so we use a semaphore to count the number of parallel RPC requests
+    // happening at any moment with CachedProvider.
+    semaphore: Arc<Semaphore>,
     blocks_by_number: Arc<Mutex<lru::LruCache<u64, Block>>>,
     blocks_by_hash: Arc<Mutex<lru::LruCache<BlockHash, u64>>>,
     _phantom: std::marker::PhantomData<TransportT>,
 }

+pub struct ProviderGuard<'a, ProviderT> {
+    provider: Arc<ProviderT>,
+    _permit: SemaphorePermit<'a>,
+}
+
+impl<'a, ProviderT> Deref for ProviderGuard<'a, ProviderT> {
+    type Target = Arc<ProviderT>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.provider
+    }
+}
+
+impl<ProviderT> DerefMut for ProviderGuard<'_, ProviderT> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.provider
+    }
+}
+
 impl<ProviderT, TransportT> CachedProvider<ProviderT, TransportT>
 where
     ProviderT: Provider<TransportT>,
     TransportT: Transport + Clone,
 {
     pub fn new(provider: ProviderT) -> Self {
         Self {
-            provider,
+            provider: provider.into(),
+            semaphore: Arc::new(Semaphore::new(MAX_NUMBER_OF_PARALLEL_REQUESTS)),
             blocks_by_number: Arc::new(Mutex::new(lru::LruCache::new(
                 std::num::NonZero::new(CACHE_SIZE).unwrap(),
             ))),
@@ -35,12 +62,11 @@ where
         }
     }

-    pub fn as_mut_provider(&mut self) -> &mut ProviderT {
-        &mut self.provider
-    }
-
-    pub const fn as_provider(&self) -> &ProviderT {
-        &self.provider
+    pub async fn get_provider(&self) -> Result<ProviderGuard<ProviderT>, anyhow::Error> {
+        Ok(ProviderGuard {
+            provider: self.provider.clone(),
+            _permit: self.semaphore.acquire().await?,
+        })
     }

     /// Retrieves block by number or hash, caching it if it's not already
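Note: the guard above is the heart of the fix. `get_provider()` suspends once all 64 permits are out, and dropping the returned `ProviderGuard` releases its permit back to the semaphore. Below is a standalone, runnable sketch of that throttling behavior; the 256-task loop, the `MAX_PARALLEL` name, and the sleep standing in for an RPC call are illustrative, not from this PR.

    use std::sync::Arc;
    use std::time::Duration;

    use tokio::sync::Semaphore;

    const MAX_PARALLEL: usize = 64;

    #[tokio::main]
    async fn main() {
        let semaphore = Arc::new(Semaphore::new(MAX_PARALLEL));
        let mut handles = Vec::with_capacity(256);
        for i in 0..256 {
            let semaphore = semaphore.clone();
            handles.push(tokio::spawn(async move {
                // At most MAX_PARALLEL tasks get past this line at once; the
                // rest queue inside `acquire_owned` until a permit is dropped.
                let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
                // Stand-in for one RPC request made while holding the permit.
                tokio::time::sleep(Duration::from_millis(10)).await;
                i
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    }

In the PR the permit is tied to the lifetime of `ProviderGuard`, so a call site holds a connection slot exactly as long as it holds the provider.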
24 changes: 20 additions & 4 deletions zero_bin/rpc/src/retry.rs
@@ -1,9 +1,11 @@
+use std::time::Duration;
 use std::{
     future::Future,
     pin::Pin,
     task::{Context, Poll},
 };

+use alloy::transports::http::reqwest;
 use alloy::{
     providers::{ProviderBuilder, RootProvider},
     rpc::{
@@ -14,6 +16,9 @@ use alloy::{
 };
 use tower::{retry::Policy, Layer, Service};

+const HTTP_CLIENT_CONNECTION_POOL_IDLE_TIMEOUT: u64 = 90;
+const HTTP_CLIENT_MAX_IDLE_CONNECTIONS_PER_HOST: usize = 64;
+
 #[derive(Debug)]
 pub struct RetryPolicy {
     backoff: tokio::time::Duration,
@@ -138,11 +143,22 @@ pub fn build_http_retry_provider(
     rpc_url: url::Url,
     backoff: u64,
     max_retries: u32,
-) -> RootProvider<RetryService<alloy::transports::http::ReqwestTransport>> {
+) -> Result<RootProvider<RetryService<alloy::transports::http::ReqwestTransport>>, anyhow::Error> {
     let retry_policy = RetryLayer::new(RetryPolicy::new(
-        tokio::time::Duration::from_millis(backoff),
+        Duration::from_millis(backoff),
         max_retries,
     ));
-    let client = ClientBuilder::default().layer(retry_policy).http(rpc_url);
-    ProviderBuilder::new().on_client(client)
+    let reqwest_client = reqwest::ClientBuilder::new()
+        .pool_max_idle_per_host(HTTP_CLIENT_MAX_IDLE_CONNECTIONS_PER_HOST)
+        .pool_idle_timeout(Duration::from_secs(
+            HTTP_CLIENT_CONNECTION_POOL_IDLE_TIMEOUT,
+        ))
+        .build()?;
+
+    let http = alloy::transports::http::Http::with_client(reqwest_client, rpc_url);
+    let is_local = http.guess_local();
+    let client = ClientBuilder::default()
+        .layer(retry_policy)
+        .transport(http, is_local);
+    Ok(ProviderBuilder::new().on_client(client))
 }
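Note: `pool_max_idle_per_host` caps how many idle keep-alive connections reqwest retains per host; it does not bound in-flight requests, which is why the semaphore in `CachedProvider` is still needed on top of these settings. A minimal sketch of the same client configuration in isolation (the helper name is hypothetical):

    use std::time::Duration;

    // Illustrative helper mirroring the pool settings chosen in retry.rs above.
    fn build_pooled_client() -> reqwest::Result<reqwest::Client> {
        reqwest::ClientBuilder::new()
            // Keep at most 64 idle connections alive per host...
            .pool_max_idle_per_host(64)
            // ...and close any connection that stays idle for more than 90 seconds.
            .pool_idle_timeout(Duration::from_secs(90))
            .build()
    }

With both knobs set, idle sockets are recycled after 90 seconds instead of accumulating against the RPC endpoint.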