From df962337c18661de623178941f73d2ff82e268b1 Mon Sep 17 00:00:00 2001 From: 0xfourzerofour Date: Mon, 2 Dec 2024 11:34:56 -0500 Subject: [PATCH] feat(chain): add a sync error count to reset in edge cases --- crates/pool/src/chain.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/pool/src/chain.rs b/crates/pool/src/chain.rs index 0940e062e..63c2737ea 100644 --- a/crates/pool/src/chain.rs +++ b/crates/pool/src/chain.rs @@ -19,7 +19,7 @@ use std::{ use alloy_primitives::{Address, B256, U256}; use alloy_sol_types::SolEvent; -use anyhow::{ensure, Context}; +use anyhow::{bail, ensure, Context}; use futures::future; use metrics::{Counter, Gauge}; use metrics_derive::Metrics; @@ -44,6 +44,7 @@ use tokio::{ use tracing::{info, warn}; const MAX_LOAD_OPS_CONCURRENCY: usize = 64; +const SYNC_ERROR_COUNT_MAX: usize = 50; /// A data structure that holds the currently known recent state of the chain, /// with logic for updating itself and returning what has changed. @@ -59,6 +60,7 @@ pub(crate) struct Chain { blocks: VecDeque, /// Semaphore to limit the number of concurrent `eth_getLogs` calls. load_ops_semaphore: Semaphore, + sync_error_count: usize, /// Filter template. filter_template: Filter, /// Metrics of chain events. @@ -170,6 +172,7 @@ impl Chain

{ provider, settings, blocks: VecDeque::new(), + sync_error_count: 0, load_ops_semaphore: Semaphore::new(MAX_LOAD_OPS_CONCURRENCY), filter_template, metrics: ChainMetrics::default(), @@ -240,10 +243,17 @@ impl Chain

{ }; let current_block_number = current_block.number; let new_block_number = new_head.number; - ensure!( - current_block_number < new_block_number + self.settings.history_size, + + if self.sync_error_count >= SYNC_ERROR_COUNT_MAX { + return self.reset_and_initialize(new_head).await; + } + + if current_block_number < new_block_number + self.settings.history_size { + self.sync_error_count += 1; + bail!( "new block number {new_block_number} should be greater than start of history (current block: {current_block_number})" - ); + ) + } if current_block_number + self.settings.history_size < new_block_number { warn!( @@ -267,6 +277,7 @@ impl Chain

{ .context("should load full history when resetting chain")?; self.load_ops_into_block_summaries(&mut blocks).await?; self.blocks = blocks; + self.sync_error_count = 0; let mined_ops: Vec<_> = self .blocks .iter()