Skip to content

Commit

Permalink
Merge branch 'main' into niveda/conditional
Browse files Browse the repository at this point in the history
  • Loading branch information
niveda-krish authored Dec 11, 2024
2 parents 5c34bae + 6b9a2ad commit b661f61
Showing 1 changed file with 52 additions and 14 deletions.
66 changes: 52 additions & 14 deletions crates/pool/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{

use alloy_primitives::{Address, B256, U256};
use alloy_sol_types::SolEvent;
use anyhow::{ensure, Context};
use anyhow::{bail, ensure, Context};
use futures::future;
use metrics::{Counter, Gauge};
use metrics_derive::Metrics;
Expand All @@ -44,6 +44,7 @@ use tokio::{
use tracing::{info, warn};

const MAX_LOAD_OPS_CONCURRENCY: usize = 64;
const SYNC_ERROR_COUNT_MAX: usize = 50;

/// A data structure that holds the currently known recent state of the chain,
/// with logic for updating itself and returning what has changed.
Expand All @@ -59,6 +60,7 @@ pub(crate) struct Chain<P: EvmProvider> {
blocks: VecDeque<BlockSummary>,
/// Semaphore to limit the number of concurrent `eth_getLogs` calls.
load_ops_semaphore: Semaphore,
sync_error_count: usize,
/// Filter template.
filter_template: Filter,
/// Metrics of chain events.
Expand Down Expand Up @@ -170,6 +172,7 @@ impl<P: EvmProvider> Chain<P> {
provider,
settings,
blocks: VecDeque::new(),
sync_error_count: 0,
load_ops_semaphore: Semaphore::new(MAX_LOAD_OPS_CONCURRENCY),
filter_template,
metrics: ChainMetrics::default(),
Expand Down Expand Up @@ -240,10 +243,18 @@ impl<P: EvmProvider> Chain<P> {
};
let current_block_number = current_block.number;
let new_block_number = new_head.number;
ensure!(
current_block_number < new_block_number + self.settings.history_size,

if current_block_number > new_block_number + self.settings.history_size {
self.sync_error_count += 1;

if self.sync_error_count >= SYNC_ERROR_COUNT_MAX {
return self.reset_and_initialize(new_head).await;
}

bail!(
"new block number {new_block_number} should be greater than start of history (current block: {current_block_number})"
);
)
}

if current_block_number + self.settings.history_size < new_block_number {
warn!(
Expand All @@ -267,6 +278,7 @@ impl<P: EvmProvider> Chain<P> {
.context("should load full history when resetting chain")?;
self.load_ops_into_block_summaries(&mut blocks).await?;
self.blocks = blocks;
self.sync_error_count = 0;
let mined_ops: Vec<_> = self
.blocks
.iter()
Expand Down Expand Up @@ -398,6 +410,29 @@ impl<P: EvmProvider> Chain<P> {
Ok(added_blocks)
}

async fn fetch_block_with_retries(&self, block_hash: B256) -> Option<Block> {
for attempt in 1..=self.settings.max_sync_retries {
match self.provider.get_block(block_hash.into()).await {
Ok(Some(block)) => return Some(block),
Ok(None) => warn!(
"Block with hash {:?} not found. Retrying... (attempt {}/{})",
block_hash, attempt, self.settings.max_sync_retries
),
Err(err) => warn!(
"Error fetching block with hash {:?}: {}. Retrying... (attempt {}/{})",
block_hash, err, attempt, self.settings.max_sync_retries
),
}
time::sleep(self.settings.poll_interval).await;
}

warn!(
"Failed to fetch block with hash {:?} after {} attempts.",
block_hash, self.settings.max_sync_retries
);
None
}

async fn load_blocks_back_to_number_no_ops(
&self,
head: BlockSummary,
Expand All @@ -408,16 +443,19 @@ impl<P: EvmProvider> Chain<P> {
blocks.push_front(head);
while blocks[0].number > min_block_number {
let parent_hash = blocks[0].parent_hash;
let parent = self
.provider
.get_block(parent_hash.into())
.await
.context("should load parent block by hash")?
.context("block with parent hash of known block should exist")?;
blocks.push_front(BlockSummary::try_from_block_without_ops(
parent,
Some(blocks[0].number - 1),
)?);
let parent = self.fetch_block_with_retries(parent_hash).await;

if let Some(parent) = parent {
blocks.push_front(BlockSummary::try_from_block_without_ops(
parent,
Some(blocks[0].number - 1),
)?);
} else {
bail!(
"Unable to backtrack chain history beyond block number {} due to missing parent block.",
blocks[0].number
);
}
}
Ok(blocks)
}
Expand Down

0 comments on commit b661f61

Please sign in to comment.