Skip to content

Commit

Permalink
Adds new statistics attributes for tracking connections closed
Browse files Browse the repository at this point in the history
The two new attributes `connections_closed_broken` and
`connections_closed_invalid` can be used for respectively understand how
many conections were closed due to be considered broken or invalid.
  • Loading branch information
pfreixes committed Jun 12, 2024
1 parent c9e8947 commit 2b787ac
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 2 deletions.
4 changes: 4 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ pub struct Statistics {
pub get_timed_out: u64,
/// Total time accumulated waiting for a connection.
pub get_wait_time: Duration,
/// Total connections that were closed due to be in broken state.
pub connections_closed_broken: u64,
/// Total connections that were closed due to be considered invalid.
pub connections_closed_invalid: u64,
}

/// A builder for a connection pool.
Expand Down
14 changes: 12 additions & 2 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsGetKind};
use crate::internals::{
Approval, ApprovalIter, Conn, SharedPool, StatsConnectionClosedKind, StatsGetKind,
};

pub(crate) struct PoolInner<M>
where
Expand Down Expand Up @@ -113,6 +115,9 @@ where
match self.inner.manager.is_valid(&mut conn).await {
Ok(()) => return Ok(conn),
Err(e) => {
self.inner
.statistics
.record(StatsConnectionClosedKind::Invalid);
self.inner.forward_error(e);
conn.state = ConnectionState::Invalid;
continue;
Expand Down Expand Up @@ -149,7 +154,12 @@ where
let mut locked = self.inner.internals.lock();
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, _) => {
(_, is_broken) => {
if is_broken {
self.inner
.statistics
.record(StatsConnectionClosedKind::Broken);
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
self.inner.notify.notify_waiters();
Expand Down
20 changes: 20 additions & 0 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_waited: AtomicU64,
pub(crate) get_timed_out: AtomicU64,
pub(crate) get_wait_time_micros: AtomicU64,
pub(crate) connections_closed_broken: AtomicU64,
pub(crate) connections_closed_invalid: AtomicU64,
}

impl AtomicStatistics {
Expand All @@ -272,6 +274,17 @@ impl AtomicStatistics {
.fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst);
}
}

pub(crate) fn record(&self, kind: StatsConnectionClosedKind) {
match kind {
StatsConnectionClosedKind::Broken => self
.connections_closed_broken
.fetch_add(1, Ordering::SeqCst),
StatsConnectionClosedKind::Invalid => self
.connections_closed_invalid
.fetch_add(1, Ordering::SeqCst),
};
}
}

impl From<&AtomicStatistics> for Statistics {
Expand All @@ -281,6 +294,8 @@ impl From<&AtomicStatistics> for Statistics {
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)),
connections_closed_broken: item.connections_closed_broken.load(Ordering::SeqCst),
connections_closed_invalid: item.connections_closed_invalid.load(Ordering::SeqCst),
}
}
}
Expand All @@ -290,3 +305,8 @@ pub(crate) enum StatsGetKind {
Waited,
TimedOut,
}

pub(crate) enum StatsConnectionClosedKind {
Broken,
Invalid,
}
4 changes: 4 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ async fn test_drop_on_broken() {
}

assert!(DROPPED.load(Ordering::SeqCst));
assert_eq!(pool.state().statistics.connections_closed_broken, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -453,6 +454,9 @@ async fn test_now_invalid() {
// Now try to get a new connection.
let r = pool.get().await;
assert!(r.is_err());

// both connections in the pool were considered invalid
assert_eq!(pool.state().statistics.connections_closed_invalid, 2);
}

#[tokio::test]
Expand Down

0 comments on commit 2b787ac

Please sign in to comment.