Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query percentile support to stats #1112

Merged
merged 4 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ aes = { version = "0.8.3", optional = true }
cbc = { version = "0.1.2", optional = true }
zerocopy = { version = "0.7.28", features = ["derive", "alloc"] }
hashbrown = { version = "0.14.3", features = ["serde"] }
hdrhistogram = "7.5.4"

[dev-dependencies]
arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }
Expand Down
70 changes: 54 additions & 16 deletions libsql-server/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use hdrhistogram::Histogram;
use itertools::Itertools;
use serde::Serialize;

Expand All @@ -26,7 +28,7 @@ pub struct StatsResponse {
pub embedded_replica_frames_replicated: u64,
pub query_count: u64,
pub elapsed_ms: u64,
pub queries: QueriesStatsResponse,
pub queries: Option<QueriesStatsResponse>,
}

impl From<&Stats> for StatsResponse {
Expand Down Expand Up @@ -55,7 +57,7 @@ impl From<&Stats> for StatsResponse {
.iter()
.cloned()
.collect(),
queries: QueriesStatsResponse::from(stats),
queries: stats.into(),
}
}
}
Expand All @@ -66,26 +68,62 @@ impl From<Stats> for StatsResponse {
}
}

#[derive(Serialize)]
#[derive(Serialize, Default)]
/// Latency summary for the queries tracked in one stats window.
///
/// Values are built by `QueriesLatencyStats::from` from a histogram and a
/// total elapsed `Duration`.
// NOTE(review): percentile values are presumably in milliseconds, matching
// what is recorded into the histogram — confirm against the recorder.
pub struct QueriesLatencyStats {
/// Total elapsed time, in whole milliseconds.
pub sum: u64,
/// Histogram value at the 50th percentile.
pub p50: u64,
/// Histogram value at the 75th percentile.
pub p75: u64,
/// Histogram value at the 90th percentile.
pub p90: u64,
/// Histogram value at the 95th percentile.
pub p95: u64,
/// Histogram value at the 99th percentile.
pub p99: u64,
/// Histogram value at the 99.9th percentile.
pub p999: u64,
}

impl QueriesLatencyStats {
fn from(hist: &Histogram<u32>, sum: &Duration) -> Self {
QueriesLatencyStats {
sum: sum.as_millis() as u64,
p50: hist.value_at_percentile(50.0),
p75: hist.value_at_percentile(75.0),
p90: hist.value_at_percentile(90.0),
p95: hist.value_at_percentile(95.0),
p99: hist.value_at_percentile(99.0),
p999: hist.value_at_percentile(99.9),
}
}
}

#[derive(Serialize, Default)]
pub struct QueriesStatsResponse {
pub id: Option<Uuid>,
pub id: Uuid,
pub created_at: u64,
pub count: u64,
pub stats: Vec<QueryAndStats>,
pub elapsed: QueriesLatencyStats,
}

impl From<&Stats> for QueriesStatsResponse {
impl From<&Stats> for Option<QueriesStatsResponse> {
fn from(stats: &Stats) -> Self {
let queries = stats.get_queries().read().unwrap();
Self {
id: queries.id(),
stats: queries
.stats()
.iter()
.map(|(k, v)| QueryAndStats {
query: k.clone(),
elapsed_ms: v.elapsed.as_millis() as u64,
stat: v.clone(),
})
.collect_vec(),
if queries.as_ref().map_or(true, |q| q.expired()) {
Self::default()
} else {
let queries = queries.as_ref().unwrap();
Some(QueriesStatsResponse {
id: queries.id(),
created_at: queries.created_at().timestamp() as u64,
count: queries.count(),
elapsed: QueriesLatencyStats::from(queries.hist(), &queries.elapsed()),
stats: queries
.stats()
.iter()
.map(|(k, v)| QueryAndStats {
query: k.clone(),
elapsed_ms: v.elapsed.as_millis() as u64,
stat: v.clone(),
})
.collect_vec(),
})
}
}
}
Expand Down
88 changes: 67 additions & 21 deletions libsql-server/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock, Weak};

use chrono::{DateTime, DurationRound, Utc};
use hdrhistogram::Histogram;
use metrics::{counter, gauge, histogram, increment_counter};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio::time::Duration;
use tracing::debug;
use tokio::time::{Duration, Instant};
use uuid::Uuid;

use crate::namespace::NamespaceName;
Expand Down Expand Up @@ -82,31 +83,46 @@ impl QueryStats {
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug)]
pub struct QueriesStats {
#[serde(default)]
id: Option<Uuid>,
#[serde(default)]
stats_threshold: AtomicU64,
#[serde(default)]
id: Uuid,
stats: HashMap<String, QueryStats>,
elapsed: Duration,
stats_threshold: u64,
hist: Histogram<u32>,
expires_at: Option<Instant>,
created_at: DateTime<Utc>,
}

impl QueriesStats {
fn new() -> Arc<RwLock<Self>> {
let mut this = QueriesStats::default();
this.id = Some(Uuid::new_v4());
Arc::new(RwLock::new(this))
/// Creates an empty stats window with a fresh random id, no expiration,
/// a zero threshold, and `created_at` set to the current UTC time.
///
/// Panics if the histogram cannot be constructed.
fn new() -> Self {
    // Side-effecting initialisers are bound in the same order as the
    // struct fields so the evaluation order is unchanged.
    let id = Uuid::new_v4();
    let hist = Histogram::<u32>::new(3).unwrap();
    let created_at = Utc::now();

    Self {
        id,
        stats: HashMap::new(),
        elapsed: Duration::default(),
        stats_threshold: 0,
        hist,
        expires_at: None,
        created_at,
    }
}

fn with_expiration(expires_at: Instant) -> Self {
let mut this = Self::new();
this.expires_at = Some(expires_at);
this
}

fn register_query(&mut self, sql: &String, stat: QueryStats) {
self.elapsed += stat.elapsed;
let _ = self.hist.record(stat.elapsed.as_millis() as u64);

let (aggregated, new) = match self.stats.get(sql) {
Some(aggregated) => (aggregated.merge(&stat), false),
None => (stat, true),
};

debug!("query: {}, elapsed: {:?}", sql, aggregated.elapsed);
if (aggregated.elapsed.as_micros() as u64) < self.stats_threshold.load(Ordering::Relaxed) {
if (aggregated.elapsed.as_micros() as u64) < self.stats_threshold {
return;
}

Expand All @@ -131,8 +147,7 @@ impl QueriesStats {

fn update_threshold(&mut self) {
if let Some((_, v)) = self.min() {
self.stats_threshold
.store(v.elapsed.as_micros() as u64, Ordering::Relaxed);
self.stats_threshold = v.elapsed.as_micros() as u64;
}
}

Expand All @@ -143,13 +158,33 @@ impl QueriesStats {
}
}

pub(crate) fn id(&self) -> Option<Uuid> {
pub(crate) fn id(&self) -> Uuid {
self.id
}

/// Returns the per-query statistics map, keyed by the query's SQL text.
pub(crate) fn stats(&self) -> &HashMap<String, QueryStats> {
&self.stats
}

/// Returns the latency histogram into which `register_query` records each
/// query's elapsed time (as milliseconds).
pub(crate) fn hist(&self) -> &Histogram<u32> {
&self.hist
}

/// Reports whether this stats window has passed its expiration instant.
///
/// A window without an expiration never expires.
pub(crate) fn expired(&self) -> bool {
    match self.expires_at {
        Some(deadline) => deadline < Instant::now(),
        None => false,
    }
}

/// Returns the accumulated elapsed time of every query passed to
/// `register_query` on this instance.
pub(crate) fn elapsed(&self) -> &Duration {
&self.elapsed
}

/// Returns the histogram's `len()` as a `u64`.
// NOTE(review): presumably the number of samples successfully recorded;
// `register_query` ignores the result of `record`, so a rejected sample
// would not be counted — confirm against hdrhistogram's documentation.
pub(crate) fn count(&self) -> u64 {
self.hist.len() as u64
}

/// Returns the UTC timestamp captured when this instance was constructed.
pub(crate) fn created_at(&self) -> &DateTime<Utc> {
&self.created_at
}
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -198,7 +233,7 @@ pub struct Stats {
#[serde(default)]
query_latency: AtomicU64,
#[serde(skip)]
queries: Arc<RwLock<QueriesStats>>,
queries: Arc<RwLock<Option<QueriesStats>>>,
}

impl Stats {
Expand All @@ -221,7 +256,6 @@ impl Stats {

let (update_sender, update_receiver) = mpsc::channel(256);

this.queries = QueriesStats::new();
this.namespace = namespace;
this.sender = Some(update_sender);

Expand Down Expand Up @@ -380,12 +414,16 @@ impl Stats {
self.query_latency.load(Ordering::Relaxed)
}

pub(crate) fn get_queries(&self) -> &Arc<RwLock<QueriesStats>> {
pub(crate) fn get_queries(&self) -> &Arc<RwLock<Option<QueriesStats>>> {
&self.queries
}

fn register_query(&self, sql: &String, stat: QueryStats) {
self.queries.write().unwrap().register_query(sql, stat)
let mut queries = self.queries.write().unwrap();
if queries.as_ref().map_or(true, |q| q.expired()) {
*queries = Some(QueriesStats::with_expiration(next_hour()))
}
queries.as_mut().unwrap().register_query(sql, stat)
}

fn add_top_query(&self, query: TopQuery) {
Expand Down Expand Up @@ -492,3 +530,11 @@ async fn try_persist_stats(stats: Weak<Stats>, path: &Path) -> anyhow::Result<()
tokio::fs::rename(temp_path, path).await?;
Ok(())
}

/// Returns the monotonic `Instant` at which the next UTC wall-clock hour
/// begins.
///
/// The wall-clock distance from now to the next hour boundary is computed
/// with `chrono` and then added to `Instant::now()`, so the result can be
/// compared against other `Instant`s.
///
/// # Panics
///
/// Panics only if an internal invariant is broken: the current UTC time
/// cannot be truncated to the hour, or the computed boundary is not after
/// the current time.
fn next_hour() -> Instant {
    let utc_now = Utc::now();
    // Adding one hour and truncating to the hour gives the first hour
    // boundary strictly after `utc_now`, even when `utc_now` is on the hour.
    let next_hour = (utc_now + chrono::Duration::hours(1))
        .duration_trunc(chrono::Duration::hours(1))
        .expect("truncating the current UTC time to the hour must stay in range");
    let until_next_hour = (next_hour - utc_now)
        .to_std()
        .expect("the next hour boundary is strictly after the current time");
    Instant::now() + until_next_hour
}
Loading