diff --git a/crates/pool/src/chain.rs b/crates/pool/src/chain.rs index 0940e062e..ef6635c45 100644 --- a/crates/pool/src/chain.rs +++ b/crates/pool/src/chain.rs @@ -19,7 +19,7 @@ use std::{ use alloy_primitives::{Address, B256, U256}; use alloy_sol_types::SolEvent; -use anyhow::{ensure, Context}; +use anyhow::{bail, ensure, Context}; use futures::future; use metrics::{Counter, Gauge}; use metrics_derive::Metrics; @@ -44,6 +44,7 @@ use tokio::{ use tracing::{info, warn}; const MAX_LOAD_OPS_CONCURRENCY: usize = 64; +const SYNC_ERROR_COUNT_MAX: usize = 50; /// A data structure that holds the currently known recent state of the chain, /// with logic for updating itself and returning what has changed. @@ -59,6 +60,7 @@ pub(crate) struct Chain { blocks: VecDeque, /// Semaphore to limit the number of concurrent `eth_getLogs` calls. load_ops_semaphore: Semaphore, + sync_error_count: usize, /// Filter template. filter_template: Filter, /// Metrics of chain events. @@ -170,6 +172,7 @@ impl Chain

{ provider, settings, blocks: VecDeque::new(), + sync_error_count: 0, load_ops_semaphore: Semaphore::new(MAX_LOAD_OPS_CONCURRENCY), filter_template, metrics: ChainMetrics::default(), @@ -240,10 +243,18 @@ impl Chain

{ }; let current_block_number = current_block.number; let new_block_number = new_head.number; - ensure!( - current_block_number < new_block_number + self.settings.history_size, + + if current_block_number > new_block_number + self.settings.history_size { + self.sync_error_count += 1; + + if self.sync_error_count >= SYNC_ERROR_COUNT_MAX { + return self.reset_and_initialize(new_head).await; + } + + bail!( "new block number {new_block_number} should be greater than start of history (current block: {current_block_number})" - ); + ) + } if current_block_number + self.settings.history_size < new_block_number { warn!( @@ -267,6 +278,7 @@ impl Chain

{ .context("should load full history when resetting chain")?; self.load_ops_into_block_summaries(&mut blocks).await?; self.blocks = blocks; + self.sync_error_count = 0; let mined_ops: Vec<_> = self .blocks .iter() @@ -398,6 +410,29 @@ impl Chain

{ Ok(added_blocks) } + async fn fetch_block_with_retries(&self, block_hash: B256) -> Option { + for attempt in 1..=self.settings.max_sync_retries { + match self.provider.get_block(block_hash.into()).await { + Ok(Some(block)) => return Some(block), + Ok(None) => warn!( + "Block with hash {:?} not found. Retrying... (attempt {}/{})", + block_hash, attempt, self.settings.max_sync_retries + ), + Err(err) => warn!( + "Error fetching block with hash {:?}: {}. Retrying... (attempt {}/{})", + block_hash, err, attempt, self.settings.max_sync_retries + ), + } + time::sleep(self.settings.poll_interval).await; + } + + warn!( + "Failed to fetch block with hash {:?} after {} attempts.", + block_hash, self.settings.max_sync_retries + ); + None + } + async fn load_blocks_back_to_number_no_ops( &self, head: BlockSummary, @@ -408,16 +443,19 @@ impl Chain

{ blocks.push_front(head); while blocks[0].number > min_block_number { let parent_hash = blocks[0].parent_hash; - let parent = self - .provider - .get_block(parent_hash.into()) - .await - .context("should load parent block by hash")? - .context("block with parent hash of known block should exist")?; - blocks.push_front(BlockSummary::try_from_block_without_ops( - parent, - Some(blocks[0].number - 1), - )?); + let parent = self.fetch_block_with_retries(parent_hash).await; + + if let Some(parent) = parent { + blocks.push_front(BlockSummary::try_from_block_without_ops( + parent, + Some(blocks[0].number - 1), + )?); + } else { + bail!( + "Unable to backtrack chain history beyond block number {} due to missing parent block.", + blocks[0].number + ); + } } Ok(blocks) }