diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 649103b..a74459c 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -101,6 +101,16 @@ pub struct Statistics { pub get_timed_out: u64, /// Total time accumulated waiting for a connection. pub get_wait_time: Duration, + /// Total connections that were closed due to be in broken state. + pub connections_closed_broken: u64, + /// Total connections that were closed due to be considered invalid. + pub connections_closed_invalid: u64, + /// Total connections that were closed because they reached the max + /// lifetime. + pub connections_closed_max_lifetime: u64, + /// Total connections that were closed because they reached the max + /// idle timeout. + pub connections_closed_idle_timeout: u64, } /// A builder for a connection pool. diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 3c17e25..5de7c22 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -10,7 +10,9 @@ use tokio::spawn; use tokio::time::{interval_at, sleep, timeout, Interval}; use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State}; -use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsKind}; +use crate::internals::{ + Approval, ApprovalIter, Conn, SharedPool, StatsConnectionClosedKind, StatsGetKind, +}; pub(crate) struct PoolInner where @@ -85,7 +87,7 @@ where } pub(crate) async fn get(&self) -> Result, RunError> { - let mut kind = StatsKind::Direct; + let mut kind = StatsGetKind::Direct; let mut wait_time_start = None; let future = async { @@ -100,7 +102,7 @@ where Some(conn) => PooledConnection::new(self, conn), None => { wait_time_start = Some(Instant::now()); - kind = StatsKind::Waited; + kind = StatsGetKind::Waited; self.inner.notify.notified().await; continue; } @@ -114,6 +116,9 @@ where Ok(()) => return Ok(conn), Err(e) => { self.inner.forward_error(e); + self.inner + .statistics + .record_connection_closed(StatsConnectionClosedKind::Invalid); conn.state = ConnectionState::Invalid; continue; } @@ -124,7 +129,7 @@ where let result = match timeout(self.inner.statics.connection_timeout, future).await { Ok(result) => result, _ => { - kind = StatsKind::TimedOut; + kind = StatsGetKind::TimedOut; Err(RunError::TimedOut) } }; @@ -149,7 +154,12 @@ where let mut locked = self.inner.internals.lock(); match (state, self.inner.manager.has_broken(&mut conn.conn)) { (ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()), - (_, _) => { + (_, is_broken) => { + if is_broken { + self.inner + .statistics + .record_connection_closed(StatsConnectionClosedKind::Broken); + } let approvals = locked.dropped(1, &self.inner.statics); self.spawn_replenishing_approvals(approvals); self.inner.notify.notify_waiters(); diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 95266c5..415d28e 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -49,7 +49,11 @@ where pub(crate) fn reap(&self) -> ApprovalIter { let mut locked = self.internals.lock(); - locked.reap(&self.statics) + let (iter, closed_idle_timeout, closed_max_lifetime) = locked.reap(&self.statics); + drop(locked); + self.statistics + .record_connections_reaped(closed_idle_timeout, closed_max_lifetime); + iter } pub(crate) fn forward_error(&self, err: M::Error) { @@ -139,22 +143,34 @@ where ApprovalIter { num: num as usize } } - pub(crate) fn reap(&mut self, config: &Builder) -> ApprovalIter { + pub(crate) fn reap(&mut self, config: &Builder) -> (ApprovalIter, u64, u64) { + let mut closed_max_lifetime = 0; + let mut closed_idle_timeout = 0; let now = Instant::now(); let before = self.conns.len(); self.conns.retain(|conn| { let mut keep = true; if let Some(timeout) = config.idle_timeout { - keep &= now - conn.idle_start < timeout; + if now - conn.idle_start >= timeout { + closed_idle_timeout += 1; + keep &= false; + } } if let Some(lifetime) = config.max_lifetime { - keep &= now - conn.conn.birth < lifetime; + if now - conn.conn.birth >= lifetime { + closed_max_lifetime += 1; + keep &= false; + } } keep }); - self.dropped((before - self.conns.len()) as u32, config) + ( + self.dropped((before - self.conns.len()) as u32, config), + closed_idle_timeout, + closed_max_lifetime, + ) } pub(crate) fn state(&self, statistics: Statistics) -> State { @@ -256,14 +272,18 @@ pub(crate) struct AtomicStatistics { pub(crate) get_waited: AtomicU64, pub(crate) get_timed_out: AtomicU64, pub(crate) get_wait_time_micros: AtomicU64, + pub(crate) connections_closed_broken: AtomicU64, + pub(crate) connections_closed_invalid: AtomicU64, + pub(crate) connections_closed_max_lifetime: AtomicU64, + pub(crate) connections_closed_idle_timeout: AtomicU64, } impl AtomicStatistics { - pub(crate) fn record_get(&self, kind: StatsKind, wait_time_start: Option) { + pub(crate) fn record_get(&self, kind: StatsGetKind, wait_time_start: Option) { match kind { - StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst), - StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst), - StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst), + StatsGetKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst), + StatsGetKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst), + StatsGetKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst), }; if let Some(wait_time_start) = wait_time_start { @@ -272,6 +292,28 @@ impl AtomicStatistics { .fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst); } } + + pub(crate) fn record_connection_closed(&self, kind: StatsConnectionClosedKind) { + match kind { + StatsConnectionClosedKind::Broken => self + .connections_closed_broken + .fetch_add(1, Ordering::SeqCst), + StatsConnectionClosedKind::Invalid => self + .connections_closed_invalid + .fetch_add(1, Ordering::SeqCst), + }; + } + + pub(crate) fn record_connections_reaped( + &self, + closed_idle_timeout: u64, + closed_max_lifetime: u64, + ) { + self.connections_closed_idle_timeout + .fetch_add(closed_idle_timeout, Ordering::SeqCst); + self.connections_closed_max_lifetime + .fetch_add(closed_max_lifetime, Ordering::SeqCst); + } } impl From<&AtomicStatistics> for Statistics { @@ -281,12 +323,25 @@ impl From<&AtomicStatistics> for Statistics { get_waited: item.get_waited.load(Ordering::SeqCst), get_timed_out: item.get_timed_out.load(Ordering::SeqCst), get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)), + connections_closed_broken: item.connections_closed_broken.load(Ordering::SeqCst), + connections_closed_invalid: item.connections_closed_invalid.load(Ordering::SeqCst), + connections_closed_max_lifetime: item + .connections_closed_max_lifetime + .load(Ordering::SeqCst), + connections_closed_idle_timeout: item + .connections_closed_idle_timeout + .load(Ordering::SeqCst), } } } -pub(crate) enum StatsKind { +pub(crate) enum StatsGetKind { Direct, Waited, TimedOut, } + +pub(crate) enum StatsConnectionClosedKind { + Broken, + Invalid, +} diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 5219a1c..f0db198 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -246,6 +246,7 @@ async fn test_drop_on_broken() { } assert!(DROPPED.load(Ordering::SeqCst)); + assert_eq!(pool.state().statistics.connections_closed_broken, 1); } #[tokio::test] @@ -453,6 +454,68 @@ async fn test_now_invalid() { // Now try to get a new connection. let r = pool.get().await; assert!(r.is_err()); + + // both connections in the pool were considered invalid + assert_eq!(pool.state().statistics.connections_closed_invalid, 2); +} + +#[tokio::test] +async fn test_idle_timeout() { + static DROPPED: AtomicUsize = AtomicUsize::new(0); + + #[derive(Default)] + struct Connection; + impl Drop for Connection { + fn drop(&mut self) { + DROPPED.fetch_add(1, Ordering::SeqCst); + } + } + + let manager = NthConnectionFailManager::::new(5); + let pool = Pool::builder() + .idle_timeout(Some(Duration::from_secs(1))) + .connection_timeout(Duration::from_secs(1)) + .reaper_rate(Duration::from_secs(1)) + .max_size(5) + .min_idle(Some(5)) + .build(manager) + .await + .unwrap(); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let clone = pool.clone(); + tokio::spawn(async move { + let conn = clone.get().await.unwrap(); + tx1.send(()).unwrap(); + // NB: If we sleep here we'll block this thread's event loop, and the + // reaper can't run. + let _ = rx2 + .map(|r| match r { + Ok(v) => Ok((v, conn)), + Err(_) => Err((Error, conn)), + }) + .await; + }); + + rx1.await.unwrap(); + + // And wait. + assert!(timeout(Duration::from_secs(2), pending::<()>()) + .await + .is_err()); + assert_eq!(DROPPED.load(Ordering::SeqCst), 4); + + tx2.send(()).unwrap(); + + // And wait some more. + assert!(timeout(Duration::from_secs(3), pending::<()>()) + .await + .is_err()); + assert_eq!(DROPPED.load(Ordering::SeqCst), 5); + + // all 5 idle connections were closed due to max idle time + assert_eq!(pool.state().statistics.connections_closed_idle_timeout, 5); } #[tokio::test] @@ -509,6 +572,9 @@ async fn test_max_lifetime() { .await .is_err()); assert_eq!(DROPPED.load(Ordering::SeqCst), 5); + + // all 5 connections were closed due to max lifetime + assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5); } #[tokio::test]