Add metrics for input data considered and taken for compression #8522

Merged
merged 5 commits on Jul 30, 2024
Changes from 4 commits
18 changes: 17 additions & 1 deletion pageserver/src/metrics.rs
@@ -613,7 +613,23 @@ pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_compression_image_in_bytes_total",
"Size of uncompressed data written into image layers"
"Size of data written into image layers before compression"
)
.expect("failed to define a metric")
});

pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_compression_image_in_bytes_considered",
"Size of potentially compressible data written into image layers before compression"
)
.expect("failed to define a metric")
});

pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_compression_image_in_bytes_chosen",
"Size of data whose compressed form was written into image layers"
)
.expect("failed to define a metric")
});
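Taken together, these counters nest (as can be seen from how they are incremented in image_layer.rs below): every image byte goes into the first counter, bytes that were actually run through the compressor also go into the second, and bytes whose compressed form was kept also go into the third, so

    pageserver_compression_image_in_bytes_total
        >= pageserver_compression_image_in_bytes_considered
        >= pageserver_compression_image_in_bytes_chosen

The ratio chosen / considered then approximates how much of the eligible data compressed well enough for the compressed form to be written.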
44 changes: 33 additions & 11 deletions pageserver/src/tenant/blob_io.rs
@@ -28,6 +28,12 @@ use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};

#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
pub written_compressed: bool,
pub compressed_size: Option<usize>,
}

impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -273,8 +279,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await
let (buf, res, _compression_info) = self
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await;
(buf, res)
}

/// Write a blob of data. Returns the offset that it was written to,
@@ -284,8 +292,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: B,
ctx: &RequestContext,
algorithm: ImageCompressionAlgorithm,
) -> (B::Buf, Result<u64, Error>) {
) -> (B::Buf, Result<u64, Error>, CompressionInfo) {
let offset = self.offset;
let mut compression_info = CompressionInfo {
written_compressed: false,
compressed_size: None,
};

let len = srcbuf.bytes_init();

@@ -328,7 +340,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
encoder.write_all(&slice[..]).await.unwrap();
encoder.shutdown().await.unwrap();
let compressed = encoder.into_inner();
compression_info.compressed_size = Some(compressed.len());
if compressed.len() < len {
compression_info.written_compressed = true;
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
@@ -351,15 +365,21 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
self.io_buf = Some(io_buf);
match hdr_res {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
Err(e) => {
return (
Slice::into_inner(srcbuf.slice(..)),
Err(e),
compression_info,
)
}
}
let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf {
let (_buf, res) = self.write_all(compressed_buf, ctx).await;
(Slice::into_inner(srcbuf.slice(..)), res)
} else {
self.write_all(srcbuf, ctx).await
};
(srcbuf, res.map(|_| offset))
(srcbuf, res.map(|_| offset), compression_info)
}
}

@@ -416,12 +436,14 @@ pub(crate) mod tests {
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = if compression {
wtr.write_blob_maybe_compressed(
blob.clone(),
ctx,
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await
let res = wtr
.write_blob_maybe_compressed(
blob.clone(),
ctx,
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await;
(res.0, res.1)
} else {
wtr.write_blob(blob.clone(), ctx).await
};
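The new CompressionInfo value returned by write_blob_maybe_compressed distinguishes three outcomes for a blob. A minimal sketch of how a caller can interpret it (hypothetical helper, not part of this PR; the import path is only assumed from the file location pageserver/src/tenant/blob_io.rs):

use pageserver::tenant::blob_io::CompressionInfo;

/// Hypothetical helper: classify how a blob was handled, following the
/// semantics of `write_blob_maybe_compressed` above.
fn describe(info: CompressionInfo) -> &'static str {
    match (info.compressed_size, info.written_compressed) {
        // Compression was never attempted (e.g. compression disabled),
        // so the blob only counts towards `..._in_bytes_total`.
        (None, _) => "not considered for compression",
        // The blob was compressed, but the result was not smaller,
        // so the uncompressed bytes were written (`..._considered` only).
        (Some(_), false) => "considered, written uncompressed",
        // The compressed form was smaller and was written to the layer
        // (`..._considered` and `..._chosen`).
        (Some(_), true) => "considered, written compressed",
    }
}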
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -464,7 +464,7 @@ impl DeltaLayerWriterInner {
);
// We don't want to use compression in delta layer creation
let compression = ImageCompressionAlgorithm::Disabled;
let (val, res) = self
let (val, res, _) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
26 changes: 24 additions & 2 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -736,6 +736,14 @@ struct ImageLayerWriterInner {
// Total uncompressed bytes passed into put_image
uncompressed_bytes: u64,

// Like `uncompressed_bytes`,
// but only of images we might consider for compression
uncompressed_bytes_eligible: u64,

// Like `uncompressed_bytes`, but only of images
// where we have chosen their compressed form
uncompressed_bytes_chosen: u64,

blob_writer: BlobWriter<false>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
@@ -792,6 +800,8 @@ impl ImageLayerWriterInner {
tree: tree_builder,
blob_writer,
uncompressed_bytes: 0,
uncompressed_bytes_eligible: 0,
uncompressed_bytes_chosen: 0,
};

Ok(writer)
@@ -810,11 +820,20 @@
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let compression = self.conf.image_compression;
self.uncompressed_bytes += img.len() as u64;
let (_img, res) = self
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
let (_img, res, compression_info) = self
.blob_writer
.write_blob_maybe_compressed(img, ctx, compression)
.await;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
}
if compression_info.written_compressed {
// The image has been compressed
self.uncompressed_bytes_chosen += uncompressed_len;
}
// TODO: re-use the buffer for `img` further upstack
let off = res?;

@@ -839,6 +858,9 @@
// Calculate compression ratio
let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
.inc_by(self.uncompressed_bytes_eligible);
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);

let mut file = self.blob_writer.into_inner();
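As a made-up numeric example of how the new counters read (hypothetical values, only to illustrate the arithmetic): if one layer write accumulates uncompressed_bytes = 100 MiB, of which uncompressed_bytes_eligible = 80 MiB and uncompressed_bytes_chosen = 60 MiB, then 60 / 80 = 75% of the eligible input compressed well enough for its compressed form to be kept, while COMPRESSION_IMAGE_OUTPUT_BYTES compared against the full 100 MiB gives the overall on-disk compression ratio for the layer.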