diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 649103b..ac82dfb 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -101,6 +101,10 @@ pub struct Statistics { pub get_timed_out: u64, /// Total time accumulated waiting for a connection. pub get_wait_time: Duration, + /// Total connections that were closed due to be in broken state. + pub connections_closed_broken: u64, + /// Total connections that were closed due to be considered invalid. + pub connections_closed_invalid: u64, } /// A builder for a connection pool. diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index bfc9c2a..e417804 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -10,7 +10,9 @@ use tokio::spawn; use tokio::time::{interval_at, sleep, timeout, Interval}; use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State}; -use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsGetKind}; +use crate::internals::{ + Approval, ApprovalIter, Conn, SharedPool, StatsConnectionClosedKind, StatsGetKind, +}; pub(crate) struct PoolInner where @@ -113,6 +115,9 @@ where match self.inner.manager.is_valid(&mut conn).await { Ok(()) => return Ok(conn), Err(e) => { + self.inner + .statistics + .record(StatsConnectionClosedKind::Invalid); self.inner.forward_error(e); conn.state = ConnectionState::Invalid; continue; @@ -149,7 +154,12 @@ where let mut locked = self.inner.internals.lock(); match (state, self.inner.manager.has_broken(&mut conn.conn)) { (ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()), - (_, _) => { + (_, is_broken) => { + if is_broken { + self.inner + .statistics + .record(StatsConnectionClosedKind::Broken); + } let approvals = locked.dropped(1, &self.inner.statics); self.spawn_replenishing_approvals(approvals); self.inner.notify.notify_waiters(); diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 8917ac9..d2dbfe4 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -256,6 +256,8 @@ pub(crate) struct AtomicStatistics { pub(crate) get_waited: AtomicU64, pub(crate) get_timed_out: AtomicU64, pub(crate) get_wait_time_micros: AtomicU64, + pub(crate) connections_closed_broken: AtomicU64, + pub(crate) connections_closed_invalid: AtomicU64, } impl AtomicStatistics { @@ -272,6 +274,17 @@ impl AtomicStatistics { .fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst); } } + + pub(crate) fn record(&self, kind: StatsConnectionClosedKind) { + match kind { + StatsConnectionClosedKind::Broken => self + .connections_closed_broken + .fetch_add(1, Ordering::SeqCst), + StatsConnectionClosedKind::Invalid => self + .connections_closed_invalid + .fetch_add(1, Ordering::SeqCst), + }; + } } impl From<&AtomicStatistics> for Statistics { @@ -281,6 +294,8 @@ impl From<&AtomicStatistics> for Statistics { get_waited: item.get_waited.load(Ordering::SeqCst), get_timed_out: item.get_timed_out.load(Ordering::SeqCst), get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)), + connections_closed_broken: item.connections_closed_broken.load(Ordering::SeqCst), + connections_closed_invalid: item.connections_closed_invalid.load(Ordering::SeqCst), } } } @@ -290,3 +305,8 @@ pub(crate) enum StatsGetKind { Waited, TimedOut, } + +pub(crate) enum StatsConnectionClosedKind { + Broken, + Invalid, +} diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 5219a1c..755a921 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -246,6 +246,7 @@ async fn test_drop_on_broken() { } assert!(DROPPED.load(Ordering::SeqCst)); + assert_eq!(pool.state().statistics.connections_closed_broken, 1); } #[tokio::test] @@ -453,6 +454,9 @@ async fn test_now_invalid() { // Now try to get a new connection. let r = pool.get().await; assert!(r.is_err()); + + // both connections in the pool were considered invalid + assert_eq!(pool.state().statistics.connections_closed_invalid, 2); } #[tokio::test]