-
-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds new statistics for tracking connections created and closed #202
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,7 +85,8 @@ where | |
} | ||
|
||
pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> { | ||
let mut kind = StatsKind::Direct; | ||
let mut get_kind = StatsKind::GetDirect; | ||
let mut wait_time_start = None; | ||
|
||
let future = async { | ||
loop { | ||
|
@@ -98,7 +99,8 @@ where | |
let mut conn = match conn { | ||
Some(conn) => PooledConnection::new(self, conn), | ||
None => { | ||
kind = StatsKind::Waited; | ||
wait_time_start = Some(Instant::now()); | ||
get_kind = StatsKind::GetWaited; | ||
self.inner.notify.notified().await; | ||
continue; | ||
} | ||
|
@@ -111,6 +113,9 @@ where | |
match self.inner.manager.is_valid(&mut conn).await { | ||
Ok(()) => return Ok(conn), | ||
Err(e) => { | ||
self.inner | ||
.statistics | ||
.record(StatsKind::ConnectionsInvalidClosed, 1); | ||
self.inner.forward_error(e); | ||
conn.state = ConnectionState::Invalid; | ||
continue; | ||
|
@@ -122,12 +127,20 @@ where | |
let result = match timeout(self.inner.statics.connection_timeout, future).await { | ||
Ok(result) => result, | ||
_ => { | ||
kind = StatsKind::TimedOut; | ||
get_kind = StatsKind::GetTimedOut; | ||
Err(RunError::TimedOut) | ||
} | ||
}; | ||
|
||
self.inner.statistics.record(kind); | ||
self.inner.statistics.record(get_kind, 1); | ||
|
||
if let Some(wait_time_start) = wait_time_start { | ||
let wait_time = Instant::now() - wait_time_start; | ||
self.inner | ||
.statistics | ||
.record(StatsKind::GetWaitedTime, wait_time.as_micros() as u64); | ||
} | ||
Comment on lines
+135
to
+142
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's wrap this up in a method on |
||
|
||
result | ||
} | ||
|
||
|
@@ -147,7 +160,12 @@ where | |
let mut locked = self.inner.internals.lock(); | ||
match (state, self.inner.manager.has_broken(&mut conn.conn)) { | ||
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()), | ||
(_, _) => { | ||
(_, is_broken) => { | ||
if is_broken { | ||
self.inner | ||
.statistics | ||
.record(StatsKind::ConnectionsBrokenClosed, 1); | ||
} | ||
let approvals = locked.dropped(1, &self.inner.statics); | ||
self.spawn_replenishing_approvals(approvals); | ||
self.inner.notify.notify_waiters(); | ||
|
@@ -190,6 +208,9 @@ where | |
.internals | ||
.lock() | ||
.put(conn, Some(approval), self.inner.clone()); | ||
self.inner | ||
.statistics | ||
.record(StatsKind::ConnectionsCreated, 1); | ||
return Ok(()); | ||
} | ||
Err(e) => { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ use std::cmp::min; | |
use std::collections::VecDeque; | ||
use std::sync::atomic::{AtomicU64, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::Instant; | ||
use std::time::{Duration, Instant}; | ||
|
||
use tokio::sync::Notify; | ||
|
||
|
@@ -49,7 +49,17 @@ where | |
|
||
pub(crate) fn reap(&self) -> ApprovalIter { | ||
let mut locked = self.internals.lock(); | ||
locked.reap(&self.statics) | ||
let (iter, max_idle_timeout_closed, max_lifetime_closed) = locked.reap(&self.statics); | ||
drop(locked); | ||
|
||
self.statistics | ||
.record(StatsKind::ConnectionsMaxLifeTimeClosed, max_lifetime_closed); | ||
self.statistics.record( | ||
StatsKind::ConnectionsMaxIdleTimeoutClosed, | ||
max_idle_timeout_closed, | ||
); | ||
Comment on lines
+55
to
+60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's wrap this in a separate method on |
||
|
||
iter | ||
} | ||
|
||
pub(crate) fn forward_error(&self, err: M::Error) { | ||
|
@@ -139,22 +149,34 @@ where | |
ApprovalIter { num: num as usize } | ||
} | ||
|
||
pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter { | ||
pub(crate) fn reap(&mut self, config: &Builder<M>) -> (ApprovalIter, u64, u64) { | ||
let mut max_lifetime_closed = 0; | ||
let mut max_idle_timeout_closed = 0; | ||
Comment on lines
+153
to
+154
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's call these |
||
let now = Instant::now(); | ||
let before = self.conns.len(); | ||
|
||
self.conns.retain(|conn| { | ||
let mut keep = true; | ||
if let Some(timeout) = config.idle_timeout { | ||
keep &= now - conn.idle_start < timeout; | ||
if now - conn.idle_start >= timeout { | ||
max_idle_timeout_closed += 1; | ||
keep &= false; | ||
} | ||
} | ||
if let Some(lifetime) = config.max_lifetime { | ||
keep &= now - conn.conn.birth < lifetime; | ||
if now - conn.conn.birth >= lifetime { | ||
max_lifetime_closed += 1; | ||
keep &= false; | ||
} | ||
} | ||
keep | ||
}); | ||
|
||
self.dropped((before - self.conns.len()) as u32, config) | ||
( | ||
self.dropped((before - self.conns.len()) as u32, config), | ||
max_idle_timeout_closed, | ||
max_lifetime_closed, | ||
) | ||
} | ||
|
||
pub(crate) fn state(&self, statistics: Statistics) -> State { | ||
|
@@ -255,14 +277,38 @@ pub(crate) struct AtomicStatistics { | |
pub(crate) get_direct: AtomicU64, | ||
pub(crate) get_waited: AtomicU64, | ||
pub(crate) get_timed_out: AtomicU64, | ||
pub(crate) get_waited_time_micros: AtomicU64, | ||
pub(crate) connections_created: AtomicU64, | ||
pub(crate) connections_broken_closed: AtomicU64, | ||
pub(crate) connections_invalid_closed: AtomicU64, | ||
pub(crate) connections_max_lifetime_closed: AtomicU64, | ||
pub(crate) connections_max_idle_timeout_closed: AtomicU64, | ||
Comment on lines
+281
to
+285
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's skip the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For all of them? at least at first sight the Also it follows the same practice for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, let's keep |
||
} | ||
|
||
impl AtomicStatistics { | ||
pub(crate) fn record(&self, kind: StatsKind) { | ||
pub(crate) fn record(&self, kind: StatsKind, value: u64) { | ||
match kind { | ||
StatsKind::Direct => self.get_direct.fetch_add(1, Ordering::SeqCst), | ||
StatsKind::Waited => self.get_waited.fetch_add(1, Ordering::SeqCst), | ||
StatsKind::TimedOut => self.get_timed_out.fetch_add(1, Ordering::SeqCst), | ||
StatsKind::GetDirect => self.get_direct.fetch_add(value, Ordering::SeqCst), | ||
StatsKind::GetWaited => self.get_waited.fetch_add(value, Ordering::SeqCst), | ||
StatsKind::GetTimedOut => self.get_timed_out.fetch_add(value, Ordering::SeqCst), | ||
StatsKind::GetWaitedTime => self | ||
.get_waited_time_micros | ||
.fetch_add(value, Ordering::SeqCst), | ||
StatsKind::ConnectionsCreated => { | ||
self.connections_created.fetch_add(value, Ordering::SeqCst) | ||
} | ||
StatsKind::ConnectionsBrokenClosed => self | ||
.connections_broken_closed | ||
.fetch_add(value, Ordering::SeqCst), | ||
StatsKind::ConnectionsInvalidClosed => self | ||
.connections_invalid_closed | ||
.fetch_add(value, Ordering::SeqCst), | ||
StatsKind::ConnectionsMaxLifeTimeClosed => self | ||
.connections_max_lifetime_closed | ||
.fetch_add(value, Ordering::SeqCst), | ||
StatsKind::ConnectionsMaxIdleTimeoutClosed => self | ||
.connections_max_idle_timeout_closed | ||
.fetch_add(value, Ordering::SeqCst), | ||
}; | ||
} | ||
} | ||
|
@@ -273,12 +319,30 @@ impl From<&AtomicStatistics> for Statistics { | |
get_direct: item.get_direct.load(Ordering::SeqCst), | ||
get_waited: item.get_waited.load(Ordering::SeqCst), | ||
get_timed_out: item.get_timed_out.load(Ordering::SeqCst), | ||
get_waited_time: Duration::from_micros( | ||
item.get_waited_time_micros.load(Ordering::SeqCst), | ||
), | ||
connections_created: item.connections_created.load(Ordering::SeqCst), | ||
connections_broken_closed: item.connections_broken_closed.load(Ordering::SeqCst), | ||
connections_invalid_closed: item.connections_invalid_closed.load(Ordering::SeqCst), | ||
connections_max_lifetime_closed: item | ||
.connections_max_lifetime_closed | ||
.load(Ordering::SeqCst), | ||
connections_max_idle_timeout_closed: item | ||
.connections_max_idle_timeout_closed | ||
.load(Ordering::SeqCst), | ||
} | ||
} | ||
} | ||
|
||
pub(crate) enum StatsKind { | ||
Direct, | ||
Waited, | ||
TimedOut, | ||
GetDirect, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we'll want to split this into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @djc just for having an agreement on the new kinds and record operations that we would have within pub(crate) enum StatsGet {
Waited,
Direct,
TimedOut,
}
pub(crate) enum StatsConnectionClosed {
Broken,
Invalid,
} Note: we should not need more stats kind since the used And then we would have the following
Let me know if this works for you and i can create a new PR with separated commits for each group, starting with the rename of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, as suggested in my previous message we should reuse a simple There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oks, let me provide a new draft - sorry for the back and forth here :
And following
How does it sound? is this aligned with your expectations? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. Let's do one PR at a time, please implement the first chunk and I'll give feedback. Please closely read my feedback from this PR because you've still missed some of it. |
||
GetWaited, | ||
GetTimedOut, | ||
GetWaitedTime, | ||
ConnectionsCreated, | ||
ConnectionsBrokenClosed, | ||
ConnectionsInvalidClosed, | ||
ConnectionsMaxLifeTimeClosed, | ||
ConnectionsMaxIdleTimeoutClosed, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't rename this
get_kind
, please.