Adds new statistics for tracking connections created and closed #202

Closed
14 changes: 14 additions & 0 deletions bb8/src/api.rs
@@ -99,6 +99,20 @@ pub struct Statistics {
pub get_waited: u64,
/// Total gets performed that timed out while waiting for a connection.
pub get_timed_out: u64,
/// Total time accumulated waiting for a connection, in microseconds.
pub get_waited_time_micro: u64,
Owner: Let's make this a std::time::Duration, and lose the _micro suffix.
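
A minimal sketch of what that could look like, assuming the internal atomic keeps accumulating integer microseconds and only the public field changes (the get_wait_time / get_wait_time_micros names are illustrative, not the final API):

/// Total time accumulated waiting for a connection.
pub get_wait_time: Duration,

// Read-side conversion, e.g. in From<&AtomicStatistics>:
get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)),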

/// Total connections created.
pub connections_created: u64,
/// Total connections that were closed due to being in a broken state.
pub connections_broken_closed: u64,
/// Total connections that were closed due to being considered invalid.
pub connections_invalid_closed: u64,
/// Total connections that were closed because they reached the max
/// lifetime.
pub connections_max_lifetime_closed: u64,
/// Total connections that were closed because they reached the max
/// idle timeout.
pub connections_max_idle_timeout_closed: u64,
}

/// A builder for a connection pool.
30 changes: 28 additions & 2 deletions bb8/src/inner.rs
@@ -1,6 +1,7 @@
use std::cmp::{max, min};
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

@@ -86,6 +87,7 @@ where

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let mut kind = StatsKind::Direct;
let mut wait_time_start = None;

let future = async {
loop {
@@ -98,6 +100,7 @@
let mut conn = match conn {
Some(conn) => PooledConnection::new(self, conn),
None => {
wait_time_start = Some(Instant::now());
kind = StatsKind::Waited;
self.inner.notify.notified().await;
continue;
@@ -111,6 +114,10 @@
match self.inner.manager.is_valid(&mut conn).await {
Ok(()) => return Ok(conn),
Err(e) => {
self.inner
.statistics
.connections_invalid_closed
.fetch_add(1, Ordering::SeqCst);
Owner: We can just make this another StatsKind, right?

Contributor Author: Haha, good point. I'll try to do that!
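
A sketch of that suggestion, assuming StatsKind's existing variants are Direct, Waited, and TimedOut (the new variant name is an assumption):

pub(crate) enum StatsKind {
    Direct,
    Waited,
    TimedOut,
    // New: a connection was closed after failing is_valid.
    InvalidClosed,
}

// AtomicStatistics::record then gains one arm:
StatsKind::InvalidClosed => self
    .connections_invalid_closed
    .fetch_add(1, Ordering::SeqCst),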

self.inner.forward_error(e);
conn.state = ConnectionState::Invalid;
continue;
@@ -127,7 +134,16 @@
}
};

self.inner.statistics.record(kind);
self.inner.statistics.record_get(kind);

if let Some(wait_time_start) = wait_time_start {
let wait_time = Instant::now() - wait_time_start;
self.inner
.statistics
.get_waited_time_micro
.fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst);
Owner: Why micros vs millis or nanos?

Contributor Author: I guess micros would provide enough detail (nanos might just be noise), while a u64 stays large enough to store many, many micros. In any case, since we're moving this to a Duration, behind the scenes we'll be storing (IIRC) micros in a u128.

Contributor Author: OK, now I remembered: I wanted everything to use an atomic type. I guess we could try to use a u128 internally and return a Duration (which IIRC is just a u128). If that works, we could forget about the nanos-vs-micros discussion.

Contributor Author: In the end I went with micros in a u64, converting later to Duration. If you want nanos, I can do that, but I got the feeling that nanosecond resolution would not provide much value for this specific use case (indeed, micros most likely won't either). I've kept the type as u64 so that all the atomics use the same type, which helps keep the record interface simple. If you want to go for nanos and/or u128, let me know.
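
One constraint worth noting here: std::sync::atomic has no stable AtomicU128, so an AtomicU64 of microseconds with a read-time conversion is the practical shape. A sketch, reusing this commit's counter (the Duration-returning field name is hypothetical):

// Write side, as in this commit:
self.get_waited_time_micro
    .fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst);

// Read side: convert the accumulated micros into a Duration.
get_wait_time: Duration::from_micros(item.get_waited_time_micro.load(Ordering::SeqCst)),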

}

result
}

@@ -147,7 +163,13 @@
let mut locked = self.inner.internals.lock();
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, _) => {
(_, is_broken) => {
if is_broken {
self.inner
.statistics
.connections_broken_closed
.fetch_add(1, Ordering::SeqCst);
}
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
self.inner.notify.notify_waiters();
@@ -190,6 +212,10 @@ where
.internals
.lock()
.put(conn, Some(approval), self.inner.clone());
self.inner
.statistics
.connections_created
.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
Err(e) => {
48 changes: 42 additions & 6 deletions bb8/src/internals.rs
@@ -49,7 +49,15 @@

pub(crate) fn reap(&self) -> ApprovalIter {
let mut locked = self.internals.lock();
locked.reap(&self.statics)
let (iter, max_idle_timeout_closed, max_lifetime_closed) = locked.reap(&self.statics);
drop(locked);
self.statistics
.connections_max_idle_timeout_closed
.fetch_add(max_idle_timeout_closed, Ordering::SeqCst);
self.statistics
.connections_max_lifetime_closed
.fetch_add(max_lifetime_closed, Ordering::SeqCst);
iter
}

pub(crate) fn forward_error(&self, err: M::Error) {
@@ -139,22 +147,34 @@
ApprovalIter { num: num as usize }
}

pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter {
pub(crate) fn reap(&mut self, config: &Builder<M>) -> (ApprovalIter, u64, u64) {
let mut max_lifetime_closed: u64 = 0;
Owner: Please avoid type annotations where possible.
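
A sketch of the annotation-free form; the u64 is inferred here from reap's declared return type:

let mut max_lifetime_closed = 0;
let mut max_idle_timeout_closed = 0;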

let mut max_idle_timeout_closed: u64 = 0;
let now = Instant::now();
let before = self.conns.len();

self.conns.retain(|conn| {
let mut keep = true;
if let Some(timeout) = config.idle_timeout {
keep &= now - conn.idle_start < timeout;
if now - conn.idle_start >= timeout {
max_idle_timeout_closed += 1;
keep &= false;
}
}
if let Some(lifetime) = config.max_lifetime {
keep &= now - conn.conn.birth < lifetime;
if now - conn.conn.birth >= lifetime {
max_lifetime_closed += 1;
keep &= false;
}
}
keep
});

self.dropped((before - self.conns.len()) as u32, config)
(
self.dropped((before - self.conns.len()) as u32, config),
max_idle_timeout_closed,
max_lifetime_closed,
)
}

pub(crate) fn state(&self, statistics: Statistics) -> State {
@@ -255,10 +275,16 @@ pub(crate) struct AtomicStatistics {
pub(crate) get_direct: AtomicU64,
pub(crate) get_waited: AtomicU64,
pub(crate) get_timed_out: AtomicU64,
pub(crate) get_waited_time_micro: AtomicU64,
pub(crate) connections_created: AtomicU64,
pub(crate) connections_broken_closed: AtomicU64,
pub(crate) connections_invalid_closed: AtomicU64,
pub(crate) connections_max_lifetime_closed: AtomicU64,
pub(crate) connections_max_idle_timeout_closed: AtomicU64,
Comment on lines +281 to +285
Owner: Let's skip the connections_ prefixes.

Contributor Author: For all of them? At first sight at least, connections_created, connections_broken_closed and connections_invalid_closed would look a bit off without the connections_ prefix. It also follows the same practice as the get_ counters, where the prefix IMO helps the user understand their scope; the same goes for connections_.

Owner: Okay, let's keep connections_, but I'd like closed moved directly after connections_ and max_idle_timeout renamed to idle_timeout.
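
A sketch of the field names after that rename (exact spellings are an assumption based on the comment):

pub(crate) connections_created: AtomicU64,
pub(crate) connections_closed_broken: AtomicU64,
pub(crate) connections_closed_invalid: AtomicU64,
pub(crate) connections_closed_max_lifetime: AtomicU64,
pub(crate) connections_closed_idle_timeout: AtomicU64,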

}

impl AtomicStatistics {
pub(crate) fn record(&self, kind: StatsKind) {
pub(crate) fn record_get(&self, kind: StatsKind) {
Owner: Given that we can use it for incrementing other (non-get) counters, prefer to revert this.

match kind {
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst),
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst),
@@ -273,6 +299,16 @@ impl From<&AtomicStatistics> for Statistics {
get_direct: item.get_direct.load(Ordering::SeqCst),
get_waited: item.get_waited.load(Ordering::SeqCst),
get_timed_out: item.get_timed_out.load(Ordering::SeqCst),
get_waited_time_micro: item.get_waited_time_micro.load(Ordering::SeqCst),
connections_created: item.connections_created.load(Ordering::SeqCst),
connections_broken_closed: item.connections_broken_closed.load(Ordering::SeqCst),
connections_invalid_closed: item.connections_invalid_closed.load(Ordering::SeqCst),
connections_max_lifetime_closed: item
.connections_max_lifetime_closed
.load(Ordering::SeqCst),
connections_max_idle_timeout_closed: item
.connections_max_idle_timeout_closed
.load(Ordering::SeqCst),
}
}
}
91 changes: 91 additions & 0 deletions bb8/tests/test.rs
@@ -246,6 +246,8 @@ async fn test_drop_on_broken() {
}

assert!(DROPPED.load(Ordering::SeqCst));

assert_eq!(pool.state().statistics.connections_broken_closed, 1);
}

#[tokio::test]
@@ -453,6 +455,71 @@ async fn test_now_invalid() {
// Now try to get a new connection.
let r = pool.get().await;
assert!(r.is_err());

// both connections in the pool were considered invalid
assert_eq!(pool.state().statistics.connections_invalid_closed, 2);
}

#[tokio::test]
async fn test_idle_timeout() {
static DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Default)]
struct Connection;
impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

let manager = NthConnectionFailManager::<Connection>::new(5);
let pool = Pool::builder()
.idle_timeout(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.reaper_rate(Duration::from_secs(1))
.max_size(5)
.min_idle(Some(5))
.build(manager)
.await
.unwrap();

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let clone = pool.clone();
tokio::spawn(async move {
let conn = clone.get().await.unwrap();
tx1.send(()).unwrap();
// NB: If we sleep here we'll block this thread's event loop, and the
// reaper can't run.
let _ = rx2
.map(|r| match r {
Ok(v) => Ok((v, conn)),
Err(_) => Err((Error, conn)),
})
.await;
});

rx1.await.unwrap();

// And wait.
assert!(timeout(Duration::from_secs(2), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 4);

tx2.send(()).unwrap();

// And wait some more.
assert!(timeout(Duration::from_secs(3), pending::<()>())
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 idle connections were closed due to max idle time
assert_eq!(
pool.state().statistics.connections_max_idle_timeout_closed,
5
);
}

#[tokio::test]
@@ -509,6 +576,9 @@ async fn test_max_lifetime() {
.await
.is_err());
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);

// all 5 connections were closed due to max lifetime
assert_eq!(pool.state().statistics.connections_max_lifetime_closed, 5);
}

#[tokio::test]
@@ -924,4 +994,25 @@ async fn test_state_get_contention() {
let statistics = pool.state().statistics;
assert_eq!(statistics.get_direct, 1);
assert_eq!(statistics.get_waited, 1);
assert!(statistics.get_waited_time_micro > 0);
}

#[tokio::test]
async fn test_statistics_connections_created() {
let pool = Pool::builder()
.max_size(1)
.min_idle(1)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();
let (tx1, rx1) = oneshot::channel();
let clone = pool.clone();
tokio::spawn(async move {
let _ = clone.get().await.unwrap();
tx1.send(()).unwrap();
});
// wait until finished.
rx1.await.unwrap();

assert_eq!(pool.state().statistics.connections_created, 1);
}