Skip to content

Commit

Permalink
Try to merge State with Statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
pfreixes committed May 31, 2024
1 parent 9b92366 commit 9f89588
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 40 deletions.
20 changes: 14 additions & 6 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::time::Duration;
use async_trait::async_trait;

use crate::inner::PoolInner;
pub use crate::inner::Statistics;
use crate::internals::Conn;

/// A generic connection pool.
Expand Down Expand Up @@ -45,11 +44,6 @@ impl<M: ManageConnection> Pool<M> {
Builder::new()
}

/// Returns statistics about the historical usage of the pool.
pub fn statistics(&self) -> Statistics {
self.inner.statistics()
}

/// Retrieves a connection from the pool.
pub async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
self.inner.get().await
Expand Down Expand Up @@ -91,6 +85,20 @@ pub struct State {
pub connections: u32,
/// The number of idle connections.
pub idle_connections: u32,
/// Statistics about the historical usage of the pool.
pub statistics: Statistics,
}

/// Statistics about the historical usage of the `Pool`.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct Statistics {
/// Total gets performed that did not have to wait for a connection.
pub get_direct: u64,
/// Total gets performed that had to wait for a connection available.
pub get_waited: u64,
/// Total gets performed that timed out while waiting for a connection.
pub get_timed_out: u64,
}

/// A builder for a connection pool.
Expand Down
35 changes: 4 additions & 31 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::cmp::{max, min};
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

Expand All @@ -11,7 +10,7 @@ use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
use crate::internals::{Approval, ApprovalIter, AtomicStatistics, Conn, GetKind, SharedPool};
use crate::internals::{Approval, ApprovalIter, Conn, GetKind, SharedPool};

pub(crate) struct PoolInner<M>
where
Expand Down Expand Up @@ -157,14 +156,11 @@ where
}
}

/// Returns statistics about the historical usage of the pool.
pub(crate) fn statistics(&self) -> Statistics {
(&(self.inner.statistics)).into()
}

/// Returns information about the current state of the pool.
pub(crate) fn state(&self) -> State {
(&*self.inner.internals.lock()).into()
let mut state: State = (&*self.inner.internals.lock()).into();
state.statistics = (&(self.inner.statistics)).into();
state
}

// Outside of Pool to avoid borrow splitting issues on self
Expand Down Expand Up @@ -260,26 +256,3 @@ impl<M: ManageConnection> Reaper<M> {
}
}
}

/// Statistics about the historical usage of the `Pool`.
#[derive(Debug)]
#[non_exhaustive]
pub struct Statistics {
/// Information about gets.
/// Total gets performed that did not have to wait for a connection.
pub get_direct: u64,
/// Total gets performed that had to wait for a connection available.
pub get_waited: u64,
/// Total gets performed that timed out while waiting for a connection.
pub get_timed_out: u64,
}

impl From<&AtomicStatistics> for Statistics {
fn from(item: &AtomicStatistics) -> Self {
Statistics {
get_direct: item.get_direct.load(Ordering::SeqCst),
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
}
}
}
13 changes: 12 additions & 1 deletion bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Instant;
use crate::{api::QueueStrategy, lock::Mutex};
use tokio::sync::Notify;

use crate::api::{Builder, ManageConnection, State};
use crate::api::{Builder, ManageConnection, State, Statistics};
use std::collections::VecDeque;

/// The guts of a `Pool`.
Expand Down Expand Up @@ -164,6 +164,7 @@ impl<M: ManageConnection> Into<State> for &PoolInternals<M> {
State {
connections: self.num_conns,
idle_connections: self.conns.len() as u32,
statistics: Statistics::default(),
}
}
}
Expand Down Expand Up @@ -265,6 +266,16 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_timed_out: AtomicU64,
}

impl From<&AtomicStatistics> for Statistics {
fn from(item: &AtomicStatistics) -> Self {
Statistics {
get_direct: item.get_direct.load(Ordering::SeqCst),
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
}
}
}

impl AtomicStatistics {
pub(crate) fn record_get(&self, get_kind: GetKind) {
match get_kind {
Expand Down
4 changes: 2 additions & 2 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ async fn test_get_timeout() {
ready(r).await.unwrap();

// check that the timed out was tracked
let statistics = pool.statistics();
let statistics = pool.state().statistics;
assert_eq!(statistics.get_timed_out, 1);
}

Expand Down Expand Up @@ -925,7 +925,7 @@ async fn test_statistics_get_waited() {
// Wait for the second attempt to get a connection.
f.await.unwrap();

let statistics = pool.statistics();
let statistics = pool.state().statistics;
assert_eq!(statistics.get_direct, 1);
assert_eq!(statistics.get_waited, 1);
}

0 comments on commit 9f89588

Please sign in to comment.