diff --git a/Cargo.lock b/Cargo.lock index 6a320a0..b6e7218 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1786,6 +1786,8 @@ dependencies = [ "async-mutex", "async-trait", "blake3", + "cadence", + "cadence-macros", "figment", "futures", "log", diff --git a/plerkle/src/geyser_plugin_nft.rs b/plerkle/src/geyser_plugin_nft.rs index 8257bfb..2e8ff7d 100644 --- a/plerkle/src/geyser_plugin_nft.rs +++ b/plerkle/src/geyser_plugin_nft.rs @@ -665,8 +665,7 @@ impl GeyserPlugin for Plerkle<'static> { } metric! { - let slt_idx = format!("{}-{}", slot, transaction_info.index); - statsd_count!("transaction_seen_event", 1, "slot-idx" => &slt_idx); + statsd_count!("transaction_seen_event", 1); } Ok(()) } diff --git a/plerkle_messenger/Cargo.toml b/plerkle_messenger/Cargo.toml index cda73a1..36c0893 100644 --- a/plerkle_messenger/Cargo.toml +++ b/plerkle_messenger/Cargo.toml @@ -18,6 +18,8 @@ futures = "0.3" async-mutex = "1.4.0" serde = {version = "1.0.137", features = ["derive"] } blake3 = "1.3.3" +cadence = "0.29.0" +cadence-macros = "0.29.0" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/plerkle_messenger/src/lib.rs b/plerkle_messenger/src/lib.rs index 13c040e..e8fb5fb 100644 --- a/plerkle_messenger/src/lib.rs +++ b/plerkle_messenger/src/lib.rs @@ -2,6 +2,7 @@ pub mod redis_messenger; mod error; +mod metrics; mod plerkle_messenger; pub use crate::{error::*, plerkle_messenger::*}; diff --git a/plerkle_messenger/src/metrics.rs b/plerkle_messenger/src/metrics.rs new file mode 100644 index 0000000..5dda5c0 --- /dev/null +++ b/plerkle_messenger/src/metrics.rs @@ -0,0 +1,10 @@ +#[macro_export] +macro_rules! metric { + {$($block:stmt;)*} => { + if cadence_macros::is_global_default_set() { + $( + $block + )* + } + }; +} diff --git a/plerkle_messenger/src/redis_messenger.rs b/plerkle_messenger/src/redis_messenger.rs index 207e13a..9eaec6e 100644 --- a/plerkle_messenger/src/redis_messenger.rs +++ b/plerkle_messenger/src/redis_messenger.rs @@ -1,8 +1,10 @@ use crate::{ - error::MessengerError, ConsumptionType, Messenger, MessengerConfig, MessengerType, RecvData, + error::MessengerError, metric, ConsumptionType, Messenger, MessengerConfig, MessengerType, + RecvData, }; use async_trait::async_trait; +use cadence_macros::statsd_count; use log::*; use redis::{ aio::ConnectionManager, @@ -76,10 +78,8 @@ impl RedisMessenger { .await .map_err(|e| MessengerError::AutoclaimError { msg: e.to_string() })?; - id = result.0; let range_reply = result.1; - - if id == "0-0" || range_reply.ids.is_empty() { + if range_reply.ids.is_empty() { // We've reached the end of the PEL. return Ok(Vec::new()); } @@ -116,13 +116,13 @@ impl RedisMessenger { let info = if let Some(info) = pending.get(&id) { info } else { - println!("No pending info for ID {id}"); + warn!("No pending info for ID {id}"); continue; }; let data = if let Some(data) = map.get(DATA_KEY) { data } else { - println!("No Data was stored in Redis for ID {id}"); + info!("No Data was stored in Redis for ID {id}"); continue; }; // Get data from map. @@ -130,12 +130,15 @@ impl RedisMessenger { let bytes = match data { Value::Data(bytes) => bytes, _ => { - println!("Redis data for ID {id} in wrong format"); + error!("Redis data for ID {id} in wrong format"); continue; } }; if info.times_delivered > self.retries { + metric! { + statsd_count!("plerkle.messenger.retries.exceeded", 1); + } error!("Message has reached maximum retries {} for id", id); ack_list.push(id.clone()); continue; diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 7cbcbb0..31578d3 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.64.0" \ No newline at end of file +channel = "stable" \ No newline at end of file