Skip to content

Commit

Permalink
fix: potential rate-limiting bugfix (#3191)
Browse files Browse the repository at this point in the history
### Description

<!--
What's included in this PR?
-->

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

- Fixes #2586

### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

<!--
What kind of testing have these changes undergone?

None/Manual/Unit Tests
-->
  • Loading branch information
daniel-savu authored Jan 30, 2024
1 parent d0190ec commit 9e61465
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions rust/hyperlane-base/src/contract_sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
use std::{
collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration,
};

use cursor::*;
use derive_new::new;
Expand All @@ -9,14 +11,16 @@ use hyperlane_core::{
};
pub use metrics::ContractSyncMetrics;
use tokio::time::sleep;
use tracing::{debug, info};
use tracing::{debug, info, warn};

use crate::settings::IndexSettings;

mod cursor;
mod eta_calculator;
mod metrics;

const SLEEP_DURATION: Duration = Duration::from_secs(5);

/// Entity that drives the syncing of an agent's db with on-chain data.
/// Extracts chain-specific data (emitted checkpoints, messages, etc) from an
/// `indexer` and fills the agent's db with this data.
Expand Down Expand Up @@ -60,13 +64,23 @@ where
loop {
indexed_height.set(cursor.latest_block() as i64);
let Ok((action, eta)) = cursor.next_action().await else {
sleep(SLEEP_DURATION).await;
continue;
};
match action {
CursorAction::Query(range) => {
let sleep_duration = match action {
// Use `loop` but always break - this allows for returning a value
// from the loop (the sleep duration)
#[allow(clippy::never_loop)]
CursorAction::Query(range) => loop {
debug!(?range, "Looking for for events in index range");

let logs = self.indexer.fetch_logs(range.clone()).await?;
let logs = match self.indexer.fetch_logs(range.clone()).await {
Ok(logs) => logs,
Err(err) => {
warn!(?err, "Failed to fetch logs");
break SLEEP_DURATION;
}
};
let deduped_logs = HashSet::<_>::from_iter(logs);
let logs = Vec::from_iter(deduped_logs);

Expand All @@ -77,16 +91,25 @@ where
"Found log(s) in index range"
);
// Store deliveries
let stored = self.db.store_logs(&logs).await?;
let stored = match self.db.store_logs(&logs).await {
Ok(stored) => stored,
Err(err) => {
warn!(?err, "Failed to store logs in db");
break SLEEP_DURATION;
}
};
// Report amount of deliveries stored into db
stored_logs.inc_by(stored as u64);
// Update cursor
cursor.update(logs).await?;
}
CursorAction::Sleep(duration) => {
sleep(duration).await;
}
}
if let Err(err) = cursor.update(logs).await {
warn!(?err, "Failed to store logs in db");
break SLEEP_DURATION;
};
break Default::default();
},
CursorAction::Sleep(duration) => duration,
};
sleep(sleep_duration).await;
}
}
}
Expand Down

0 comments on commit 9e61465

Please sign in to comment.