Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
stats: add storage_bytes_used (#372)
Browse files Browse the repository at this point in the history
The new metrics track how many storage bytes are used by this sqld
instance. It only tracks the main database file, under the assumption
that the most interesting metrics for users is "how large is my database
after I successfully checkpoint the write-ahead log".

Right now we don't have a separate fiber that performs checkpoints,
but that's planned. And once we have it, inspecting storage should
happen right after the checkpoint.

For now, the fiber that monitors storage used just runs
once every 15 minutes.

Fixes #340
  • Loading branch information
psarna authored May 3, 2023
1 parent a14accd commit 1ce18e4
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 8 deletions.
5 changes: 1 addition & 4 deletions sqld/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ pub async fn server_heartbeat(
let client = reqwest::Client::new();
loop {
sleep(update_period).await;
let body = StatsResponse {
rows_read_count: stats.rows_read(),
rows_written_count: stats.rows_written(),
};
let body = StatsResponse::from(&stats);
let request = client.post(&url);
let request = if let Some(ref auth) = auth {
request.header("Authorization", auth.clone())
Expand Down
22 changes: 18 additions & 4 deletions sqld/src/http/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,27 @@ use crate::stats::Stats;
pub struct StatsResponse {
pub rows_read_count: u64,
pub rows_written_count: u64,
pub storage_bytes_used: u64,
}

impl From<&Stats> for StatsResponse {
fn from(stats: &Stats) -> Self {
Self {
rows_read_count: stats.rows_read(),
rows_written_count: stats.rows_written(),
storage_bytes_used: stats.storage_bytes_used(),
}
}
}

impl From<Stats> for StatsResponse {
fn from(stats: Stats) -> Self {
(&stats).into()
}
}

pub fn handle_stats(stats: &Stats) -> Response<Body> {
let resp = StatsResponse {
rows_read_count: stats.rows_read(),
rows_written_count: stats.rows_written(),
};
let resp: StatsResponse = stats.into();

let payload = serde_json::to_vec(&resp).unwrap();
Response::builder()
Expand Down
15 changes: 15 additions & 0 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,19 @@ async fn start_primary(
Ok(())
}

// Periodically check the storage used by the database and save it in the Stats structure.
// TODO: Once we have a separate fiber that does WAL checkpoints, running this routine
// right after checkpointing is exactly where it should be done.
async fn run_storage_monitor(mut db_path: PathBuf, stats: Stats) -> anyhow::Result<()> {
let duration = tokio::time::Duration::from_secs(60 * 15);
db_path.push("data");
loop {
let attr = tokio::fs::metadata(&db_path).await;
stats.set_storage_bytes_used(attr.map_or(0, |stats| stats.len()));
tokio::time::sleep(duration).await;
}
}

pub async fn run_server(config: Config) -> anyhow::Result<()> {
tracing::trace!("Backend: {:?}", config.backend);

Expand Down Expand Up @@ -425,6 +438,8 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {

let stats = Stats::new(&config.db_path)?;

join_set.spawn(run_storage_monitor(config.db_path.clone(), stats.clone()));

match config.writer_rpc_addr {
Some(_) => start_replica(&config, &mut join_set, idle_shutdown_layer, stats).await?,
None => start_primary(&config, &mut join_set, idle_shutdown_layer, stats).await?,
Expand Down
10 changes: 10 additions & 0 deletions sqld/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct Stats {
struct StatsInner {
rows_written: AtomicU64,
rows_read: AtomicU64,
storage_bytes_used: AtomicU64,
}

impl Stats {
Expand Down Expand Up @@ -46,6 +47,10 @@ impl Stats {
self.inner.rows_read.fetch_add(n, Ordering::Relaxed);
}

pub fn set_storage_bytes_used(&self, n: u64) {
self.inner.storage_bytes_used.store(n, Ordering::Relaxed);
}

/// returns the total number of rows read since this database was created
pub fn rows_read(&self) -> u64 {
self.inner.rows_read.load(Ordering::Relaxed)
Expand All @@ -55,6 +60,11 @@ impl Stats {
pub fn rows_written(&self) -> u64 {
self.inner.rows_written.load(Ordering::Relaxed)
}

/// returns the total number of bytes used by the database (excluding uncheckpointed WAL entries)
pub fn storage_bytes_used(&self) -> u64 {
self.inner.storage_bytes_used.load(Ordering::Relaxed)
}
}

fn spawn_stats_persist_thread(stats: Arc<StatsInner>, mut file: File) {
Expand Down

0 comments on commit 1ce18e4

Please sign in to comment.