Skip to content

Commit

Permalink
Reap expired connections on drop
Browse files Browse the repository at this point in the history
The reaper only runs against the connections in its idle pool. This is
fine for reaping idle connections, but for hotly contested connections
beyond their maximum lifetime this can prove problematic.

Consider an active connection beyond its lifetime and a reaper that runs
every 3 seconds:
- [t0] Connection is idle
- [t1] Connection is active
- [t2] Reaper runs, does not see connection
- [t3] Connection is idle

This pattern can repeat infinitely with the connection never being reaped.

By checking the max lifetime on drop, we can ensure that expired
connections are reaped in a reason amount of time (assuming they
eventually do get dropped).
  • Loading branch information
tneely committed Oct 16, 2024
1 parent 4528b77 commit 341432d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
10 changes: 8 additions & 2 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,18 @@ where
"handled in caller"
);

let max_lifetime = self.inner.statics.max_lifetime;
let is_expired = max_lifetime.map_or(false, |lt| conn.is_expired(Instant::now() - lt));
let is_broken = self.inner.manager.has_broken(&mut conn.conn);

let mut locked = self.inner.internals.lock();
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
match (state, is_broken || is_expired) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, is_broken) => {
_ => {
if is_broken {
self.inner.statistics.record(StatsKind::ClosedBroken);
} else if is_expired {
self.inner.statistics.record_connections_reaped(0, 1);
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
Expand Down
35 changes: 35 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bb8::*;
use tokio::time::sleep;

use std::future::Future;
use std::marker::PhantomData;
Expand Down Expand Up @@ -585,6 +586,40 @@ async fn test_max_lifetime() {
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5);
}

#[tokio::test]
async fn test_max_lifetime_reap_on_drop() {
static DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Default)]
struct Connection;

impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

let manager = OkManager::<Connection>::new();
let pool = Pool::builder()
.max_lifetime(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.reaper_rate(Duration::from_secs(999))
.build(manager)
.await
.unwrap();

let conn = pool.get().await;

// And wait.
sleep(Duration::from_secs(2)).await;
assert_eq!(DROPPED.load(Ordering::SeqCst), 0);

// Connection is reaped on drop.
drop(conn);
assert_eq!(DROPPED.load(Ordering::SeqCst), 1);
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 1);
}

#[tokio::test]
async fn test_min_idle() {
let pool = Pool::builder()
Expand Down

0 comments on commit 341432d

Please sign in to comment.