Skip to content

Commit

Permalink
Reap expired connections on drop
Browse files Browse the repository at this point in the history
The reaper only runs against the connections in its idle pool. This is
fine for reaping idle connections, but for hotly contested connections
beyond their maximum lifetime this can prove problematic.

Consider an active connection beyond its lifetime and a reaper that runs
every 3 seconds:
- [t0] Connection is idle
- [t1] Connection is active
- [t2] Reaper runs, does not see connection
- [t3] Connection is idle

This pattern can repeat infinitely with the connection never being reaped.

By checking the max lifetime on drop, we can ensure that expired
connections are reaped in a reason amount of time (assuming they
eventually do get dropped).
  • Loading branch information
tneely authored and djc committed Oct 17, 2024
1 parent a24f49b commit 8210b0e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
12 changes: 10 additions & 2 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,20 @@ where
"handled in caller"
);

let is_broken = self.inner.manager.has_broken(&mut conn.conn);
let is_expired = match self.inner.statics.max_lifetime {
Some(lt) => conn.is_expired(Instant::now(), lt),
None => false,
};

let mut locked = self.inner.internals.lock();
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
match (state, is_broken || is_expired) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, is_broken) => {
_ => {
if is_broken {
self.inner.statistics.record(StatsKind::ClosedBroken);
} else if is_expired {
self.inner.statistics.record_connections_reaped(0, 1);
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
Expand Down
37 changes: 36 additions & 1 deletion bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::{error, fmt};
use async_trait::async_trait;
use futures_util::future::{err, lazy, ok, pending, ready, try_join_all, FutureExt};
use futures_util::stream::{FuturesUnordered, TryStreamExt};
use tokio::{sync::oneshot, time::timeout};
use tokio::sync::oneshot;
use tokio::time::{sleep, timeout};

#[derive(Debug, PartialEq, Eq)]
pub struct Error;
Expand Down Expand Up @@ -585,6 +586,40 @@ async fn test_max_lifetime() {
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5);
}

#[tokio::test]
async fn test_max_lifetime_reap_on_drop() {
static DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Default)]
struct Connection;

impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

let manager = OkManager::<Connection>::new();
let pool = Pool::builder()
.max_lifetime(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.reaper_rate(Duration::from_secs(999))
.build(manager)
.await
.unwrap();

let conn = pool.get().await;

// And wait.
sleep(Duration::from_secs(2)).await;
assert_eq!(DROPPED.load(Ordering::SeqCst), 0);

// Connection is reaped on drop.
drop(conn);
assert_eq!(DROPPED.load(Ordering::SeqCst), 1);
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 1);
}

#[tokio::test]
async fn test_min_idle() {
let pool = Pool::builder()
Expand Down

0 comments on commit 8210b0e

Please sign in to comment.