Skip to content

Commit

Permalink
feat(pageserver): add metrics for aux file size
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed May 6, 2024
1 parent 26efd47 commit 24d5d8e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 16 deletions.
47 changes: 47 additions & 0 deletions pageserver/src/aux_file.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::sync::{
atomic::{AtomicIsize, AtomicUsize},
Arc,
};

use bytes::{Buf, BufMut};
use futures::lock::Mutex;
use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
use tracing::warn;

Expand Down Expand Up @@ -112,6 +118,47 @@ pub fn encode_file_value(files: &[(&str, &[u8])]) -> anyhow::Result<Vec<u8>> {
Ok(encoded)
}

/// An estimation of the size of aux files.
pub struct AuxFileSizeEstimator {
size: Arc<std::sync::Mutex<Option<isize>>>,
}

impl AuxFileSizeEstimator {
pub fn new() -> Self {
Self {
size: Arc::new(std::sync::Mutex::new(None)),
}
}

pub fn on_base_backup(&self, new_size: usize) {
let mut guard = self.size.lock().unwrap();
*guard = Some(new_size as isize);
}

pub fn on_add(&self, file_size: usize) {
let mut guard = self.size.lock().unwrap();
if let Some(size) = &mut *guard {
*size += file_size as isize;
}
}

pub fn on_remove(&self, file_size: usize) {
let mut guard = self.size.lock().unwrap();
if let Some(size) = &mut *guard {
*size -= file_size as isize;
}
}

pub fn on_update(&self, old_size: usize, new_size: usize) {
let mut guard = self.size.lock().unwrap();
if let Some(size) = &mut *guard {
*size += new_size as isize - old_size as isize;
}
}

pub fn report(&self) {}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 6 additions & 0 deletions pageserver/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::sync::atomic::AtomicUsize;

use crate::task_mgr::TaskKind;

pub(crate) mod optional_counter;
Expand All @@ -98,6 +100,8 @@ pub struct RequestContext {
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
/// Total number of kilobytes of layer files processed in this request.
pub vectored_access_delta_file_size_kb: AtomicUsize,
}

/// The kind of access to the page cache.
Expand Down Expand Up @@ -154,6 +158,7 @@ impl RequestContextBuilder {
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
micros_spent_throttled: Default::default(),
vectored_access_delta_file_size_kb: AtomicUsize::new(0),
},
}
}
Expand All @@ -168,6 +173,7 @@ impl RequestContextBuilder {
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
micros_spent_throttled: Default::default(),
vectored_access_delta_file_size_kb: AtomicUsize::new(0),
},
}
}
Expand Down
52 changes: 39 additions & 13 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,17 @@ impl Timeline {
.await
.context("scan")?;
let mut result = HashMap::new();
let mut sz = 0;
for (_, v) in kv {
let v = v.context("get value")?;
let v = aux_file::decode_file_value(&v).context("value decode")?;
for (fname, content) in v {
result.insert(fname.to_string(), content.to_vec().into());
sz += fname.len();
sz += content.len();
}
}
self.aux_file_size_estimator.on_base_backup(sz);
Ok(result)
}

Expand Down Expand Up @@ -1471,23 +1475,45 @@ impl<'a> DatadirModification<'a> {
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
let files = if let Some(ref old_val) = old_val {
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
} else {
Vec::new()
};
let new_files = if content.is_empty() {
files
.into_iter()
.filter(|(p, _)| &path != p)
.collect::<Vec<_>>()
} else {
files
.into_iter()
.filter(|(p, _)| &path != p)
.chain(std::iter::once((path, content)))
.collect::<Vec<_>>()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
} else {
other_files.push(file);
}
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
}
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => anyhow::bail!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
}
Expand Down
13 changes: 10 additions & 3 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ use std::{
ops::ControlFlow,
};

use crate::tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
},
};
use crate::{
context::{DownloadBehavior, RequestContext},
Expand Down Expand Up @@ -407,6 +410,8 @@ pub struct Timeline {

/// Keep aux directory cache to avoid it's reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,

pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
}

pub struct WalReceiverInfo {
Expand Down Expand Up @@ -2253,6 +2258,8 @@ impl Timeline {
dir: None,
n_deltas: 0,
}),

aux_file_size_estimator: AuxFileSizeEstimator::new(),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
Expand Down

0 comments on commit 24d5d8e

Please sign in to comment.