From 7a5b28a39b6403b592484b862a608fe813484ec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 26 Jul 2024 02:51:31 +0200 Subject: [PATCH 1/5] Add metrics for input data considered and taken for compression --- pageserver/src/metrics.rs | 18 ++++++++++++- pageserver/src/tenant/blob_io.rs | 24 +++++++++++++---- .../src/tenant/storage_layer/image_layer.rs | 27 +++++++++++++++++-- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 9aff5220f59e..ef8e0d4652ca 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -613,7 +613,23 @@ pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy = Lazy::new(|| { pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES: Lazy = Lazy::new(|| { register_int_counter!( "pageserver_compression_image_in_bytes_total", - "Size of uncompressed data written into image layers" + "Size of data written into image layers before compression" + ) + .expect("failed to define a metric") +}); + +pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_ELIGIBLE: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_compression_image_in_bytes_large", + "Size of potentially compressible data written into image layers before compression" + ) + .expect("failed to define a metric") +}); + +pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_compression_image_in_bytes_large", + "Size of data whose compressed form was written into image layers" ) .expect("failed to define a metric") }); diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 791eefebe989..4f7a64bb2e94 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -28,6 +28,12 @@ use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; +#[derive(Copy, Clone, Debug)] +pub struct CompressionInfo { + pub written_compressed: bool, + pub compressed_size: Option, +} + impl<'a> BlockCursor<'a> { /// Read a blob into a new buffer. pub async fn read_blob( @@ -273,8 +279,10 @@ impl BlobWriter { srcbuf: B, ctx: &RequestContext, ) -> (B::Buf, Result) { - self.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) - .await + let (buf, res, _compression_info) = self + .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) + .await; + (buf, res) } /// Write a blob of data. Returns the offset that it was written to, @@ -284,8 +292,12 @@ impl BlobWriter { srcbuf: B, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> (B::Buf, Result) { + ) -> (B::Buf, Result, CompressionInfo) { let offset = self.offset; + let mut compression_info = CompressionInfo { + written_compressed: false, + compressed_size: None, + }; let len = srcbuf.bytes_init(); @@ -328,7 +340,9 @@ impl BlobWriter { encoder.write_all(&slice[..]).await.unwrap(); encoder.shutdown().await.unwrap(); let compressed = encoder.into_inner(); + compression_info.compressed_size = Some(compressed.len()); if compressed.len() < len { + compression_info.written_compressed = true; let compressed_len = compressed.len(); compressed_buf = Some(compressed); (BYTE_ZSTD, compressed_len, slice.into_inner()) @@ -344,7 +358,7 @@ impl BlobWriter { assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - (self.write_all(io_buf, ctx).await, srcbuf) + (self.write_all(io_buf, ctx).await, srcbuf, compression_info) } } .await; @@ -359,7 +373,7 @@ impl BlobWriter { } else { self.write_all(srcbuf, ctx).await }; - (srcbuf, res.map(|_| offset)) + (srcbuf, res.map(|_| offset), compression_info) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index e5e7f7192880..441fdfbda71c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -49,6 +49,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use hex; use itertools::Itertools; use pageserver_api::keyspace::KeySpace; +use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; @@ -736,6 +737,14 @@ struct ImageLayerWriterInner { // Total uncompressed bytes passed into put_image uncompressed_bytes: u64, + // Like `uncompressed_bytes`, + // but only of images we might consider for compression + uncompressed_bytes_eligible: u64, + + // Like `uncompressed_bytes`, but only of images + // where we have chosen their compressed form + uncompressed_bytes_chosen: u64, + blob_writer: BlobWriter, tree: DiskBtreeBuilder, } @@ -792,6 +801,8 @@ impl ImageLayerWriterInner { tree: tree_builder, blob_writer, uncompressed_bytes: 0, + uncompressed_bytes_eligible: 0, + uncompressed_bytes_chosen: 0, }; Ok(writer) @@ -810,11 +821,20 @@ impl ImageLayerWriterInner { ) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); let compression = self.conf.image_compression; - self.uncompressed_bytes += img.len() as u64; - let (_img, res) = self + let uncompressed_len = img.len() as u64; + self.uncompressed_bytes += uncompressed_len; + let (_img, res, compression_info) = self .blob_writer .write_blob_maybe_compressed(img, ctx, compression) .await; + if compression_info.compressed_size.is_some() { + // The image has been considered for compression at least + self.uncompressed_bytes_eligible += uncompressed_len; + } + if compression_info.written_compressed { + // The image has been compressed + self.uncompressed_bytes_chosen += uncompressed_len; + } // TODO: re-use the buffer for `img` further upstack let off = res?; @@ -839,6 +859,9 @@ impl ImageLayerWriterInner { // Calculate compression ratio let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes); + crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_ELIGIBLE + .inc_by(self.uncompressed_bytes_eligible); + crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen); crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); let mut file = self.blob_writer.into_inner(); From ceb9df6445fab5e2c771ec5746b3bfcb50daede7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 26 Jul 2024 02:59:55 +0200 Subject: [PATCH 2/5] Rename --- pageserver/src/metrics.rs | 6 +++--- pageserver/src/tenant/storage_layer/image_layer.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index ef8e0d4652ca..ede6b41a7509 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -618,9 +618,9 @@ pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES: Lazy = Lazy::new(|| .expect("failed to define a metric") }); -pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_ELIGIBLE: Lazy = Lazy::new(|| { +pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED: Lazy = Lazy::new(|| { register_int_counter!( - "pageserver_compression_image_in_bytes_large", + "pageserver_compression_image_in_bytes_considered", "Size of potentially compressible data written into image layers before compression" ) .expect("failed to define a metric") @@ -628,7 +628,7 @@ pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_ELIGIBLE: Lazy = Laz pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN: Lazy = Lazy::new(|| { register_int_counter!( - "pageserver_compression_image_in_bytes_large", + "pageserver_compression_image_in_bytes_chosen", "Size of data whose compressed form was written into image layers" ) .expect("failed to define a metric") diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 441fdfbda71c..b519272f287b 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -859,7 +859,7 @@ impl ImageLayerWriterInner { // Calculate compression ratio let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes); - crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_ELIGIBLE + crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED .inc_by(self.uncompressed_bytes_eligible); crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen); crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size); From 7514175785bb7bdf0d6c7dd0bc03d2904a4d3128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 26 Jul 2024 04:14:53 +0200 Subject: [PATCH 3/5] Remove unused import --- pageserver/src/tenant/storage_layer/image_layer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b519272f287b..a8cb90bbc31c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -49,7 +49,6 @@ use camino::{Utf8Path, Utf8PathBuf}; use hex; use itertools::Itertools; use pageserver_api::keyspace::KeySpace; -use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; From 1ea1da8a30ef34ffae1d1da8835d41007c8a6f7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 26 Jul 2024 06:37:23 +0200 Subject: [PATCH 4/5] Fix build --- pageserver/src/tenant/blob_io.rs | 24 ++++++++++++------- .../src/tenant/storage_layer/delta_layer.rs | 2 +- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 4f7a64bb2e94..34ac678bf62c 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -358,14 +358,20 @@ impl BlobWriter { assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - (self.write_all(io_buf, ctx).await, srcbuf, compression_info) + (self.write_all(io_buf, ctx).await, srcbuf) } } .await; self.io_buf = Some(io_buf); match hdr_res { Ok(_) => (), - Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), + Err(e) => { + return ( + Slice::into_inner(srcbuf.slice(..)), + Err(e), + compression_info, + ) + } } let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf { let (_buf, res) = self.write_all(compressed_buf, ctx).await; @@ -430,12 +436,14 @@ pub(crate) mod tests { let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { let (_, res) = if compression { - wtr.write_blob_maybe_compressed( - blob.clone(), - ctx, - ImageCompressionAlgorithm::Zstd { level: Some(1) }, - ) - .await + let res = wtr + .write_blob_maybe_compressed( + blob.clone(), + ctx, + ImageCompressionAlgorithm::Zstd { level: Some(1) }, + ) + .await; + (res.0, res.1) } else { wtr.write_blob(blob.clone(), ctx).await }; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 586a7b7836a6..ee0d0fb0657a 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -464,7 +464,7 @@ impl DeltaLayerWriterInner { ); // We don't want to use compression in delta layer creation let compression = ImageCompressionAlgorithm::Disabled; - let (val, res) = self + let (val, res, _) = self .blob_writer .write_blob_maybe_compressed(val, ctx, compression) .await; From 0e7d4939ee2f98939c6121d0abe9eb1a4aa916dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 29 Jul 2024 17:42:35 +0200 Subject: [PATCH 5/5] Move CompressionInfo into the Result --- pageserver/src/tenant/blob_io.rs | 18 ++++++------------ .../src/tenant/storage_layer/delta_layer.rs | 4 ++-- .../src/tenant/storage_layer/image_layer.rs | 6 +++--- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 34ac678bf62c..8e9d349ca88c 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -279,10 +279,10 @@ impl BlobWriter { srcbuf: B, ctx: &RequestContext, ) -> (B::Buf, Result) { - let (buf, res, _compression_info) = self + let (buf, res) = self .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) .await; - (buf, res) + (buf, res.map(|(off, _compression_info)| off)) } /// Write a blob of data. Returns the offset that it was written to, @@ -292,7 +292,7 @@ impl BlobWriter { srcbuf: B, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> (B::Buf, Result, CompressionInfo) { + ) -> (B::Buf, Result<(u64, CompressionInfo), Error>) { let offset = self.offset; let mut compression_info = CompressionInfo { written_compressed: false, @@ -365,13 +365,7 @@ impl BlobWriter { self.io_buf = Some(io_buf); match hdr_res { Ok(_) => (), - Err(e) => { - return ( - Slice::into_inner(srcbuf.slice(..)), - Err(e), - compression_info, - ) - } + Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), } let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf { let (_buf, res) = self.write_all(compressed_buf, ctx).await; @@ -379,7 +373,7 @@ impl BlobWriter { } else { self.write_all(srcbuf, ctx).await }; - (srcbuf, res.map(|_| offset), compression_info) + (srcbuf, res.map(|_| (offset, compression_info))) } } @@ -443,7 +437,7 @@ pub(crate) mod tests { ImageCompressionAlgorithm::Zstd { level: Some(1) }, ) .await; - (res.0, res.1) + (res.0, res.1.map(|(off, _)| off)) } else { wtr.write_blob(blob.clone(), ctx).await }; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index ee0d0fb0657a..7a128b0317ef 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -464,12 +464,12 @@ impl DeltaLayerWriterInner { ); // We don't want to use compression in delta layer creation let compression = ImageCompressionAlgorithm::Disabled; - let (val, res, _) = self + let (val, res) = self .blob_writer .write_blob_maybe_compressed(val, ctx, compression) .await; let off = match res { - Ok(off) => off, + Ok((off, _)) => off, Err(e) => return (val, Err(anyhow::anyhow!(e))), }; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index a8cb90bbc31c..07b9891433c6 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -822,10 +822,12 @@ impl ImageLayerWriterInner { let compression = self.conf.image_compression; let uncompressed_len = img.len() as u64; self.uncompressed_bytes += uncompressed_len; - let (_img, res, compression_info) = self + let (_img, res) = self .blob_writer .write_blob_maybe_compressed(img, ctx, compression) .await; + // TODO: re-use the buffer for `img` further upstack + let (off, compression_info) = res?; if compression_info.compressed_size.is_some() { // The image has been considered for compression at least self.uncompressed_bytes_eligible += uncompressed_len; @@ -834,8 +836,6 @@ impl ImageLayerWriterInner { // The image has been compressed self.uncompressed_bytes_chosen += uncompressed_len; } - // TODO: re-use the buffer for `img` further upstack - let off = res?; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf);