Skip to content

Commit

Permalink
Add Pool::add
Browse files Browse the repository at this point in the history
Fixes #212

This adds `Pool::add`, which allows for externally created
connections to be added and managed by the pool. If the pool
is at maximum capacity when this method is called, it will
return the input connection as part of the Err response.

I considered allowing `Pool:add` to ignore `max_size` when
adding to the pool, but felt it could lead to confusion if
the pool is allowed to exceed its capacity in this specific
case.

This change required making PoolInternals::approvals visible
within the crate to get the approval needed to add a new
connection. The alternative would have required defining a
new pub(crate) method for this specific use case, which feels
worse. I'm open to suggestions on how to more cleanly integrate
this change into the package.
  • Loading branch information
tneely authored and djc committed Aug 28, 2024
1 parent 408fe7b commit 593a154
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 2 deletions.
35 changes: 35 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ impl<M: ManageConnection> Pool<M> {
pub fn state(&self) -> State {
self.inner.state()
}

/// Adds a connection to the pool.
///
/// If the connection is broken, or the pool is at capacity, the
/// connection is not added and instead returned to the caller in Err.
pub fn add(&self, conn: M::Connection) -> Result<(), AddError<M::Connection>> {
self.inner.try_put(conn)
}
}

/// Information about the state of a `Pool`.
Expand Down Expand Up @@ -526,6 +534,33 @@ where
}
}

/// Error type returned by `Pool::add(conn)`
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddError<C> {
/// The connection was broken before it could be added.
Broken(C),
/// Unable to add the connection to the pool due to insufficient capacity.
NoCapacity(C),
}

impl<E: error::Error + 'static> fmt::Display for AddError<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
AddError::Broken(_) => write!(f, "The connection was broken before it could be added"),
AddError::NoCapacity(_) => write!(
f,
"Unable to add the connection to the pool due to insufficient capacity"
),
}
}
}

impl<E: error::Error + 'static> error::Error for AddError<E> {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
None
}
}

/// A trait to receive errors generated by connection management that aren't
/// tied to any particular caller.
pub trait ErrorSink<E>: fmt::Debug + Send + Sync + 'static {
Expand Down
13 changes: 12 additions & 1 deletion bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures_util::TryFutureExt;
use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
use crate::api::{
AddError, Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State,
};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsGetKind, StatsKind};

pub(crate) struct PoolInner<M>
Expand Down Expand Up @@ -161,6 +163,15 @@ where
}
}

/// Adds an external connection to the pool if there is capacity for it.
pub(crate) fn try_put(&self, mut conn: M::Connection) -> Result<(), AddError<M::Connection>> {
if self.inner.manager.has_broken(&mut conn) {
Err(AddError::Broken(conn))
} else {
self.inner.try_put(conn).map_err(AddError::NoCapacity)
}
}

/// Returns information about the current state of the pool.
pub(crate) fn state(&self) -> State {
self.inner
Expand Down
11 changes: 11 additions & 0 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ where
(conn, approvals)
}

pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
let mut locked = self.internals.lock();
let mut approvals = locked.approvals(&self.statics, 1);
let Some(approval) = approvals.next() else {
return Err(conn);
};
let conn = Conn::new(conn);
locked.put(conn, Some(approval), self.clone());
Ok(())
}

pub(crate) fn reap(&self) -> ApprovalIter {
let mut locked = self.internals.lock();
let (iter, closed_idle_timeout, closed_max_lifetime) = locked.reap(&self.statics);
Expand Down
2 changes: 1 addition & 1 deletion bb8/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

mod api;
pub use api::{
Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool,
AddError, Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool,
PooledConnection, QueueStrategy, RunError, State, Statistics,
};

Expand Down
48 changes: 48 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,3 +1020,51 @@ async fn test_statistics_connections_created() {

assert_eq!(pool.state().statistics.connections_created, 1);
}

#[tokio::test]
async fn test_can_use_added_connections() {
let pool = Pool::builder()
.connection_timeout(Duration::from_millis(1))
.build_unchecked(NthConnectionFailManager::<FakeConnection>::new(0));

// Assert pool can't replenish connections on its own
let res = pool.get().await;
assert_eq!(res.unwrap_err(), RunError::TimedOut);

pool.add(FakeConnection).unwrap();
let res = pool.get().await;
assert!(res.is_ok());
}

#[tokio::test]
async fn test_add_ok_until_max_size() {
let pool = Pool::builder()
.min_idle(1)
.max_size(3)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();

for _ in 0..2 {
let conn = pool.dedicated_connection().await.unwrap();
pool.add(conn).unwrap();
}

let conn = pool.dedicated_connection().await.unwrap();
let res = pool.add(conn);
assert!(matches!(res, Err(AddError::NoCapacity(_))));
}

#[tokio::test]
async fn test_add_checks_broken_connections() {
let pool = Pool::builder()
.min_idle(1)
.max_size(3)
.build(BrokenConnectionManager::<FakeConnection>::default())
.await
.unwrap();

let conn = pool.dedicated_connection().await.unwrap();
let res = pool.add(conn);
assert!(matches!(res, Err(AddError::Broken(_))));
}

0 comments on commit 593a154

Please sign in to comment.