diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 022801b17fba..de74066b81bc 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -19,6 +19,7 @@ use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; @@ -72,14 +73,22 @@ impl<'a> BlockCursor<'a> { len_buf.copy_from_slice(&buf[off..off + 4]); off += 4; } - len_buf[0] &= !LEN_COMPRESSION_BIT_MASK; + let bit_mask = if self.read_compressed { + !LEN_COMPRESSION_BIT_MASK + } else { + 0x7f + }; + len_buf[0] &= bit_mask; u32::from_be_bytes(len_buf) as usize }; let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK; let mut tmp_buf = Vec::new(); let buf_to_write; - let compression = if compression_bits <= BYTE_UNCOMPRESSED { + let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed { + if compression_bits > BYTE_UNCOMPRESSED { + warn!("reading key above future limit ({len} bytes)"); + } buf_to_write = dstbuf; None } else if compression_bits == BYTE_ZSTD { @@ -384,10 +393,10 @@ mod tests { use rand::{Rng, SeedableRng}; async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { - round_trip_test_compressed::(blobs).await + round_trip_test_compressed::(blobs).await } - async fn round_trip_test_compressed( + async fn round_trip_test_compressed( blobs: &[Vec], ) -> Result<(), Error> { let temp_dir = camino_tempfile::tempdir()?; @@ -400,17 +409,15 @@ mod tests { let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { - let (_, res) = match COMPRESSION { - 0 => wtr.write_blob(blob.clone(), &ctx).await, - 1 => { - wtr.write_blob_maybe_compressed( - blob.clone(), - &ctx, - Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }), - ) - .await - } - _ => unreachable!("Invalid compression {COMPRESSION}"), + let (_, res) = if COMPRESSION { + wtr.write_blob_maybe_compressed( + blob.clone(), + &ctx, + Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }), + ) + .await + } else { + wtr.write_blob(blob.clone(), &ctx).await }; let offs = res?; offsets.push(offs); @@ -425,7 +432,7 @@ mod tests { let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?; let rdr = BlockReaderRef::VirtualFile(&file); - let rdr = BlockCursor::new(rdr); + let rdr = BlockCursor::new_with_compression(rdr, COMPRESSION); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { let blob_read = rdr.read_blob(*offset, &ctx).await?; assert_eq!( @@ -459,6 +466,8 @@ mod tests { ]; round_trip_test::(blobs).await?; round_trip_test::(blobs).await?; + round_trip_test_compressed::(blobs).await?; + round_trip_test_compressed::(blobs).await?; Ok(()) } @@ -474,8 +483,8 @@ mod tests { ]; round_trip_test::(blobs).await?; round_trip_test::(blobs).await?; - round_trip_test_compressed::(blobs).await?; - round_trip_test_compressed::(blobs).await?; + round_trip_test_compressed::(blobs).await?; + round_trip_test_compressed::(blobs).await?; Ok(()) } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 85f3b1c79942..3324e840ecf1 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -149,16 +149,24 @@ impl<'a> BlockReaderRef<'a> { /// ``` /// pub struct BlockCursor<'a> { + pub(super) read_compressed: bool, reader: BlockReaderRef<'a>, } impl<'a> BlockCursor<'a> { pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self { - BlockCursor { reader } + Self::new_with_compression(reader, false) + } + pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self { + BlockCursor { + read_compressed, + reader, + } } // Needed by cli pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self { BlockCursor { + read_compressed: false, reader: BlockReaderRef::FileBlockReader(reader), } } @@ -188,11 +196,25 @@ pub struct FileBlockReader<'a> { /// Unique ID of this file, used as key in the page cache. file_id: page_cache::FileId, + + compressed_reads: bool, } impl<'a> FileBlockReader<'a> { pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self { - FileBlockReader { file_id, file } + Self::new_with_compression(file, file_id, false) + } + + pub fn new_with_compression( + file: &'a VirtualFile, + file_id: FileId, + compressed_reads: bool, + ) -> Self { + FileBlockReader { + file_id, + file, + compressed_reads, + } } /// Read a page from the underlying file into given buffer. @@ -239,7 +261,10 @@ impl<'a> FileBlockReader<'a> { impl BlockReader for FileBlockReader<'_> { fn block_cursor(&self) -> BlockCursor<'_> { - BlockCursor::new(BlockReaderRef::FileBlockReader(self)) + BlockCursor::new_with_compression( + BlockReaderRef::FileBlockReader(self), + self.compressed_reads, + ) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 50aacbd9ad46..4a1b3a02377a 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -165,6 +165,7 @@ pub struct ImageLayerInner { file_id: FileId, max_vectored_read_bytes: Option, + compressed_reads: bool, } impl std::fmt::Debug for ImageLayerInner { @@ -178,7 +179,8 @@ impl std::fmt::Debug for ImageLayerInner { impl ImageLayerInner { pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> { - let block_reader = FileBlockReader::new(&self.file, self.file_id); + let block_reader = + FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads); let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new( self.index_start_blk, self.index_root_blk, @@ -266,9 +268,10 @@ impl ImageLayer { async fn load_inner(&self, ctx: &RequestContext) -> Result { let path = self.path(); - let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx) - .await - .and_then(|res| res)?; + let loaded = + ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, false, ctx) + .await + .and_then(|res| res)?; // not production code let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap(); @@ -377,6 +380,7 @@ impl ImageLayerInner { lsn: Lsn, summary: Option, max_vectored_read_bytes: Option, + support_compressed_reads: bool, ctx: &RequestContext, ) -> Result, anyhow::Error> { let file = match VirtualFile::open(path, ctx).await { @@ -420,6 +424,7 @@ impl ImageLayerInner { file, file_id, max_vectored_read_bytes, + compressed_reads: support_compressed_reads, key_range: actual_summary.key_range, })) } @@ -430,7 +435,8 @@ impl ImageLayerInner { reconstruct_state: &mut ValueReconstructState, ctx: &RequestContext, ) -> anyhow::Result { - let block_reader = FileBlockReader::new(&self.file, self.file_id); + let block_reader = + FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads); let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader); @@ -490,12 +496,14 @@ impl ImageLayerInner { &self, ctx: &RequestContext, ) -> anyhow::Result> { - let block_reader = FileBlockReader::new(&self.file, self.file_id); + let block_reader = + FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads); let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader); let mut result = Vec::new(); let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx)); - let block_reader = FileBlockReader::new(&self.file, self.file_id); + let block_reader = + FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads); let cursor = block_reader.block_cursor(); while let Some(item) = stream.next().await { // TODO: dedup code with get_reconstruct_value @@ -530,7 +538,8 @@ impl ImageLayerInner { .into(), ); - let block_reader = FileBlockReader::new(&self.file, self.file_id); + let block_reader = + FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads); let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); @@ -691,7 +700,8 @@ impl ImageLayerInner { #[cfg(test)] pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> { - let block_reader = FileBlockReader::new(&self.file, self.file_id); + let block_reader = + FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads); let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); ImageLayerIterator { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 02069c29d264..d1f5cc8f43a7 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1685,6 +1685,7 @@ impl DownloadedLayer { lsn, summary, Some(owner.conf.max_vectored_read_bytes), + owner.conf.image_compression.is_some(), ctx, ) .await