Skip to content

Commit

Permalink
Provide statistics of gets and contention
Browse files Browse the repository at this point in the history
  • Loading branch information
pfreixes authored and djc committed Jun 5, 2024
1 parent 3190c75 commit 8c1450c
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 12 deletions.
14 changes: 14 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ pub struct State {
pub connections: u32,
/// The number of idle connections.
pub idle_connections: u32,
/// Statistics about the historical usage of the pool.
pub statistics: Statistics,
}

/// Statistics about the historical usage of the `Pool`.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct Statistics {
/// Total gets performed that did not have to wait for a connection.
pub get_direct: u64,
/// Total gets performed that had to wait for a connection available.
pub get_waited: u64,
/// Total gets performed that timed out while waiting for a connection.
pub get_timed_out: u64,
}

/// A builder for a connection pool.
Expand Down
22 changes: 17 additions & 5 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, StatsKind};

pub(crate) struct PoolInner<M>
where
Expand Down Expand Up @@ -85,6 +85,8 @@ where
}

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let mut kind = StatsKind::Direct;

let future = async {
loop {
let (conn, approvals) = self.inner.pop();
Expand All @@ -96,6 +98,7 @@ where
let mut conn = match conn {
Some(conn) => PooledConnection::new(self, conn),
None => {
kind = StatsKind::Waited;
self.inner.notify.notified().await;
continue;
}
Expand All @@ -116,10 +119,16 @@ where
}
};

match timeout(self.inner.statics.connection_timeout, future).await {
let result = match timeout(self.inner.statics.connection_timeout, future).await {
Ok(result) => result,
_ => Err(RunError::TimedOut),
}
_ => {
kind = StatsKind::TimedOut;
Err(RunError::TimedOut)
}
};

self.inner.statistics.record(kind);
result
}

pub(crate) async fn connect(&self) -> Result<M::Connection, M::Error> {
Expand Down Expand Up @@ -148,7 +157,10 @@ where

/// Returns information about the current state of the pool.
pub(crate) fn state(&self) -> State {
(&*self.inner.internals.lock()).into()
self.inner
.internals
.lock()
.state((&self.inner.statistics).into())
}

// Outside of Pool to avoid borrow splitting issues on self
Expand Down
48 changes: 41 additions & 7 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::cmp::min;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use crate::{api::QueueStrategy, lock::Mutex};
use tokio::sync::Notify;

use crate::api::{Builder, ManageConnection, State};
use std::collections::VecDeque;
use crate::api::{Builder, ManageConnection, QueueStrategy, State, Statistics};
use crate::lock::Mutex;

/// The guts of a `Pool`.
#[allow(missing_debug_implementations)]
Expand All @@ -18,6 +19,7 @@ where
pub(crate) manager: M,
pub(crate) internals: Mutex<PoolInternals<M>>,
pub(crate) notify: Arc<Notify>,
pub(crate) statistics: AtomicStatistics,
}

impl<M> SharedPool<M>
Expand All @@ -30,6 +32,7 @@ where
manager,
internals: Mutex::new(PoolInternals::default()),
notify: Arc::new(Notify::new()),
statistics: AtomicStatistics::default(),
}
}

Expand Down Expand Up @@ -153,14 +156,12 @@ where

self.dropped((before - self.conns.len()) as u32, config)
}
}

#[allow(clippy::from_over_into)] // Keep this more private with the internal type
impl<M: ManageConnection> Into<State> for &PoolInternals<M> {
fn into(self) -> State {
pub(crate) fn state(&self, statistics: Statistics) -> State {
State {
connections: self.num_conns,
idle_connections: self.conns.len() as u32,
statistics,
}
}
}
Expand Down Expand Up @@ -248,3 +249,36 @@ impl<C: Send> From<Conn<C>> for IdleConn<C> {
}
}
}

#[derive(Default)]
pub(crate) struct AtomicStatistics {
pub(crate) get_direct: AtomicU64,
pub(crate) get_waited: AtomicU64,
pub(crate) get_timed_out: AtomicU64,
}

impl From<&AtomicStatistics> for Statistics {
fn from(item: &AtomicStatistics) -> Self {
Self {
get_direct: item.get_direct.load(Ordering::SeqCst),
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
}
}
}

impl AtomicStatistics {
pub(crate) fn record(&self, kind: StatsKind) {
match kind {
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
};
}
}

pub(crate) enum StatsKind {
Direct,
Waited,
TimedOut,
}
40 changes: 40 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,3 +885,43 @@ async fn test_broken_connections_dont_starve_pool() {
future.await.unwrap();
}
}

#[tokio::test]
async fn test_state_get_contention() {
let pool = Pool::builder()
.max_size(1)
.min_idle(1)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let clone = pool.clone();
tokio::spawn(async move {
let conn = clone.get().await.unwrap();
tx1.send(()).unwrap();
let _ = rx2
.then(|r| match r {
Ok(v) => ok((v, conn)),
Err(_) => err((Error, conn)),
})
.await;
});

// Get the first connection.
rx1.await.unwrap();

// Now try to get a new connection without waiting.
let f = pool.get();

// Release the first connection.
tx2.send(()).unwrap();

// Wait for the second attempt to get a connection.
f.await.unwrap();

let statistics = pool.state().statistics;
assert_eq!(statistics.get_direct, 1);
assert_eq!(statistics.get_waited, 1);
}

0 comments on commit 8c1450c

Please sign in to comment.