Skip to content

Commit

Permalink
feat(chain): add a sync error count to reset in edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
0xfourzerofour committed Dec 2, 2024
1 parent d283ce1 commit df96233
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions crates/pool/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{

use alloy_primitives::{Address, B256, U256};
use alloy_sol_types::SolEvent;
use anyhow::{ensure, Context};
use anyhow::{bail, ensure, Context};
use futures::future;
use metrics::{Counter, Gauge};
use metrics_derive::Metrics;
Expand All @@ -44,6 +44,7 @@ use tokio::{
use tracing::{info, warn};

const MAX_LOAD_OPS_CONCURRENCY: usize = 64;
const SYNC_ERROR_COUNT_MAX: usize = 50;

/// A data structure that holds the currently known recent state of the chain,
/// with logic for updating itself and returning what has changed.
Expand All @@ -59,6 +60,7 @@ pub(crate) struct Chain<P: EvmProvider> {
blocks: VecDeque<BlockSummary>,
/// Semaphore to limit the number of concurrent `eth_getLogs` calls.
load_ops_semaphore: Semaphore,
sync_error_count: usize,
/// Filter template.
filter_template: Filter,
/// Metrics of chain events.
Expand Down Expand Up @@ -170,6 +172,7 @@ impl<P: EvmProvider> Chain<P> {
provider,
settings,
blocks: VecDeque::new(),
sync_error_count: 0,
load_ops_semaphore: Semaphore::new(MAX_LOAD_OPS_CONCURRENCY),
filter_template,
metrics: ChainMetrics::default(),
Expand Down Expand Up @@ -240,10 +243,17 @@ impl<P: EvmProvider> Chain<P> {
};
let current_block_number = current_block.number;
let new_block_number = new_head.number;
ensure!(
current_block_number < new_block_number + self.settings.history_size,

if self.sync_error_count >= SYNC_ERROR_COUNT_MAX {
return self.reset_and_initialize(new_head).await;
}

if current_block_number < new_block_number + self.settings.history_size {
self.sync_error_count += 1;
bail!(
"new block number {new_block_number} should be greater than start of history (current block: {current_block_number})"
);
)
}

if current_block_number + self.settings.history_size < new_block_number {
warn!(
Expand All @@ -267,6 +277,7 @@ impl<P: EvmProvider> Chain<P> {
.context("should load full history when resetting chain")?;
self.load_ops_into_block_summaries(&mut blocks).await?;
self.blocks = blocks;
self.sync_error_count = 0;
let mined_ops: Vec<_> = self
.blocks
.iter()
Expand Down

0 comments on commit df96233

Please sign in to comment.