From a33a88f8f0b29c9342c3e1c7bf550dbe50d7f4ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 2 Jul 2024 16:14:12 +0200 Subject: [PATCH] Add support for reading and writing compressed blobs (#8106) Add support for reading and writing zstd-compressed blobs for use in image layer generation, but maybe one day useful also for delta layers. The reading of them is unconditional while the writing is controlled by the `image_compression` config variable allowing for experiments. For the on-disk format, we re-use some of the bitpatterns we currently keep reserved for blobs larger than 256 MiB. This assumes that we have never ever written any such large blobs to image layers. After the preparation in #7852, we now are unable to read blobs with a size larger than 256 MiB (or write them). A non-goal of this PR is to come up with good heuristics of when to compress a bitpattern. This is left for future work. Parts of the PR were inspired by #7091. cc #7879 Part of #5431 --- libs/pageserver_api/src/models.rs | 18 ++ pageserver/src/config.rs | 21 ++- pageserver/src/tenant/blob_io.rs | 155 +++++++++++++++--- .../src/tenant/storage_layer/delta_layer.rs | 7 +- 4 files changed, 177 insertions(+), 24 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 61a255cdbc80..959e161c167a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -432,6 +432,24 @@ pub enum CompactionAlgorithm { Tiered, } +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Serialize, + Deserialize, + strum_macros::FromRepr, + strum_macros::EnumString, +)] +#[strum(serialize_all = "kebab-case")] +pub enum ImageCompressionAlgorithm { + /// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well. + /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html). + Zstd { level: Option }, +} + #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub struct CompactionAlgorithmSettings { pub kind: CompactionAlgorithm, diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 2b698b75dcb1..470e941c33f9 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -5,7 +5,7 @@ //! See also `settings.md` for better description on every parameter. use anyhow::{anyhow, bail, ensure, Context, Result}; -use pageserver_api::shard::TenantShardId; +use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId}; use remote_storage::{RemotePath, RemoteStorageConfig}; use serde; use serde::de::IntoDeserializer; @@ -50,6 +50,7 @@ pub mod defaults { DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_PG_LISTEN_PORT, }; + use pageserver_api::models::ImageCompressionAlgorithm; pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT; pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s"; @@ -90,6 +91,8 @@ pub mod defaults { pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB + pub const DEFAULT_IMAGE_COMPRESSION: Option = None; + pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true; pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; @@ -285,6 +288,8 @@ pub struct PageServerConf { pub validate_vectored_get: bool, + pub image_compression: Option, + /// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this /// is exceeded, we start proactively closing ephemeral layers to limit the total amount /// of ephemeral data. @@ -395,6 +400,8 @@ struct PageServerConfigBuilder { validate_vectored_get: BuilderValue, + image_compression: BuilderValue>, + ephemeral_bytes_per_memory_kb: BuilderValue, } @@ -482,6 +489,7 @@ impl PageServerConfigBuilder { max_vectored_read_bytes: Set(MaxVectoredReadBytes( NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(), )), + image_compression: Set(DEFAULT_IMAGE_COMPRESSION), validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET), ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB), } @@ -667,6 +675,10 @@ impl PageServerConfigBuilder { self.validate_vectored_get = BuilderValue::Set(value); } + pub fn get_image_compression(&mut self, value: Option) { + self.image_compression = BuilderValue::Set(value); + } + pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) { self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value); } @@ -727,6 +739,7 @@ impl PageServerConfigBuilder { get_impl, max_vectored_read_bytes, validate_vectored_get, + image_compression, ephemeral_bytes_per_memory_kb, } CUSTOM LOGIC @@ -1004,6 +1017,9 @@ impl PageServerConf { "validate_vectored_get" => { builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?) } + "image_compression" => { + builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?)) + } "ephemeral_bytes_per_memory_kb" => { builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize) } @@ -1088,6 +1104,7 @@ impl PageServerConf { NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES) .expect("Invalid default constant"), ), + image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, } @@ -1328,6 +1345,7 @@ background_task_maximum_delay = '334 s' .expect("Invalid default constant") ), validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, + image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, }, "Correct defaults should be used when no config values are provided" @@ -1401,6 +1419,7 @@ background_task_maximum_delay = '334 s' .expect("Invalid default constant") ), validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, + image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, }, "Should be able to parse all basic config values correctly" diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 2be8816cefbd..022801b17fba 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -6,12 +6,18 @@ //! is written as a one byte. If it's larger than that, the length //! is written as a four-byte integer, in big-endian, with the high //! bit set. This way, we can detect whether it's 1- or 4-byte header -//! by peeking at the first byte. +//! by peeking at the first byte. For blobs larger than 128 bits, +//! we also specify three reserved bits, only one of the three bit +//! patterns is currently in use (0b011) and signifies compression +//! with zstd. //! //! len < 128: 0XXXXXXX -//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX +//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! +use async_compression::Level; use bytes::{BufMut, BytesMut}; +use pageserver_api::models::ImageCompressionAlgorithm; +use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; use crate::context::RequestContext; @@ -66,12 +72,29 @@ impl<'a> BlockCursor<'a> { len_buf.copy_from_slice(&buf[off..off + 4]); off += 4; } - len_buf[0] &= 0x7f; + len_buf[0] &= !LEN_COMPRESSION_BIT_MASK; u32::from_be_bytes(len_buf) as usize }; + let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK; - dstbuf.clear(); - dstbuf.reserve(len); + let mut tmp_buf = Vec::new(); + let buf_to_write; + let compression = if compression_bits <= BYTE_UNCOMPRESSED { + buf_to_write = dstbuf; + None + } else if compression_bits == BYTE_ZSTD { + buf_to_write = &mut tmp_buf; + Some(dstbuf) + } else { + let error = std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid compression byte {compression_bits:x}"), + ); + return Err(error); + }; + + buf_to_write.clear(); + buf_to_write.reserve(len); // Read the payload let mut remain = len; @@ -85,14 +108,35 @@ impl<'a> BlockCursor<'a> { page_remain = PAGE_SZ; } let this_blk_len = min(remain, page_remain); - dstbuf.extend_from_slice(&buf[off..off + this_blk_len]); + buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]); remain -= this_blk_len; off += this_blk_len; } + + if let Some(dstbuf) = compression { + if compression_bits == BYTE_ZSTD { + let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf); + decoder.write_all(buf_to_write).await?; + decoder.flush().await?; + } else { + unreachable!("already checked above") + } + } + Ok(()) } } +/// Reserved bits for length and compression +const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0; + +/// The maximum size of blobs we support. The highest few bits +/// are reserved for compression and other further uses. +const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff; + +const BYTE_UNCOMPRESSED: u8 = 0x80; +const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; + /// A wrapper of `VirtualFile` that allows users to write blobs. /// /// If a `BlobWriter` is dropped, the internal buffer will be @@ -219,6 +263,17 @@ impl BlobWriter { &mut self, srcbuf: B, ctx: &RequestContext, + ) -> (B::Buf, Result) { + self.write_blob_maybe_compressed(srcbuf, ctx, None).await + } + + /// Write a blob of data. Returns the offset that it was written to, + /// which can be used to retrieve the data later. + pub async fn write_blob_maybe_compressed, Buf: IoBuf + Send>( + &mut self, + srcbuf: B, + ctx: &RequestContext, + algorithm: Option, ) -> (B::Buf, Result) { let offset = self.offset; @@ -226,29 +281,58 @@ impl BlobWriter { let mut io_buf = self.io_buf.take().expect("we always put it back below"); io_buf.clear(); - let (io_buf, hdr_res) = async { + let mut compressed_buf = None; + let ((io_buf, hdr_res), srcbuf) = async { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - self.write_all(io_buf, ctx).await + ( + self.write_all(io_buf, ctx).await, + srcbuf.slice_full().into_inner(), + ) } else { // Write a 4-byte length header - if len > 0x7fff_ffff { + if len > MAX_SUPPORTED_LEN { return ( - io_buf, - Err(Error::new( - ErrorKind::Other, - format!("blob too large ({len} bytes)"), - )), + ( + io_buf, + Err(Error::new( + ErrorKind::Other, + format!("blob too large ({len} bytes)"), + )), + ), + srcbuf.slice_full().into_inner(), ); } - if len > 0x0fff_ffff { - tracing::warn!("writing blob above future limit ({len} bytes)"); - } - let mut len_buf = (len as u32).to_be_bytes(); - len_buf[0] |= 0x80; + let (high_bit_mask, len_written, srcbuf) = match algorithm { + Some(ImageCompressionAlgorithm::Zstd { level }) => { + let mut encoder = if let Some(level) = level { + async_compression::tokio::write::ZstdEncoder::with_quality( + Vec::new(), + Level::Precise(level.into()), + ) + } else { + async_compression::tokio::write::ZstdEncoder::new(Vec::new()) + }; + let slice = srcbuf.slice_full(); + encoder.write_all(&slice[..]).await.unwrap(); + encoder.shutdown().await.unwrap(); + let compressed = encoder.into_inner(); + if compressed.len() < len { + let compressed_len = compressed.len(); + compressed_buf = Some(compressed); + (BYTE_ZSTD, compressed_len, slice.into_inner()) + } else { + (BYTE_UNCOMPRESSED, len, slice.into_inner()) + } + } + None => (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()), + }; + let mut len_buf = (len_written as u32).to_be_bytes(); + assert_eq!(len_buf[0] & 0xf0, 0); + len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - self.write_all(io_buf, ctx).await + (self.write_all(io_buf, ctx).await, srcbuf) } } .await; @@ -257,7 +341,12 @@ impl BlobWriter { Ok(_) => (), Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), } - let (srcbuf, res) = self.write_all(srcbuf, ctx).await; + let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf { + let (_buf, res) = self.write_all(compressed_buf, ctx).await; + (Slice::into_inner(srcbuf.slice(..)), res) + } else { + self.write_all(srcbuf, ctx).await + }; (srcbuf, res.map(|_| offset)) } } @@ -295,6 +384,12 @@ mod tests { use rand::{Rng, SeedableRng}; async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { + round_trip_test_compressed::(blobs).await + } + + async fn round_trip_test_compressed( + blobs: &[Vec], + ) -> Result<(), Error> { let temp_dir = camino_tempfile::tempdir()?; let pathbuf = temp_dir.path().join("file"); let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); @@ -305,7 +400,18 @@ mod tests { let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { - let (_, res) = wtr.write_blob(blob.clone(), &ctx).await; + let (_, res) = match COMPRESSION { + 0 => wtr.write_blob(blob.clone(), &ctx).await, + 1 => { + wtr.write_blob_maybe_compressed( + blob.clone(), + &ctx, + Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }), + ) + .await + } + _ => unreachable!("Invalid compression {COMPRESSION}"), + }; let offs = res?; offsets.push(offs); } @@ -361,10 +467,15 @@ mod tests { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), + b"hello".to_vec(), + random_array(66 * PAGE_SZ), + vec![0xf3; 24 * PAGE_SZ], b"foobar".to_vec(), ]; round_trip_test::(blobs).await?; round_trip_test::(blobs).await?; + round_trip_test_compressed::(blobs).await?; + round_trip_test_compressed::(blobs).await?; Ok(()) } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index c2d4a2776b1d..e6a4d6d5c45a 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -452,7 +452,12 @@ impl DeltaLayerWriterInner { ctx: &RequestContext, ) -> (Vec, anyhow::Result<()>) { assert!(self.lsn_range.start <= lsn); - let (val, res) = self.blob_writer.write_blob(val, ctx).await; + // We don't want to use compression in delta layer creation + let compression = None; + let (val, res) = self + .blob_writer + .write_blob_maybe_compressed(val, ctx, compression) + .await; let off = match res { Ok(off) => off, Err(e) => return (val, Err(anyhow::anyhow!(e))),