Skip to content

Commit

Permalink
Add new get wait time statistic (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
pfreixes authored Jun 10, 2024
1 parent 276235a commit d5f6bfa
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 3 deletions.
2 changes: 2 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub struct Statistics {
pub get_waited: u64,
/// Total gets performed that timed out while waiting for a connection.
pub get_timed_out: u64,
/// Total time accumulated waiting for a connection.
pub get_wait_time: Duration,
}

/// A builder for a connection pool.
Expand Down
4 changes: 3 additions & 1 deletion bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ where

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let mut kind = StatsKind::Direct;
let mut wait_time_start = None;

let future = async {
loop {
Expand All @@ -98,6 +99,7 @@ where
let mut conn = match conn {
Some(conn) => PooledConnection::new(self, conn),
None => {
wait_time_start = Some(Instant::now());
kind = StatsKind::Waited;
self.inner.notify.notified().await;
continue;
Expand Down Expand Up @@ -127,7 +129,7 @@ where
}
};

self.inner.statistics.record(kind);
self.inner.statistics.record_get(kind, wait_time_start);
result
}

Expand Down
12 changes: 10 additions & 2 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::cmp::min;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use tokio::sync::Notify;

Expand Down Expand Up @@ -255,15 +255,22 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_direct: AtomicU64,
pub(crate) get_waited: AtomicU64,
pub(crate) get_timed_out: AtomicU64,
pub(crate) get_wait_time_micros: AtomicU64,
}

impl AtomicStatistics {
pub(crate) fn record(&self, kind: StatsKind) {
pub(crate) fn record_get(&self, kind: StatsKind, wait_time_start: Option<Instant>) {
match kind {
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
};

if let Some(wait_time_start) = wait_time_start {
let wait_time = Instant::now() - wait_time_start;
self.get_wait_time_micros
.fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst);
}
}
}

Expand All @@ -273,6 +280,7 @@ impl From<&AtomicStatistics> for Statistics {
get_direct: item.get_direct.load(Ordering::SeqCst),
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,4 +924,5 @@ async fn test_state_get_contention() {
let statistics = pool.state().statistics;
assert_eq!(statistics.get_direct, 1);
assert_eq!(statistics.get_waited, 1);
assert!(statistics.get_wait_time > Duration::from_micros(0));
}

0 comments on commit d5f6bfa

Please sign in to comment.