Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new statistics for knowing when and why connections were closed #204

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ pub struct Statistics {
pub get_timed_out: u64,
/// Total time accumulated waiting for a connection.
pub get_wait_time: Duration,
/// Total connections that were closed due to being in a broken state.
pub connections_closed_broken: u64,
/// Total connections that were closed due to being considered invalid.
pub connections_closed_invalid: u64,
/// Total connections that were closed because they reached the max
/// lifetime.
pub connections_closed_max_lifetime: u64,
/// Total connections that were closed because they reached the max
/// idle timeout.
pub connections_closed_idle_timeout: u64,
}

/// A builder for a connection pool.
Expand Down
20 changes: 15 additions & 5 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsKind};
use crate::internals::{
Approval, ApprovalIter, Conn, SharedPool, StatsConnectionClosedKind, StatsGetKind,
};

pub(crate) struct PoolInner<M>
where
Expand Down Expand Up @@ -85,7 +87,7 @@ where
}

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let mut kind = StatsKind::Direct;
let mut kind = StatsGetKind::Direct;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put this change in a separate commit or PR.

let mut wait_time_start = None;

let future = async {
Expand All @@ -100,7 +102,7 @@ where
Some(conn) => PooledConnection::new(self, conn),
None => {
wait_time_start = Some(Instant::now());
kind = StatsKind::Waited;
kind = StatsGetKind::Waited;
self.inner.notify.notified().await;
continue;
}
Expand All @@ -114,6 +116,9 @@ where
Ok(()) => return Ok(conn),
Err(e) => {
self.inner.forward_error(e);
self.inner
.statistics
.record_connection_closed(StatsConnectionClosedKind::Invalid);
conn.state = ConnectionState::Invalid;
continue;
}
Expand All @@ -124,7 +129,7 @@ where
let result = match timeout(self.inner.statics.connection_timeout, future).await {
Ok(result) => result,
_ => {
kind = StatsKind::TimedOut;
kind = StatsGetKind::TimedOut;
Err(RunError::TimedOut)
}
};
Expand All @@ -149,7 +154,12 @@ where
let mut locked = self.inner.internals.lock();
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, _) => {
(_, is_broken) => {
if is_broken {
self.inner
.statistics
.record_connection_closed(StatsConnectionClosedKind::Broken);
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
self.inner.notify.notify_waiters();
Expand Down
75 changes: 65 additions & 10 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ where

pub(crate) fn reap(&self) -> ApprovalIter {
let mut locked = self.internals.lock();
locked.reap(&self.statics)
let (iter, closed_idle_timeout, closed_max_lifetime) = locked.reap(&self.statics);
drop(locked);
self.statistics
.record_connections_reaped(closed_idle_timeout, closed_max_lifetime);
iter
}

pub(crate) fn forward_error(&self, err: M::Error) {
Expand Down Expand Up @@ -139,22 +143,34 @@ where
ApprovalIter { num: num as usize }
}

pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter {
pub(crate) fn reap(&mut self, config: &Builder<M>) -> (ApprovalIter, u64, u64) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's split the reap() changes out in a separate PR.

let mut closed_max_lifetime = 0;
let mut closed_idle_timeout = 0;
let now = Instant::now();
let before = self.conns.len();

self.conns.retain(|conn| {
let mut keep = true;
if let Some(timeout) = config.idle_timeout {
keep &= now - conn.idle_start < timeout;
if now - conn.idle_start >= timeout {
closed_idle_timeout += 1;
keep &= false;
}
}
if let Some(lifetime) = config.max_lifetime {
keep &= now - conn.conn.birth < lifetime;
if now - conn.conn.birth >= lifetime {
closed_max_lifetime += 1;
keep &= false;
}
}
keep
});

self.dropped((before - self.conns.len()) as u32, config)
(
self.dropped((before - self.conns.len()) as u32, config),
closed_idle_timeout,
closed_max_lifetime,
)
}

pub(crate) fn state(&self, statistics: Statistics) -> State {
Expand Down Expand Up @@ -256,14 +272,18 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_waited: AtomicU64,
pub(crate) get_timed_out: AtomicU64,
pub(crate) get_wait_time_micros: AtomicU64,
pub(crate) connections_closed_broken: AtomicU64,
pub(crate) connections_closed_invalid: AtomicU64,
pub(crate) connections_closed_max_lifetime: AtomicU64,
pub(crate) connections_closed_idle_timeout: AtomicU64,
}

impl AtomicStatistics {
pub(crate) fn record_get(&self, kind: StatsKind, wait_time_start: Option<Instant>) {
pub(crate) fn record_get(&self, kind: StatsGetKind, wait_time_start: Option<Instant>) {
match kind {
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
StatsGetKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsGetKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
StatsGetKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
};

if let Some(wait_time_start) = wait_time_start {
Expand All @@ -272,6 +292,28 @@ impl AtomicStatistics {
.fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst);
}
}

pub(crate) fn record_connection_closed(&self, kind: StatsConnectionClosedKind) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can just be named record().

match kind {
StatsConnectionClosedKind::Broken => self
.connections_closed_broken
.fetch_add(1, Ordering::SeqCst),
StatsConnectionClosedKind::Invalid => self
.connections_closed_invalid
.fetch_add(1, Ordering::SeqCst),
};
}

/// Adds the totals from one reap pass to the cumulative counters.
///
/// `closed_idle_timeout` is added to `connections_closed_idle_timeout` and
/// `closed_max_lifetime` to `connections_closed_max_lifetime`, in that
/// order, each using `Ordering::SeqCst`.
pub(crate) fn record_connections_reaped(
    &self,
    closed_idle_timeout: u64,
    closed_max_lifetime: u64,
) {
    // Pair each counter with the amount to add so both are updated the same way.
    let updates = [
        (&self.connections_closed_idle_timeout, closed_idle_timeout),
        (&self.connections_closed_max_lifetime, closed_max_lifetime),
    ];
    for (counter, delta) in updates {
        counter.fetch_add(delta, Ordering::SeqCst);
    }
}
}

impl From<&AtomicStatistics> for Statistics {
Expand All @@ -281,12 +323,25 @@ impl From<&AtomicStatistics> for Statistics {
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)),
connections_closed_broken: item.connections_closed_broken.load(Ordering::SeqCst),
connections_closed_invalid: item.connections_closed_invalid.load(Ordering::SeqCst),
connections_closed_max_lifetime: item
.connections_closed_max_lifetime
.load(Ordering::SeqCst),
connections_closed_idle_timeout: item
.connections_closed_idle_timeout
.load(Ordering::SeqCst),
}
}
}

pub(crate) enum StatsKind {
pub(crate) enum StatsGetKind {
Direct,
Waited,
TimedOut,
}

/// Reason a connection was closed; selects which `connections_closed_*`
/// counter `AtomicStatistics::record_connection_closed` increments.
pub(crate) enum StatsConnectionClosedKind {
/// Counted in `connections_closed_broken`.
Broken,
/// Counted in `connections_closed_invalid`.
Invalid,
}
66 changes: 66 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ async fn test_drop_on_broken() {
}

assert!(DROPPED.load(Ordering::SeqCst));
assert_eq!(pool.state().statistics.connections_closed_broken, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -453,6 +454,68 @@ async fn test_now_invalid() {
// Now try to get a new connection.
let r = pool.get().await;
assert!(r.is_err());

// both connections in the pool were considered invalid
assert_eq!(pool.state().statistics.connections_closed_invalid, 2);
}

// Checks that connections are dropped once the configured idle timeout
// elapses, and that the `connections_closed_idle_timeout` statistic ends up
// matching the number of dropped connections.
#[tokio::test]
async fn test_idle_timeout() {
// Counts how many `Connection` values have been dropped so far.
static DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Default)]
struct Connection;
impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

// NOTE(review): NthConnectionFailManager is defined elsewhere in the test
// file; presumably it stops producing connections after the 5th, so reaped
// connections are not replaced — confirm.
let manager = NthConnectionFailManager::<Connection>::new(5);
// One-second idle timeout and reaper rate; max_size and min_idle are both 5.
let pool = Pool::builder()
.idle_timeout(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.reaper_rate(Duration::from_secs(1))
.max_size(5)
.min_idle(Some(5))
.build(manager)
.await
.unwrap();

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let clone = pool.clone();
// Background task: check out one connection, signal via tx1, then keep the
// connection alive until rx2 resolves.
tokio::spawn(async move {
let conn = clone.get().await.unwrap();
tx1.send(()).unwrap();
// NB: If we sleep here we'll block this thread's event loop, and the
// reaper can't run.
let _ = rx2
.map(|r| match r {
Ok(v) => Ok((v, conn)),
Err(_) => Err((Error, conn)),
})
.await;
});

// Wait until the background task has acquired its connection.
rx1.await.unwrap();

// And wait.
assert!(timeout(Duration::from_secs(2), pending::<()>())
.await
.is_err());
// Four drops are expected at this point; the fifth connection is still
// held by the background task.
assert_eq!(DROPPED.load(Ordering::SeqCst), 4);

// Resolve rx2 so the background task completes and its connection goes out
// of scope.
tx2.send(()).unwrap();

// And wait some more.
assert!(timeout(Duration::from_secs(3), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 connections are expected to be counted as closed by the idle timeout
assert_eq!(pool.state().statistics.connections_closed_idle_timeout, 5);
}

#[tokio::test]
Expand Down Expand Up @@ -509,6 +572,9 @@ async fn test_max_lifetime() {
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 connections were closed due to max lifetime
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5);
}

#[tokio::test]
Expand Down
Loading