Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds new statistics attributes for reaped connections #206

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ pub struct Statistics {
pub connections_closed_broken: u64,
/// Total connections that were closed due to be considered invalid.
pub connections_closed_invalid: u64,
/// Total connections that were closed because they reached the max
/// lifetime.
pub connections_closed_max_lifetime: u64,
/// Total connections that were closed because they reached the max
/// idle timeout.
pub connections_closed_idle_timeout: u64,
}

/// A builder for a connection pool.
Expand Down
45 changes: 40 additions & 5 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ where

pub(crate) fn reap(&self) -> ApprovalIter {
let mut locked = self.internals.lock();
locked.reap(&self.statics)
let (iter, closed_idle_timeout, closed_max_lifetime) = locked.reap(&self.statics);
drop(locked);
self.statistics
.record_connections_reaped(closed_idle_timeout, closed_max_lifetime);
iter
}

pub(crate) fn forward_error(&self, err: M::Error) {
Expand Down Expand Up @@ -139,22 +143,34 @@ where
ApprovalIter { num: num as usize }
}

pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter {
pub(crate) fn reap(&mut self, config: &Builder<M>) -> (ApprovalIter, u64, u64) {
let mut closed_max_lifetime = 0;
let mut closed_idle_timeout = 0;
let now = Instant::now();
let before = self.conns.len();

self.conns.retain(|conn| {
let mut keep = true;
if let Some(timeout) = config.idle_timeout {
keep &= now - conn.idle_start < timeout;
if now - conn.idle_start >= timeout {
closed_idle_timeout += 1;
keep &= false;
}
}
if let Some(lifetime) = config.max_lifetime {
keep &= now - conn.conn.birth < lifetime;
if now - conn.conn.birth >= lifetime {
closed_max_lifetime += 1;
keep &= false;
}
}
keep
});

self.dropped((before - self.conns.len()) as u32, config)
(
self.dropped((before - self.conns.len()) as u32, config),
closed_idle_timeout,
closed_max_lifetime,
)
}

pub(crate) fn state(&self, statistics: Statistics) -> State {
Expand Down Expand Up @@ -258,6 +274,8 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_wait_time_micros: AtomicU64,
pub(crate) connections_closed_broken: AtomicU64,
pub(crate) connections_closed_invalid: AtomicU64,
pub(crate) connections_closed_max_lifetime: AtomicU64,
pub(crate) connections_closed_idle_timeout: AtomicU64,
}

impl AtomicStatistics {
Expand All @@ -282,6 +300,17 @@ impl AtomicStatistics {
}
.fetch_add(1, Ordering::SeqCst);
}

pub(crate) fn record_connections_reaped(
&self,
closed_idle_timeout: u64,
closed_max_lifetime: u64,
) {
self.connections_closed_idle_timeout
.fetch_add(closed_idle_timeout, Ordering::SeqCst);
self.connections_closed_max_lifetime
.fetch_add(closed_max_lifetime, Ordering::SeqCst);
}
}

impl From<&AtomicStatistics> for Statistics {
Expand All @@ -293,6 +322,12 @@ impl From<&AtomicStatistics> for Statistics {
get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)),
connections_closed_broken: item.connections_closed_broken.load(Ordering::SeqCst),
connections_closed_invalid: item.connections_closed_invalid.load(Ordering::SeqCst),
connections_closed_max_lifetime: item
.connections_closed_max_lifetime
.load(Ordering::SeqCst),
connections_closed_idle_timeout: item
.connections_closed_idle_timeout
.load(Ordering::SeqCst),
}
}
}
Expand Down
62 changes: 62 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,65 @@ async fn test_now_invalid() {
assert_eq!(pool.state().statistics.connections_closed_invalid, 2);
}

#[tokio::test]
async fn test_idle_timeout() {
static DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Default)]
struct Connection;
impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

let manager = NthConnectionFailManager::<Connection>::new(5);
let pool = Pool::builder()
.idle_timeout(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.reaper_rate(Duration::from_secs(1))
.max_size(5)
.min_idle(Some(5))
.build(manager)
.await
.unwrap();

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let clone = pool.clone();
tokio::spawn(async move {
let conn = clone.get().await.unwrap();
tx1.send(()).unwrap();
// NB: If we sleep here we'll block this thread's event loop, and the
// reaper can't run.
let _ = rx2
.map(|r| match r {
Ok(v) => Ok((v, conn)),
Err(_) => Err((Error, conn)),
})
.await;
});

rx1.await.unwrap();

// And wait.
assert!(timeout(Duration::from_secs(2), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 4);

tx2.send(()).unwrap();

// And wait some more.
assert!(timeout(Duration::from_secs(3), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 idle connections were closed due to max idle time
assert_eq!(pool.state().statistics.connections_closed_idle_timeout, 5);
}

#[tokio::test]
async fn test_max_lifetime() {
static DROPPED: AtomicUsize = AtomicUsize::new(0);
Expand Down Expand Up @@ -513,6 +572,9 @@ async fn test_max_lifetime() {
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 connections were closed due to max lifetime
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5);
}

#[tokio::test]
Expand Down
Loading