Skip to content

Commit

Permalink
Add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Nov 10, 2023
1 parent 29592e7 commit 8b2f969
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
13 changes: 11 additions & 2 deletions storage_broker/src/bin/storage_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use tracing::*;
use utils::signals::ShutdownSignals;

use metrics::{Encoder, TextEncoder};
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use storage_broker::metrics::{
BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{
Expand Down Expand Up @@ -378,6 +381,8 @@ impl Registry {

// Send msg to relevant subscribers.
pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
PROCESSED_MESSAGES_TOTAL.inc();

// send message to subscribers for everything
let shared_state = self.shared_state.read();
// Err means there is no subscribers, it is fine.
Expand Down Expand Up @@ -493,8 +498,10 @@ impl BrokerService for Broker {
Message::SafekeeperTimelineInfo(info) => yield info,
_ => {},
}
BROADCASTED_MESSAGES_TOTAL.inc();
},
Err(RecvError::Lagged(skipped_msg)) => {
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
Expand Down Expand Up @@ -551,9 +558,11 @@ impl BrokerService for Broker {
let msg_type = msg.message_type() as i32;
if types_set.contains(&msg_type) {
yield msg.as_typed_message();
BROADCASTED_MESSAGES_TOTAL.inc();
}
},
Err(RecvError::Lagged(skipped_msg)) => {
BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
missed_msgs += skipped_msg;
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
Expand All @@ -579,8 +588,8 @@ impl BrokerService for Broker {
&self,
request: Request<TypedMessage>,
) -> std::result::Result<Response<()>, Status> {
// TODO: update metrics?
let msg = Message::from(request.into_inner())?;
PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
self.registry.send_msg(&msg)?;
Ok(Response::new(()))
}
Expand Down
34 changes: 33 additions & 1 deletion storage_broker/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Broker metrics.
use metrics::{register_int_gauge, IntGauge};
use metrics::{register_int_counter, register_int_gauge, IntCounter, IntGauge};
use once_cell::sync::Lazy;

pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
Expand All @@ -23,3 +23,35 @@ pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
)
.expect("Failed to register metric")
});

pub static PROCESSED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_processed_messages_total",
"Number of messages received by storage broker, before routing and broadcasting"
)
.expect("Failed to register metric")
});

pub static BROADCASTED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_broadcasted_messages_total",
"Number of messages broadcasted to subscribers"
)
.expect("Failed to register metric")
});

pub static BROADCAST_DROPPED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_broadcast_dropped_messages_total",
"Number of messages dropped due to channel capacity overflow"
)
.expect("Failed to register metric")
});

pub static PUBLISHED_ONEOFF_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"storage_broker_published_oneoff_messages_total",
"Number of one-off messages published to subscribers"
)
.expect("Failed to register metric")
});

0 comments on commit 8b2f969

Please sign in to comment.