diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs new file mode 100644 index 0000000000..8918da99f9 --- /dev/null +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/metrics.rs @@ -0,0 +1,65 @@ +use prometheus::IntGaugeVec; + +use crate::CoreMetrics; + +/// Struct encapsulating prometheus metrics used by SequenceAware and RateLimited cursors. +#[derive(Debug, Clone)] +pub struct CursorMetrics { + /// Current block of the cursor. + /// Used by both sequence aware and rate limited cursors. + /// Labels: + /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. + /// - `chain`: Chain the cursor is collecting data from. + /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited`. + pub cursor_current_block: IntGaugeVec, + + /// Current sequence of the cursor. + /// Only used by sequence aware cursors. + /// Labels: + /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. + /// - `chain`: Chain the cursor is collecting data from. + /// - `cursor_type`: The type of cursor. E.g. `forward_sequenced`, `backward_sequenced`, `forward_rate_limited`. + pub cursor_current_sequence: IntGaugeVec, + + /// Max sequence of the cursor. + /// Only used by sequence aware cursors. + /// Labels: + /// - `event_type`: the event type the cursor is indexing. Could be anything implementing `Indexable`. + /// - `chain`: Chain the cursor is collecting data from. + pub cursor_max_sequence: IntGaugeVec, +} + +impl CursorMetrics { + /// Instantiate a new CursorMetrics object. + pub fn new(metrics: &CoreMetrics) -> Self { + let cursor_current_block = metrics + .new_int_gauge( + "cursor_current_block", + "Current block of the cursor", + &["event_type", "chain", "cursor_type"], + ) + .expect("failed to register cursor_current_block metric"); + + let cursor_current_sequence = metrics + .new_int_gauge( + "cursor_current_sequence", + "Current sequence of the cursor", + &["event_type", "chain", "cursor_type"], + ) + .expect("failed to register cursor_current_sequence metric"); + + let cursor_max_sequence = metrics + .new_int_gauge( + "cursor_max_sequence", + "Max sequence of the cursor", + &["event_type", "chain"], + ) + .expect("failed to register cursor_max_sequence metric"); + + CursorMetrics { + cursor_current_block, + cursor_current_sequence, + cursor_max_sequence, + } + } +} diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs index 563d0fcc74..2fb36453e4 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/mod.rs @@ -1,13 +1,16 @@ -pub(crate) mod sequence_aware; - use hyperlane_core::{ Delivery, HyperlaneDomainProtocol, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, }; + +pub(crate) mod sequence_aware; pub(crate) use sequence_aware::ForwardBackwardSequenceAwareSyncCursor; pub(crate) mod rate_limited; pub(crate) use rate_limited::RateLimitedContractSyncCursor; +pub(crate) mod metrics; +pub(crate) use metrics::CursorMetrics; + pub enum CursorType { SequenceAware, RateLimited, @@ -24,6 +27,8 @@ pub trait Indexable { fn broadcast_channel_size() -> Option { None } + /// Returns the name of the type for metrics. + fn name() -> &'static str; } impl Indexable for HyperlaneMessage { @@ -40,6 +45,10 @@ impl Indexable for HyperlaneMessage { fn broadcast_channel_size() -> Option { TX_ID_CHANNEL_CAPACITY } + + fn name() -> &'static str { + "hyperlane_message" + } } impl Indexable for InterchainGasPayment { @@ -51,6 +60,10 @@ impl Indexable for InterchainGasPayment { HyperlaneDomainProtocol::Cosmos => CursorType::RateLimited, } } + + fn name() -> &'static str { + "interchain_gas_payment" + } } impl Indexable for MerkleTreeInsertion { @@ -62,6 +75,10 @@ impl Indexable for MerkleTreeInsertion { HyperlaneDomainProtocol::Cosmos => CursorType::SequenceAware, } } + + fn name() -> &'static str { + "merkle_tree_insertion" + } } impl Indexable for Delivery { @@ -73,4 +90,8 @@ impl Indexable for Delivery { HyperlaneDomainProtocol::Cosmos => CursorType::RateLimited, } } + + fn name() -> &'static str { + "delivery" + } } diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs index 9428d6bfd4..88c5784824 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs @@ -8,12 +8,16 @@ use std::{ use async_trait::async_trait; use derive_new::new; use eyre::Result; + use hyperlane_core::{ - ContractSyncCursor, CursorAction, HyperlaneWatermarkedLogStore, Indexed, Indexer, LogMeta, + ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneWatermarkedLogStore, Indexed, + Indexer, LogMeta, }; use crate::contract_sync::eta_calculator::SyncerEtaCalculator; +use super::{CursorMetrics, Indexable}; + /// Time window for the moving average used in the eta calculator in seconds. const ETA_TIME_WINDOW: f64 = 2. * 60.; @@ -83,12 +87,16 @@ pub(crate) struct RateLimitedContractSyncCursor { last_tip_update: Instant, eta_calculator: SyncerEtaCalculator, sync_state: SyncState, + metrics: Arc, + domain: HyperlaneDomain, } -impl RateLimitedContractSyncCursor { +impl RateLimitedContractSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, + metrics: Arc, + domain: &HyperlaneDomain, store: Arc>, chunk_size: u32, initial_height: u32, @@ -107,6 +115,8 @@ impl RateLimitedContractSyncCursor { // The rate limited cursor currently only syncs in the forward direction. SyncDirection::Forward, ), + metrics, + domain: domain.to_owned(), }) } @@ -155,12 +165,24 @@ impl RateLimitedContractSyncCursor { Duration::from_secs(0) } } + + async fn update_metrics(&self) { + let latest_block = self.latest_queried_block(); + let chain_name = self.domain.name(); + // The rate limited cursor currently only syncs in the forward direction. + let label_values = &[T::name(), chain_name, "forward_rate_limited"]; + + self.metrics + .cursor_current_block + .with_label_values(label_values) + .set(latest_block as i64); + } } #[async_trait] impl ContractSyncCursor for RateLimitedContractSyncCursor where - T: Send + Sync + Debug + 'static, + T: Indexable + Send + Sync + Debug + 'static, { async fn next_action(&mut self) -> Result<(CursorAction, Duration)> { let eta = self.sync_eta(); @@ -187,6 +209,7 @@ where _: Vec<(Indexed, LogMeta)>, range: RangeInclusive, ) -> Result<()> { + self.update_metrics().await; // Store a relatively conservative view of the high watermark, which should allow a single watermark to be // safely shared across multiple cursors, so long as they are running sufficiently in sync self.store @@ -216,12 +239,13 @@ where } } -impl Debug for RateLimitedContractSyncCursor { +impl Debug for RateLimitedContractSyncCursor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RateLimitedContractSyncCursor") .field("tip", &self.tip) .field("last_tip_update", &self.last_tip_update) .field("sync_state", &self.sync_state) + .field("domain", &self.domain) .finish() } } @@ -229,50 +253,92 @@ impl Debug for RateLimitedContractSyncCursor { #[cfg(test)] pub(crate) mod test { use super::*; - use hyperlane_core::{ChainResult, HyperlaneLogStore}; + use crate::cursors::CursorType; + use hyperlane_core::{ChainResult, HyperlaneDomainProtocol, HyperlaneLogStore}; use mockall::{self, Sequence}; const CHUNK_SIZE: u32 = 10; const INITIAL_HEIGHT: u32 = 0; + #[derive(Debug, Clone)] + struct MockIndexable; + + unsafe impl Sync for MockIndexable {} + unsafe impl Send for MockIndexable {} + + impl Indexable for MockIndexable { + fn indexing_cursor(_domain: HyperlaneDomainProtocol) -> CursorType { + CursorType::RateLimited + } + + fn name() -> &'static str { + "mock_indexable" + } + } + mockall::mock! { - pub Indexer {} + pub Indexer {} - impl Debug for Indexer { + impl Debug for Indexer { fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result; } #[async_trait] - impl Indexer<()> for Indexer { - async fn fetch_logs_in_range(&self, range: RangeInclusive) -> ChainResult , LogMeta)>>; + impl Indexer for Indexer { + async fn fetch_logs_in_range(&self, range: RangeInclusive) -> ChainResult, LogMeta)>>; async fn get_finalized_block_number(&self) -> ChainResult; } } mockall::mock! { - pub Db {} + pub Db {} - impl Debug for Db { + impl Debug for Db { fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result; } #[async_trait] - impl HyperlaneLogStore<()> for Db { - async fn store_logs(&self, logs: &[(hyperlane_core::Indexed<()> , LogMeta)]) -> Result; + impl HyperlaneLogStore for Db { + async fn store_logs(&self, logs: &[(hyperlane_core::Indexed, LogMeta)]) -> Result; } #[async_trait] - impl HyperlaneWatermarkedLogStore<()> for Db { + impl HyperlaneWatermarkedLogStore for Db { async fn retrieve_high_watermark(&self) -> Result>; async fn store_high_watermark(&self, block_number: u32) -> Result<()>; } } - async fn mock_rate_limited_cursor( + fn mock_cursor_metrics() -> CursorMetrics { + CursorMetrics { + cursor_current_block: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_current_block", "Current block of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain", "cursor_type"], + ) + .unwrap(), + cursor_current_sequence: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_current_sequence", "Current sequence of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain", "cursor_type"], + ) + .unwrap(), + cursor_max_sequence: prometheus::IntGaugeVec::new( + prometheus::Opts::new("cursor_max_sequence", "Max sequence of the cursor") + .namespace("mock") + .subsystem("cursor"), + &["event_type", "chain"], + ) + .unwrap(), + } + } + async fn mock_rate_limited_cursor( custom_chain_tips: Option>, - ) -> RateLimitedContractSyncCursor<()> { + ) -> RateLimitedContractSyncCursor { let mut seq = Sequence::new(); - let mut indexer = MockIndexer::new(); + let mut indexer = MockIndexer::::new(); match custom_chain_tips { Some(chain_tips) => { for tip in chain_tips { @@ -294,11 +360,14 @@ pub(crate) mod test { } let mut db = MockDb::new(); + let metrics = mock_cursor_metrics(); db.expect_store_high_watermark().returning(|_| Ok(())); let chunk_size = CHUNK_SIZE; let initial_height = INITIAL_HEIGHT; RateLimitedContractSyncCursor::new( Arc::new(indexer), + Arc::new(metrics), + &HyperlaneDomain::new_test_domain("test"), Arc::new(db), chunk_size, initial_height, @@ -309,7 +378,7 @@ pub(crate) mod test { #[tokio::test] async fn test_next_action_retries_if_update_isnt_called() { - let mut cursor = mock_rate_limited_cursor(None).await; + let mut cursor = mock_rate_limited_cursor::(None).await; let (action_1, _) = cursor.next_action().await.unwrap(); let (_action_2, _) = cursor.next_action().await.unwrap(); @@ -319,7 +388,7 @@ pub(crate) mod test { #[tokio::test] async fn test_next_action_changes_if_update_is_called() { - let mut cursor = mock_rate_limited_cursor(None).await; + let mut cursor = mock_rate_limited_cursor::(None).await; let (action_1, _) = cursor.next_action().await.unwrap(); let range = match action_1 { @@ -336,7 +405,7 @@ pub(crate) mod test { #[tokio::test] async fn test_next_action_sleeps_if_tip_is_not_updated() { let chain_tips = vec![10]; - let mut cursor = mock_rate_limited_cursor(Some(chain_tips)).await; + let mut cursor = mock_rate_limited_cursor::(Some(chain_tips)).await; let (action, _) = cursor.next_action().await.unwrap(); assert!(matches!(action, CursorAction::Sleep(_))); } diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs index 179ae3dd3c..866ae61fc7 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs @@ -76,6 +76,11 @@ impl BackwardSequenceAwareSyncCursor { } } + /// Get the last indexed sequence or 0 if no logs have been indexed yet. + pub fn last_sequence(&self) -> u32 { + self.last_indexed_snapshot.sequence.unwrap_or(0) + } + /// Gets the next range of logs to query. /// If the cursor is fully synced, this returns None. /// Otherwise, it returns the next range to query, either by block or sequence depending on the mode. diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs index 967f4e6053..0aa8620f1c 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs @@ -88,6 +88,22 @@ impl ForwardSequenceAwareSyncCursor { } } + /// Get target sequence or return 0 if request failed + pub async fn target_sequence(&self) -> u32 { + let (count, _) = self + .latest_sequence_querier + .latest_sequence_count_and_tip() + .await + .ok() + .unwrap_or((None, 0)); + count.unwrap_or(0).saturating_sub(1) + } + + /// Get the last indexed sequence or 0 if no logs have been indexed yet. + pub fn last_sequence(&self) -> u32 { + self.last_indexed_snapshot.sequence.unwrap_or(0) + } + /// Gets the next range of logs to index. /// If there are no logs to index, returns `None`. /// If there are logs to index, returns the range of logs, either by sequence or block number diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs index 74e7ebe014..899a0be781 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs @@ -1,12 +1,13 @@ +use std::ops::RangeInclusive; use std::{fmt::Debug, sync::Arc, time::Duration}; use async_trait::async_trait; use eyre::Result; + use hyperlane_core::{ - ChainCommunicationError, ContractSyncCursor, CursorAction, + ChainCommunicationError, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneSequenceAwareIndexerStoreReader, IndexMode, Indexed, LogMeta, SequenceAwareIndexer, }; -use std::ops::RangeInclusive; mod backward; mod forward; @@ -14,6 +15,8 @@ mod forward; pub(crate) use backward::BackwardSequenceAwareSyncCursor; pub(crate) use forward::ForwardSequenceAwareSyncCursor; +use super::{CursorMetrics, Indexable}; + #[derive(Debug, Clone, PartialEq, Eq)] struct LastIndexedSnapshot { /// The last sequence that was indexed. @@ -67,11 +70,17 @@ pub(crate) struct ForwardBackwardSequenceAwareSyncCursor { forward: ForwardSequenceAwareSyncCursor, backward: BackwardSequenceAwareSyncCursor, last_direction: SyncDirection, + metrics: Arc, + domain: HyperlaneDomain, } -impl ForwardBackwardSequenceAwareSyncCursor { +impl + ForwardBackwardSequenceAwareSyncCursor +{ /// Construct a new contract sync helper. pub async fn new( + domain: &HyperlaneDomain, + metrics: Arc, latest_sequence_querier: Arc>, store: Arc>, chunk_size: u32, @@ -97,12 +106,48 @@ impl ForwardBackwardSequenceAwareSyncCursor { forward: forward_cursor, backward: backward_cursor, last_direction: SyncDirection::Forward, + metrics, + domain: domain.to_owned(), }) } + + async fn update_metrics(&self) { + let (cursor_type, latest_block, sequence) = match self.last_direction { + SyncDirection::Forward => ( + "forward_sequenced", + self.forward.latest_queried_block(), + self.forward.last_sequence(), + ), + SyncDirection::Backward => ( + "backward_sequenced", + self.backward.latest_queried_block(), + self.backward.last_sequence(), + ), + }; + + let chain_name = self.domain.name(); + let label_values = &[T::name(), chain_name, cursor_type]; + + self.metrics + .cursor_current_block + .with_label_values(label_values) + .set(latest_block as i64); + + self.metrics + .cursor_current_sequence + .with_label_values(label_values) + .set(sequence as i64); + + let max_sequence = self.forward.target_sequence().await as i64; + self.metrics + .cursor_max_sequence + .with_label_values(&[T::name(), chain_name]) + .set(max_sequence); + } } #[async_trait] -impl ContractSyncCursor +impl ContractSyncCursor for ForwardBackwardSequenceAwareSyncCursor { async fn next_action(&mut self) -> Result<(CursorAction, Duration)> { @@ -131,6 +176,7 @@ impl ContractSyncCursor logs: Vec<(Indexed, LogMeta)>, range: RangeInclusive, ) -> Result<()> { + self.update_metrics().await; match self.last_direction { SyncDirection::Forward => self.forward.update(logs, range).await, SyncDirection::Backward => self.backward.update(logs, range).await, diff --git a/rust/main/hyperlane-base/src/contract_sync/metrics.rs b/rust/main/hyperlane-base/src/contract_sync/metrics.rs index 30f3fd02bf..54e3c7a2cb 100644 --- a/rust/main/hyperlane-base/src/contract_sync/metrics.rs +++ b/rust/main/hyperlane-base/src/contract_sync/metrics.rs @@ -1,6 +1,11 @@ -use crate::CoreMetrics; +use std::sync::Arc; + use prometheus::{IntCounterVec, IntGaugeVec}; +use crate::CoreMetrics; + +use super::cursors::CursorMetrics; + /// Struct encapsulating prometheus metrics used by the ContractSync. #[derive(Debug, Clone)] pub struct ContractSyncMetrics { @@ -20,6 +25,9 @@ pub struct ContractSyncMetrics { /// See `last_known_message_nonce` in CoreMetrics. pub message_nonce: IntGaugeVec, + + /// Metrics for SequenceAware and RateLimited cursors. + pub cursor_metrics: Arc, } impl ContractSyncMetrics { @@ -42,11 +50,13 @@ impl ContractSyncMetrics { .expect("failed to register stored_events metric"); let message_nonce = metrics.last_known_message_nonce(); + let cursor_metrics = Arc::new(CursorMetrics::new(metrics)); ContractSyncMetrics { indexed_height, stored_events, message_nonce, + cursor_metrics, } } } diff --git a/rust/main/hyperlane-base/src/contract_sync/mod.rs b/rust/main/hyperlane-base/src/contract_sync/mod.rs index df9563d8a7..c9048b4808 100644 --- a/rust/main/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/mod.rs @@ -312,6 +312,8 @@ where Ok(Box::new( RateLimitedContractSyncCursor::new( Arc::new(self.indexer.clone()), + self.metrics.cursor_metrics.clone(), + self.domain(), self.store.clone(), index_settings.chunk_size, index_settings.from, @@ -352,6 +354,8 @@ where ) -> Result>> { Ok(Box::new( ForwardBackwardSequenceAwareSyncCursor::new( + self.domain(), + self.metrics.cursor_metrics.clone(), self.indexer.clone(), Arc::new(self.store.clone()), index_settings.chunk_size,