Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds new statistics for tracking connections created and closed #202

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ pub struct Statistics {
pub get_waited: u64,
/// Total gets performed that timed out while waiting for a connection.
pub get_timed_out: u64,
/// Total time accumulated waiting for a connection.
pub get_waited_time: Duration,
/// Total connections created.
pub connections_created: u64,
/// Total connections that were closed due to be in broken state.
pub connections_broken_closed: u64,
/// Total connections that were closed due to be considered invalid.
pub connections_invalid_closed: u64,
/// Total connections that were closed because they reached the max
/// lifetime.
pub connections_max_lifetime_closed: u64,
/// Total connections that were closed because they reached the max
/// idle timeout.
pub connections_max_idle_timeout_closed: u64,
}

/// A builder for a connection pool.
Expand Down
31 changes: 26 additions & 5 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ where
}

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let mut kind = StatsKind::Direct;
let mut get_kind = StatsKind::GetDirect;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't rename this get_kind, please.

let mut wait_time_start = None;

let future = async {
loop {
Expand All @@ -98,7 +99,8 @@ where
let mut conn = match conn {
Some(conn) => PooledConnection::new(self, conn),
None => {
kind = StatsKind::Waited;
wait_time_start = Some(Instant::now());
get_kind = StatsKind::GetWaited;
self.inner.notify.notified().await;
continue;
}
Expand All @@ -111,6 +113,9 @@ where
match self.inner.manager.is_valid(&mut conn).await {
Ok(()) => return Ok(conn),
Err(e) => {
self.inner
.statistics
.record(StatsKind::ConnectionsInvalidClosed, 1);
self.inner.forward_error(e);
conn.state = ConnectionState::Invalid;
continue;
Expand All @@ -122,12 +127,20 @@ where
let result = match timeout(self.inner.statics.connection_timeout, future).await {
Ok(result) => result,
_ => {
kind = StatsKind::TimedOut;
get_kind = StatsKind::GetTimedOut;
Err(RunError::TimedOut)
}
};

self.inner.statistics.record(kind);
self.inner.statistics.record(get_kind, 1);

if let Some(wait_time_start) = wait_time_start {
let wait_time = Instant::now() - wait_time_start;
self.inner
.statistics
.record(StatsKind::GetWaitedTime, wait_time.as_micros() as u64);
}
Comment on lines +135 to +142
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wrap this up in a method on AtomicStatistics called record_get().


result
}

Expand All @@ -147,7 +160,12 @@ where
let mut locked = self.inner.internals.lock();
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, _) => {
(_, is_broken) => {
if is_broken {
self.inner
.statistics
.record(StatsKind::ConnectionsBrokenClosed, 1);
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
self.inner.notify.notify_waiters();
Expand Down Expand Up @@ -190,6 +208,9 @@ where
.internals
.lock()
.put(conn, Some(approval), self.inner.clone());
self.inner
.statistics
.record(StatsKind::ConnectionsCreated, 1);
return Ok(());
}
Err(e) => {
Expand Down
90 changes: 77 additions & 13 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::cmp::min;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use tokio::sync::Notify;

Expand Down Expand Up @@ -49,7 +49,17 @@ where

pub(crate) fn reap(&self) -> ApprovalIter {
let mut locked = self.internals.lock();
locked.reap(&self.statics)
let (iter, max_idle_timeout_closed, max_lifetime_closed) = locked.reap(&self.statics);
drop(locked);

self.statistics
.record(StatsKind::ConnectionsMaxLifeTimeClosed, max_lifetime_closed);
self.statistics.record(
StatsKind::ConnectionsMaxIdleTimeoutClosed,
max_idle_timeout_closed,
);
Comment on lines +55 to +60
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wrap this in a separate method on AtomicStatistics, too, and keep all of the reaping-related changes in a separate commit or PR.


iter
}

pub(crate) fn forward_error(&self, err: M::Error) {
Expand Down Expand Up @@ -139,22 +149,34 @@ where
ApprovalIter { num: num as usize }
}

pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter {
pub(crate) fn reap(&mut self, config: &Builder<M>) -> (ApprovalIter, u64, u64) {
let mut max_lifetime_closed = 0;
let mut max_idle_timeout_closed = 0;
Comment on lines +153 to +154
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call these closed_max_lifetime and closed_idle_timeout.

let now = Instant::now();
let before = self.conns.len();

self.conns.retain(|conn| {
let mut keep = true;
if let Some(timeout) = config.idle_timeout {
keep &= now - conn.idle_start < timeout;
if now - conn.idle_start >= timeout {
max_idle_timeout_closed += 1;
keep &= false;
}
}
if let Some(lifetime) = config.max_lifetime {
keep &= now - conn.conn.birth < lifetime;
if now - conn.conn.birth >= lifetime {
max_lifetime_closed += 1;
keep &= false;
}
}
keep
});

self.dropped((before - self.conns.len()) as u32, config)
(
self.dropped((before - self.conns.len()) as u32, config),
max_idle_timeout_closed,
max_lifetime_closed,
)
}

pub(crate) fn state(&self, statistics: Statistics) -> State {
Expand Down Expand Up @@ -255,14 +277,38 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_direct: AtomicU64,
pub(crate) get_waited: AtomicU64,
pub(crate) get_timed_out: AtomicU64,
pub(crate) get_waited_time_micros: AtomicU64,
pub(crate) connections_created: AtomicU64,
pub(crate) connections_broken_closed: AtomicU64,
pub(crate) connections_invalid_closed: AtomicU64,
pub(crate) connections_max_lifetime_closed: AtomicU64,
pub(crate) connections_max_idle_timeout_closed: AtomicU64,
Comment on lines +281 to +285
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's skip the connections_ prefixes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all of them? at least at first sight the connections_created, connections_broken_closed, connections_broken_closed and connections_invalid_closed without the connections_ prefix will seem a bit off.

Also it follows the same practice for the get_ Kind, where we prefixed it and IMO helps to the user to understand the scope of them, same would happen for connections_

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let's keep connections_ but I'd like closed moved directly after connections_ and rename max_idle_timeout to idle_timeout.

}

impl AtomicStatistics {
pub(crate) fn record(&self, kind: StatsKind) {
pub(crate) fn record(&self, kind: StatsKind, value: u64) {
match kind {
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst),
StatsKind::GetDirect => self.get_direct.fetch_add(value, Ordering::SeqCst),
StatsKind::GetWaited => self.get_waited.fetch_add(value, Ordering::SeqCst),
StatsKind::GetTimedOut => self.get_timed_out.fetch_add(value, Ordering::SeqCst),
StatsKind::GetWaitedTime => self
.get_waited_time_micros
.fetch_add(value, Ordering::SeqCst),
StatsKind::ConnectionsCreated => {
self.connections_created.fetch_add(value, Ordering::SeqCst)
}
StatsKind::ConnectionsBrokenClosed => self
.connections_broken_closed
.fetch_add(value, Ordering::SeqCst),
StatsKind::ConnectionsInvalidClosed => self
.connections_invalid_closed
.fetch_add(value, Ordering::SeqCst),
StatsKind::ConnectionsMaxLifeTimeClosed => self
.connections_max_lifetime_closed
.fetch_add(value, Ordering::SeqCst),
StatsKind::ConnectionsMaxIdleTimeoutClosed => self
.connections_max_idle_timeout_closed
.fetch_add(value, Ordering::SeqCst),
};
}
}
Expand All @@ -273,12 +319,30 @@ impl From<&AtomicStatistics> for Statistics {
get_direct: item.get_direct.load(Ordering::SeqCst),
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
get_waited_time: Duration::from_micros(
item.get_waited_time_micros.load(Ordering::SeqCst),
),
connections_created: item.connections_created.load(Ordering::SeqCst),
connections_broken_closed: item.connections_broken_closed.load(Ordering::SeqCst),
connections_invalid_closed: item.connections_invalid_closed.load(Ordering::SeqCst),
connections_max_lifetime_closed: item
.connections_max_lifetime_closed
.load(Ordering::SeqCst),
connections_max_idle_timeout_closed: item
.connections_max_idle_timeout_closed
.load(Ordering::SeqCst),
}
}
}

pub(crate) enum StatsKind {
Direct,
Waited,
TimedOut,
GetDirect,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll want to split this into StatsGet { Waited, Direct, TimedOut }.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djc just for having an agreement on the new kinds and record operations that we would have within AtomicStatistics, we can go for this

pub(crate) enum StatsGet {
    Waited,
    Direct,
    TimedOut,
}

pub(crate) enum StatsConnectionClosed {
    Broken,
    Invalid,
}

Note: we should not need more stats kind since the used record_* methods for stats like the ones coming from the reap function will just call an ad-hoc function like record_connections_reaped

And then we would have the following record_* methods within the AtomicStatistics:

  • record_get (takes the StatsGet parameter)
  • record_connection_closed (takes the StatsConnectionClosed parameter)
  • record_connections_reaped
  • record_get_wait_time
  • record_connection_created

Let me know if this works for you and i can create a new PR with separated commits for each group, starting with the rename of the record to record_get

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, as suggested in my previous message we should reuse a simple record() with StatsKind for both connection_closed and connection_created, and record_get() should also handle get_wait_time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oks, let me provide a new draft - sorry for the back and forth here :

pub(crate) enum StatsGet {
    Waited,
    Direct,
    TimedOut,
    WaitTime
}

pub(crate) enum StatsConnectionClosed {
    Broken,
    Invalid,
}

And following record methods

  • record which would take a StatsConnectionClosed kind type
  • record_get for recording any stats related to get including wait time
  • record_connections_reaped for recording the reaped connections either idle timeout or max lifetime

How does it sound? is this aligned with your expectations?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Let's do one PR at a time, please implement the first chunk and I'll give feedback. Please closely read my feedback from this PR because you've still missed some of it.

GetWaited,
GetTimedOut,
GetWaitedTime,
ConnectionsCreated,
ConnectionsBrokenClosed,
ConnectionsInvalidClosed,
ConnectionsMaxLifeTimeClosed,
ConnectionsMaxIdleTimeoutClosed,
}
91 changes: 91 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ async fn test_drop_on_broken() {
}

assert!(DROPPED.load(Ordering::SeqCst));

assert_eq!(pool.state().statistics.connections_broken_closed, 1);
}

#[tokio::test]
Expand Down Expand Up @@ -453,6 +455,71 @@ async fn test_now_invalid() {
// Now try to get a new connection.
let r = pool.get().await;
assert!(r.is_err());

// both connections in the pool were considered invalid
assert_eq!(pool.state().statistics.connections_invalid_closed, 2);
}

#[tokio::test]
async fn test_idle_timeout() {
static DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Default)]
struct Connection;
impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

let manager = NthConnectionFailManager::<Connection>::new(5);
let pool = Pool::builder()
.idle_timeout(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.reaper_rate(Duration::from_secs(1))
.max_size(5)
.min_idle(Some(5))
.build(manager)
.await
.unwrap();

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let clone = pool.clone();
tokio::spawn(async move {
let conn = clone.get().await.unwrap();
tx1.send(()).unwrap();
// NB: If we sleep here we'll block this thread's event loop, and the
// reaper can't run.
let _ = rx2
.map(|r| match r {
Ok(v) => Ok((v, conn)),
Err(_) => Err((Error, conn)),
})
.await;
});

rx1.await.unwrap();

// And wait.
assert!(timeout(Duration::from_secs(2), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 4);

tx2.send(()).unwrap();

// And wait some more.
assert!(timeout(Duration::from_secs(3), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 idle connections were closed due to max idle time
assert_eq!(
pool.state().statistics.connections_max_idle_timeout_closed,
5
);
}

#[tokio::test]
Expand Down Expand Up @@ -509,6 +576,9 @@ async fn test_max_lifetime() {
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 connections were closed due to max lifetime
assert_eq!(pool.state().statistics.connections_max_lifetime_closed, 5);
}

#[tokio::test]
Expand Down Expand Up @@ -924,4 +994,25 @@ async fn test_state_get_contention() {
let statistics = pool.state().statistics;
assert_eq!(statistics.get_direct, 1);
assert_eq!(statistics.get_waited, 1);
assert!(statistics.get_waited_time > Duration::from_micros(0));
}

#[tokio::test]
async fn test_statistics_connections_created() {
let pool = Pool::builder()
.max_size(1)
.min_idle(1)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();
let (tx1, rx1) = oneshot::channel();
let clone = pool.clone();
tokio::spawn(async move {
let _ = clone.get().await.unwrap();
tx1.send(()).unwrap();
});
// wait until finished.
rx1.await.unwrap();

assert_eq!(pool.state().statistics.connections_created, 1);
}
Loading