Skip to content

Commit

Permalink
Add new statistics for knowing when and why connections were closed
Browse files Browse the repository at this point in the history
We add 4 new statistics which will be used by the users for knowing when
the connections were closed and what was the reason, the new four
statistics are:

- `connections_closed_broken`: Total connections that were closed due to be in broken state.
- `connections_closed_invalid`: Total connections that were closed due to be considered invalid.
- `connections_closed_max_lifetime`: Total connections that were closed
  because they reached the max lifetime.
- `connections_closed_idle_timeout`: Total connections that were closed
  because they reached the idle timeout.
  • Loading branch information
pfreixes committed Jun 10, 2024
1 parent d5f6bfa commit dfa5feb
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 15 deletions.
10 changes: 10 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ pub struct Statistics {
pub get_timed_out: u64,
/// Total time accumulated waiting for a connection.
pub get_wait_time: Duration,
/// Total connections that were closed due to be in broken state.
pub connections_closed_broken: u64,
/// Total connections that were closed due to be considered invalid.
pub connections_closed_invalid: u64,
/// Total connections that were closed because they reached the max
/// lifetime.
pub connections_closed_max_lifetime: u64,
/// Total connections that were closed because they reached the max
/// idle timeout.
pub connections_closed_idle_timeout: u64,
}

/// A builder for a connection pool.
Expand Down
20 changes: 15 additions & 5 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsKind};
use crate::internals::{
Approval, ApprovalIter, Conn, SharedPool, StatsConnectionClosedKind, StatsGetKind,
};

pub(crate) struct PoolInner<M>
where
Expand Down Expand Up @@ -85,7 +87,7 @@ where
}

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let mut kind = StatsKind::Direct;
let mut kind = StatsGetKind::Direct;
let mut wait_time_start = None;

let future = async {
Expand All @@ -100,7 +102,7 @@ where
Some(conn) => PooledConnection::new(self, conn),
None => {
wait_time_start = Some(Instant::now());
kind = StatsKind::Waited;
kind = StatsGetKind::Waited;
self.inner.notify.notified().await;
continue;
}
Expand All @@ -114,6 +116,9 @@ where
Ok(()) => return Ok(conn),
Err(e) => {
self.inner.forward_error(e);
self.inner
.statistics
.record_connection_closed(StatsConnectionClosedKind::Invalid);
conn.state = ConnectionState::Invalid;
continue;
}
Expand All @@ -124,7 +129,7 @@ where
let result = match timeout(self.inner.statics.connection_timeout, future).await {
Ok(result) => result,
_ => {
kind = StatsKind::TimedOut;
kind = StatsGetKind::TimedOut;
Err(RunError::TimedOut)
}
};
Expand All @@ -149,7 +154,12 @@ where
let mut locked = self.inner.internals.lock();
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, _) => {
(_, is_broken) => {
if is_broken {
self.inner
.statistics
.record_connection_closed(StatsConnectionClosedKind::Broken);
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
self.inner.notify.notify_waiters();
Expand Down
75 changes: 65 additions & 10 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ where

pub(crate) fn reap(&self) -> ApprovalIter {
let mut locked = self.internals.lock();
locked.reap(&self.statics)
let (iter, closed_idle_timeout, closed_max_lifetime) = locked.reap(&self.statics);
drop(locked);
self.statistics
.record_connections_reaped(closed_idle_timeout, closed_max_lifetime);
iter
}

pub(crate) fn forward_error(&self, err: M::Error) {
Expand Down Expand Up @@ -139,22 +143,34 @@ where
ApprovalIter { num: num as usize }
}

pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter {
pub(crate) fn reap(&mut self, config: &Builder<M>) -> (ApprovalIter, u64, u64) {
let mut closed_max_lifetime = 0;
let mut closed_idle_timeout = 0;
let now = Instant::now();
let before = self.conns.len();

self.conns.retain(|conn| {
let mut keep = true;
if let Some(timeout) = config.idle_timeout {
keep &= now - conn.idle_start < timeout;
if now - conn.idle_start >= timeout {
closed_idle_timeout += 1;
keep &= false;
}
}
if let Some(lifetime) = config.max_lifetime {
keep &= now - conn.conn.birth < lifetime;
if now - conn.conn.birth >= lifetime {
closed_max_lifetime += 1;
keep &= false;
}
}
keep
});

self.dropped((before - self.conns.len()) as u32, config)
(
self.dropped((before - self.conns.len()) as u32, config),
closed_idle_timeout,
closed_max_lifetime,
)
}

pub(crate) fn state(&self, statistics: Statistics) -> State {
Expand Down Expand Up @@ -256,14 +272,18 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_waited: AtomicU64,
pub(crate) get_timed_out: AtomicU64,
pub(crate) get_wait_time_micros: AtomicU64,
pub(crate) connections_closed_broken: AtomicU64,
pub(crate) connections_closed_invalid: AtomicU64,
pub(crate) connections_closed_max_lifetime: AtomicU64,
pub(crate) connections_closed_idle_timeout: AtomicU64,
}

impl AtomicStatistics {
pub(crate) fn record_get(&self, kind: StatsKind, wait_time_start: Option<Instant>) {
pub(crate) fn record_get(&self, kind: StatsGetKind, wait_time_start: Option<Instant>) {
match kind {
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
StatsGetKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsGetKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
StatsGetKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
};

if let Some(wait_time_start) = wait_time_start {
Expand All @@ -272,6 +292,28 @@ impl AtomicStatistics {
.fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst);
}
}

pub(crate) fn record_connection_closed(&self, kind: StatsConnectionClosedKind) {
match kind {
StatsConnectionClosedKind::Broken => self
.connections_closed_broken
.fetch_add(1, Ordering::SeqCst),
StatsConnectionClosedKind::Invalid => self
.connections_closed_invalid
.fetch_add(1, Ordering::SeqCst),
};
}

pub(crate) fn record_connections_reaped(
&self,
closed_idle_timeout: u64,
closed_max_lifetime: u64,
) {
self.connections_closed_idle_timeout
.fetch_add(closed_idle_timeout, Ordering::SeqCst);
self.connections_closed_max_lifetime
.fetch_add(closed_max_lifetime, Ordering::SeqCst);
}
}

impl From<&AtomicStatistics> for Statistics {
Expand All @@ -281,12 +323,25 @@ impl From<&AtomicStatistics> for Statistics {
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)),
connections_closed_broken: item.connections_closed_broken.load(Ordering::SeqCst),
connections_closed_invalid: item.connections_closed_invalid.load(Ordering::SeqCst),
connections_closed_max_lifetime: item
.connections_closed_max_lifetime
.load(Ordering::SeqCst),
connections_closed_idle_timeout: item
.connections_closed_idle_timeout
.load(Ordering::SeqCst),
}
}
}

pub(crate) enum StatsKind {
pub(crate) enum StatsGetKind {
Direct,
Waited,
TimedOut,
}

pub(crate) enum StatsConnectionClosedKind {
Broken,
Invalid,
}
66 changes: 66 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ async fn test_drop_on_broken() {
}

assert!(DROPPED.load(Ordering::SeqCst));
assert_eq!(pool.state().statistics.connections_closed_broken, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -453,6 +454,68 @@ async fn test_now_invalid() {
// Now try to get a new connection.
let r = pool.get().await;
assert!(r.is_err());

// both connections in the pool were considered invalid
assert_eq!(pool.state().statistics.connections_closed_invalid, 2);
}

#[tokio::test]
async fn test_idle_timeout() {
static DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Default)]
struct Connection;
impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

let manager = NthConnectionFailManager::<Connection>::new(5);
let pool = Pool::builder()
.idle_timeout(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.reaper_rate(Duration::from_secs(1))
.max_size(5)
.min_idle(Some(5))
.build(manager)
.await
.unwrap();

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let clone = pool.clone();
tokio::spawn(async move {
let conn = clone.get().await.unwrap();
tx1.send(()).unwrap();
// NB: If we sleep here we'll block this thread's event loop, and the
// reaper can't run.
let _ = rx2
.map(|r| match r {
Ok(v) => Ok((v, conn)),
Err(_) => Err((Error, conn)),
})
.await;
});

rx1.await.unwrap();

// And wait.
assert!(timeout(Duration::from_secs(2), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 4);

tx2.send(()).unwrap();

// And wait some more.
assert!(timeout(Duration::from_secs(3), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 idle connections were closed due to max idle time
assert_eq!(pool.state().statistics.connections_closed_idle_timeout, 5);
}

#[tokio::test]
Expand Down Expand Up @@ -509,6 +572,9 @@ async fn test_max_lifetime() {
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 connections were closed due to max lifetime
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5);
}

#[tokio::test]
Expand Down

0 comments on commit dfa5feb

Please sign in to comment.