Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batcher queue len and queue size in bytes metrics #1593

Open
wants to merge 5 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::time::{timeout, Instant};
use types::batch_state::BatchState;
use types::user_state::UserState;

use batch_queue::calculate_batch_size;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
Expand Down Expand Up @@ -1043,10 +1044,13 @@ impl Batcher {
BatchQueueEntryPriority::new(max_fee, nonce),
);

info!(
"Current batch queue length: {}",
batch_state_lock.batch_queue.len()
);
// Update metrics
let queue_len = batch_state_lock.batch_queue.len();
let queue_size_bytes = calculate_batch_size(&batch_state_lock.batch_queue)?;
self.metrics
.update_queue_metrics(queue_len as i64, queue_size_bytes as i64);

info!("Current batch queue length: {}", queue_len);

let mut proof_submitter_addr = proof_submitter_addr;

Expand Down Expand Up @@ -1226,6 +1230,13 @@ impl Batcher {
))?;
}

// Update metrics
let queue_len = batch_state_lock.batch_queue.len();
let queue_size_bytes = calculate_batch_size(&batch_state_lock.batch_queue)?;

self.metrics
.update_queue_metrics(queue_len as i64, queue_size_bytes as i64);

Ok(())
}

Expand Down Expand Up @@ -1373,6 +1384,8 @@ impl Batcher {
batch_state_lock
.user_states
.insert(nonpaying_replacement_addr, nonpaying_user_state);

self.metrics.update_queue_metrics(0, 0);
}

/// Receives new block numbers, checks if conditions are met for submission and
Expand Down
16 changes: 16 additions & 0 deletions batcher/aligned-batcher/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub struct BatcherMetrics {
pub batcher_started: IntCounter,
pub gas_price_used_on_latest_batch: IntGauge,
pub broken_ws_connections: IntCounter,
pub queue_len: IntGauge,
pub queue_size_bytes: IntGauge,
pub s3_duration: IntGauge,
pub create_new_task_duration: IntGauge,
pub cancel_create_new_task_duration: IntGauge,
Expand Down Expand Up @@ -49,6 +51,11 @@ impl BatcherMetrics {
"broken_ws_connections_count",
"Broken websocket connections"
))?;
let queue_len = register_int_gauge!(opts!("queue_len", "Amount of proofs in the queue"))?;
let queue_size_bytes = register_int_gauge!(opts!(
"queue_size_bytes",
"Accumulated size in bytes of all proofs in the queue"
))?;
let s3_duration = register_int_gauge!(opts!("s3_duration", "S3 Duration"))?;
let create_new_task_duration = register_int_gauge!(opts!(
"create_new_task_duration",
Expand All @@ -68,6 +75,8 @@ impl BatcherMetrics {
registry.register(Box::new(gas_price_used_on_latest_batch.clone()))?;
registry.register(Box::new(batcher_started.clone()))?;
registry.register(Box::new(broken_ws_connections.clone()))?;
registry.register(Box::new(queue_len.clone()))?;
registry.register(Box::new(queue_size_bytes.clone()))?;
registry.register(Box::new(s3_duration.clone()))?;
registry.register(Box::new(create_new_task_duration.clone()))?;
registry.register(Box::new(cancel_create_new_task_duration.clone()))?;
Expand All @@ -92,6 +101,8 @@ impl BatcherMetrics {
batcher_started,
gas_price_used_on_latest_batch,
broken_ws_connections,
queue_len,
queue_size_bytes,
s3_duration,
create_new_task_duration,
cancel_create_new_task_duration,
Expand Down Expand Up @@ -124,4 +135,9 @@ impl BatcherMetrics {
pub fn user_error(&self, label_values: &[&str]) {
self.user_errors.with_label_values(label_values).inc();
}

pub fn update_queue_metrics(&self, queue_len: i64, queue_size: i64) {
self.queue_len.set(queue_len);
self.queue_size_bytes.set(queue_size);
}
}
Loading
Loading