Skip to content

Commit

Permalink
fix: limit number of connections
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko committed Aug 27, 2024
1 parent 8771b9a commit bb833e6
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 14 deletions.
6 changes: 4 additions & 2 deletions zero_bin/rpc/src/jerigon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ where
{
// Grab trace information
let tx_results = cached_provider
.as_provider()
.get_provider()
.await?
.raw_request::<_, Vec<ZeroTxResult>>(
"debug_traceBlockByNumber".into(),
(target_block_id, json!({"tracer": "zeroTracer"})),
Expand All @@ -39,7 +40,8 @@ where
// Grab block witness info (packed as combined trie pre-images)

let block_witness = cached_provider
.as_provider()
.get_provider()
.await?
.raw_request::<_, String>("eth_getWitness".into(), vec![target_block_id])
.await?;

Expand Down
3 changes: 2 additions & 1 deletion zero_bin/rpc/src/native/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;

use alloy::{
Expand Down Expand Up @@ -53,7 +54,7 @@ where
.await?;

let (code_db, txn_info) =
txn::process_transactions(&block, cached_provider.as_provider()).await?;
txn::process_transactions(&block, cached_provider.get_provider().await?.deref()).await?;
let trie_pre_images = state::process_state_witness(cached_provider, block, &txn_info).await?;

Ok(BlockTrace {
Expand Down
6 changes: 4 additions & 2 deletions zero_bin/rpc/src/native/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ where
let provider = provider.clone();
async move {
let proof = provider
.as_provider()
.get_provider()
.await?
.get_proof(address, keys.into_iter().collect())
.block_id((block_number - 1).into())
.await
Expand All @@ -196,7 +197,8 @@ where
let provider = provider.clone();
async move {
let proof = provider
.as_provider()
.get_provider()
.await?
.get_proof(address, keys.into_iter().collect())
.block_id(block_number.into())
.await
Expand Down
40 changes: 31 additions & 9 deletions zero_bin/rpc/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,53 @@
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use alloy::primitives::BlockHash;
use alloy::rpc::types::{Block, BlockId, BlockTransactionsKind};
use alloy::{providers::Provider, transports::Transport};
use anyhow::Context;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Semaphore, SemaphorePermit};

const CACHE_SIZE: usize = 1024;
const MAX_NUMBER_OF_PARALLEL_REQUESTS: usize = 64;

/// Wrapper around alloy provider to cache blocks and other
/// frequently used data.
pub struct CachedProvider<ProviderT, TransportT> {
provider: ProviderT,
provider: Arc<ProviderT>,
semaphore: Arc<Semaphore>,
blocks_by_number: Arc<Mutex<lru::LruCache<u64, Block>>>,
blocks_by_hash: Arc<Mutex<lru::LruCache<BlockHash, u64>>>,
_phantom: std::marker::PhantomData<TransportT>,
}

pub struct ProviderGuard<'a, ProviderT> {
provider: Arc<ProviderT>,
_semaphore: SemaphorePermit<'a>,
}

impl<'a, ProviderT> Deref for ProviderGuard<'a, ProviderT> {
type Target = Arc<ProviderT>;

fn deref(&self) -> &Self::Target {
&self.provider
}
}

impl<ProviderT> DerefMut for ProviderGuard<'_, ProviderT> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.provider
}
}

impl<ProviderT, TransportT> CachedProvider<ProviderT, TransportT>
where
ProviderT: Provider<TransportT>,
TransportT: Transport + Clone,
{
pub fn new(provider: ProviderT) -> Self {
Self {
provider,
provider: provider.into(),
semaphore: Arc::new(Semaphore::new(MAX_NUMBER_OF_PARALLEL_REQUESTS)),
blocks_by_number: Arc::new(Mutex::new(lru::LruCache::new(
std::num::NonZero::new(CACHE_SIZE).unwrap(),
))),
Expand All @@ -35,12 +58,11 @@ where
}
}

pub fn as_mut_provider(&mut self) -> &mut ProviderT {
&mut self.provider
}

pub const fn as_provider(&self) -> &ProviderT {
&self.provider
pub async fn get_provider(&self) -> Result<ProviderGuard<ProviderT>, anyhow::Error> {
Ok(ProviderGuard {
provider: self.provider.clone(),
_semaphore: self.semaphore.acquire().await?,
})
}

/// Retrieves block by number or hash, caching it if it's not already
Expand Down

0 comments on commit bb833e6

Please sign in to comment.