From 647d015ddb86a201ac22a165eae5ef187651263b Mon Sep 17 00:00:00 2001 From: Pau Freixes Date: Mon, 10 Jun 2024 16:53:16 +0200 Subject: [PATCH 1/2] Add new get wait time statistic The new field `get_waited_time` for the `Statistics` type allows users to know the accumulated time for successive calls to the `get` method while waiting for a free connection. --- bb8/src/api.rs | 2 ++ bb8/src/inner.rs | 6 ++++++ bb8/src/internals.rs | 11 ++++++++++- bb8/tests/test.rs | 1 + 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 004790d..eacadd4 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -99,6 +99,8 @@ pub struct Statistics { pub get_waited: u64, /// Total gets performed that timed out while waiting for a connection. pub get_timed_out: u64, + /// Total time accumulated waiting for a connection. + pub get_waited_time: Duration, } /// A builder for a connection pool. diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 91b43b0..d128ea5 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -86,6 +86,7 @@ where pub(crate) async fn get(&self) -> Result, RunError> { let mut kind = StatsKind::Direct; + let mut wait_time_start = None; let future = async { loop { @@ -98,6 +99,7 @@ where let mut conn = match conn { Some(conn) => PooledConnection::new(self, conn), None => { + wait_time_start = Some(Instant::now()); kind = StatsKind::Waited; self.inner.notify.notified().await; continue; @@ -128,6 +130,10 @@ where }; self.inner.statistics.record(kind); + if let Some(wait_time_start) = wait_time_start { + let wait_time = Instant::now() - wait_time_start; + self.inner.statistics.record_get(wait_time); + } result } diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 705669a..f495a16 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -2,7 +2,7 @@ use std::cmp::min; use std::collections::VecDeque; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::sync::Notify; @@ -255,6 +255,7 @@ pub(crate) struct AtomicStatistics { pub(crate) get_direct: AtomicU64, pub(crate) get_waited: AtomicU64, pub(crate) get_timed_out: AtomicU64, + pub(crate) get_waited_time_micros: AtomicU64, } impl AtomicStatistics { @@ -265,6 +266,11 @@ impl AtomicStatistics { StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst), }; } + + pub(crate) fn record_get(&self, wait_time: Duration) { + self.get_waited_time_micros + .fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst); + } } impl From<&AtomicStatistics> for Statistics { @@ -273,6 +279,9 @@ impl From<&AtomicStatistics> for Statistics { get_direct: item.get_direct.load(Ordering::SeqCst), get_waited: item.get_waited.load(Ordering::SeqCst), get_timed_out: item.get_timed_out.load(Ordering::SeqCst), + get_waited_time: Duration::from_micros( + item.get_waited_time_micros.load(Ordering::SeqCst), + ), } } } diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index ee32cad..4339ac9 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -924,4 +924,5 @@ async fn test_state_get_contention() { let statistics = pool.state().statistics; assert_eq!(statistics.get_direct, 1); assert_eq!(statistics.get_waited, 1); + assert!(statistics.get_waited_time > Duration::from_micros(0)); } From 922272024791b4242f52f77a44fd334b2a583817 Mon Sep 17 00:00:00 2001 From: Pau Freixes Date: Mon, 10 Jun 2024 18:00:35 +0200 Subject: [PATCH 2/2] Unify record_get method --- bb8/src/api.rs | 2 +- bb8/src/inner.rs | 6 +----- bb8/src/internals.rs | 17 ++++++++--------- bb8/tests/test.rs | 2 +- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/bb8/src/api.rs b/bb8/src/api.rs index eacadd4..649103b 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -100,7 +100,7 @@ pub struct Statistics { /// Total gets performed that timed out while waiting for a connection. pub get_timed_out: u64, /// Total time accumulated waiting for a connection. - pub get_waited_time: Duration, + pub get_wait_time: Duration, } /// A builder for a connection pool. diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index d128ea5..3c17e25 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -129,11 +129,7 @@ where } }; - self.inner.statistics.record(kind); - if let Some(wait_time_start) = wait_time_start { - let wait_time = Instant::now() - wait_time_start; - self.inner.statistics.record_get(wait_time); - } + self.inner.statistics.record_get(kind, wait_time_start); result } diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index f495a16..95266c5 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -255,21 +255,22 @@ pub(crate) struct AtomicStatistics { pub(crate) get_direct: AtomicU64, pub(crate) get_waited: AtomicU64, pub(crate) get_timed_out: AtomicU64, - pub(crate) get_waited_time_micros: AtomicU64, + pub(crate) get_wait_time_micros: AtomicU64, } impl AtomicStatistics { - pub(crate) fn record(&self, kind: StatsKind) { + pub(crate) fn record_get(&self, kind: StatsKind, wait_time_start: Option) { match kind { StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst), StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst), StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst), }; - } - pub(crate) fn record_get(&self, wait_time: Duration) { - self.get_waited_time_micros - .fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst); + if let Some(wait_time_start) = wait_time_start { + let wait_time = Instant::now() - wait_time_start; + self.get_wait_time_micros + .fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst); + } } } @@ -279,9 +280,7 @@ impl From<&AtomicStatistics> for Statistics { get_direct: item.get_direct.load(Ordering::SeqCst), get_waited: item.get_waited.load(Ordering::SeqCst), get_timed_out: item.get_timed_out.load(Ordering::SeqCst), - get_waited_time: Duration::from_micros( - item.get_waited_time_micros.load(Ordering::SeqCst), - ), + get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)), } } } diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 4339ac9..5219a1c 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -924,5 +924,5 @@ async fn test_state_get_contention() { let statistics = pool.state().statistics; assert_eq!(statistics.get_direct, 1); assert_eq!(statistics.get_waited, 1); - assert!(statistics.get_waited_time > Duration::from_micros(0)); + assert!(statistics.get_wait_time > Duration::from_micros(0)); }