From 0e600eb921689aad27bee3291d0dceac84314280 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Fri, 12 Jul 2024 04:32:34 +0200
Subject: [PATCH] Implement decompression for vectored reads (#8302)

Implement decompression of images for vectored reads.

This doesn't implement support for still treating blobs with the bits
we reserved for compression as uncompressed, as we removed that
functionality in #8300 anyway.

Part of #5431
---
 pageserver/src/tenant/blob_io.rs          |  40 ++++---
 pageserver/src/tenant/vectored_blob_io.rs | 127 +++++++++++++++++++---
 2 files changed, 139 insertions(+), 28 deletions(-)
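
For orientation, the on-disk blob header that the new read path has to parse (per the constants and parsing code in the diffs below): a length below 0x80 is stored as a single byte with the most significant bit clear; otherwise the header is a 4-byte big-endian length whose top nibble carries the compression bits (BYTE_UNCOMPRESSED = 0x80, BYTE_ZSTD = 0x90), leaving 28 bits for the length, hence MAX_SUPPORTED_LEN = 0x0fff_ffff. The sketch below is illustrative only; it uses local copies of the constants rather than the pageserver types, and simplifies the write-side rules (the real writer only emits a compressed header when compression is enabled and worthwhile):

// Illustrative stand-ins for the constants in blob_io.rs.
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;

/// Encode a blob header: 1 byte for short uncompressed blobs, 4 bytes otherwise.
fn encode_header(len: u32, zstd: bool) -> Vec<u8> {
    if !zstd && len < 0x80 {
        vec![len as u8]
    } else {
        assert!(len <= 0x0fff_ffff, "length must fit in the lower 28 bits");
        let tag = if zstd { BYTE_ZSTD } else { BYTE_UNCOMPRESSED };
        let mut hdr = len.to_be_bytes();
        hdr[0] |= tag;
        hdr.to_vec()
    }
}

/// Decode a header, returning (header_len, blob_len, compression_bits).
fn decode_header(buf: &[u8]) -> (usize, u32, u8) {
    let first = buf[0];
    if first < 0x80 {
        (1, first as u32, BYTE_UNCOMPRESSED)
    } else {
        let mut len_buf = [0u8; 4];
        len_buf.copy_from_slice(&buf[..4]);
        len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
        (4, u32::from_be_bytes(len_buf), first & LEN_COMPRESSION_BIT_MASK)
    }
}

fn main() {
    assert_eq!(decode_header(&encode_header(0x42, false)), (1, 0x42, BYTE_UNCOMPRESSED));
    assert_eq!(decode_header(&encode_header(0x1234, true)), (4, 0x1234, BYTE_ZSTD));
}
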
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index e98ed66ef998..791eefebe989 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -137,14 +137,14 @@ impl<'a> BlockCursor<'a> {
 }

 /// Reserved bits for length and compression
-const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
+pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

 /// The maximum size of blobs we support. The highest few bits
 /// are reserved for compression and other further uses.
 const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;

-const BYTE_UNCOMPRESSED: u8 = 0x80;
-const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
+pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
+pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;

 /// A wrapper of `VirtualFile` that allows users to write blobs.
 ///
@@ -390,51 +390,63 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 }

 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use super::*;
     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
+    use camino::Utf8PathBuf;
+    use camino_tempfile::Utf8TempDir;
     use rand::{Rng, SeedableRng};

     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
         round_trip_test_compressed::<BUFFERED>(blobs, false).await
     }

-    async fn round_trip_test_compressed<const BUFFERED: bool>(
+    pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
         blobs: &[Vec<u8>],
         compression: bool,
-    ) -> Result<(), Error> {
+        ctx: &RequestContext,
+    ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
         let temp_dir = camino_tempfile::tempdir()?;
         let pathbuf = temp_dir.path().join("file");
-        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

         // Write part (in block to drop the file)
         let mut offsets = Vec::new();
         {
-            let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
+            let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
             for blob in blobs.iter() {
                 let (_, res) = if compression {
                     wtr.write_blob_maybe_compressed(
                         blob.clone(),
-                        &ctx,
+                        ctx,
                         ImageCompressionAlgorithm::Zstd { level: Some(1) },
                     )
                     .await
                 } else {
-                    wtr.write_blob(blob.clone(), &ctx).await
+                    wtr.write_blob(blob.clone(), ctx).await
                 };
                 let offs = res?;
                 offsets.push(offs);
             }
             // Write out one page worth of zeros so that we can
             // read again with read_blk
-            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
+            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
             let offs = res?;
             println!("Writing final blob at offs={offs}");
-            wtr.flush_buffer(&ctx).await?;
+            wtr.flush_buffer(ctx).await?;
         }
+        Ok((temp_dir, pathbuf, offsets))
+    }
+
+    async fn round_trip_test_compressed<const BUFFERED: bool>(
+        blobs: &[Vec<u8>],
+        compression: bool,
+    ) -> Result<(), Error> {
+        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
+        let (_temp_dir, pathbuf, offsets) =
+            write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;

-        let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
+        let file = VirtualFile::open(pathbuf, &ctx).await?;
         let rdr = BlockReaderRef::VirtualFile(&file);
         let rdr = BlockCursor::new_with_compression(rdr, compression);
         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -447,7 +459,7 @@ mod tests {
         Ok(())
     }

-    fn random_array(len: usize) -> Vec<u8> {
+    pub(crate) fn random_array(len: usize) -> Vec<u8> {
         let mut rng = rand::thread_rng();
         (0..len).map(|_| rng.gen()).collect::<_>()
     }
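
The blob_io.rs change above is mostly test plumbing: the write half of the old round-trip test is split out into write_maybe_compressed so the vectored-read tests further down can produce the same on-disk files, compressed with zstd level 1 as in the refactored test. The decoding added below relies on the async_compression crate, which the write path already uses. A minimal, self-contained round trip with that crate, not taken from the patch, looks roughly like this (assumes async_compression with the tokio and zstd features plus tokio with macros and a runtime; Level::Precise(1) stands in for the level Some(1) used by the test):

use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
use async_compression::Level;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let payload = vec![0xf3u8; 64 * 1024];

    // Compress into an in-memory Vec (zstd level 1, mirroring the test setup above).
    let mut encoder = ZstdEncoder::with_quality(Vec::new(), Level::Precise(1));
    encoder.write_all(&payload).await?;
    encoder.shutdown().await?; // finishes the zstd frame
    let compressed = encoder.into_inner();

    // Decompress it again, using the same write-side decoder the new read path uses.
    let mut decoder = ZstdDecoder::new(Vec::new());
    decoder.write_all(&compressed).await?;
    decoder.flush().await?;
    assert_eq!(decoder.into_inner(), payload);
    Ok(())
}
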
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 1b470034db21..cb81f1d76d6a 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -20,11 +20,13 @@ use std::num::NonZeroUsize;

 use bytes::BytesMut;
 use pageserver_api::key::Key;
+use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::BoundedBuf;
 use utils::lsn::Lsn;
 use utils::vec_map::VecMap;

 use crate::context::RequestContext;
+use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
 use crate::virtual_file::VirtualFile;

 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -301,7 +303,7 @@ impl<'a> VectoredBlobReader<'a> {
             read.size(),
             buf.capacity()
         );
-        let buf = self
+        let mut buf = self
             .file
             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
             .await?
@@ -323,38 +325,68 @@ impl<'a> VectoredBlobReader<'a> {
                 .chain(std::iter::once(None)),
         );

+        // Some scratch space, put here for reusing the allocation
+        let mut decompressed_vec = Vec::new();
+
         for ((offset, meta), next) in pairs {
             let offset_in_buf = offset - start_offset;
             let first_len_byte = buf[offset_in_buf as usize];

-            // Each blob is prefixed by a header containing it's size.
+            // Each blob is prefixed by a header containing its size and compression information.
             // Extract the size and skip that header to find the start of the data.
             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
             // 1 byte case and 1 in the 4 byte case.
-            let (size_length, blob_size) = if first_len_byte < 0x80 {
-                (1, first_len_byte as u64)
+            let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
+                (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
             } else {
                 let mut blob_size_buf = [0u8; 4];
                 let offset_in_buf = offset_in_buf as usize;

                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
-                blob_size_buf[0] &= 0x7f;
-                (4, u32::from_be_bytes(blob_size_buf) as u64)
+                blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
+
+                let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
+                (
+                    4,
+                    u32::from_be_bytes(blob_size_buf) as u64,
+                    compression_bits,
+                )
             };

-            let start = offset_in_buf + size_length;
-            let end = match next {
+            let start_raw = offset_in_buf + size_length;
+            let end_raw = match next {
                 Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
-                None => start + blob_size,
+                None => start_raw + blob_size,
             };
-
-            assert_eq!(end - start, blob_size);
+            assert_eq!(end_raw - start_raw, blob_size);
+            let (start, end);
+            if compression_bits == BYTE_UNCOMPRESSED {
+                start = start_raw as usize;
+                end = end_raw as usize;
+            } else if compression_bits == BYTE_ZSTD {
+                let mut decoder =
+                    async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
+                decoder
+                    .write_all(&buf[start_raw as usize..end_raw as usize])
+                    .await?;
+                decoder.flush().await?;
+                start = buf.len();
+                buf.extend_from_slice(&decompressed_vec);
+                end = buf.len();
+                decompressed_vec.clear();
+            } else {
+                let error = std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    format!("invalid compression byte {compression_bits:x}"),
+                );
+                return Err(error);
+            }

             metas.push(VectoredBlob {
-                start: start as usize,
-                end: end as usize,
+                start,
+                end,
                 meta: *meta,
-            })
+            });
         }

         Ok(VectoredBlobsBuf { buf, blobs: metas })
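
The heart of the change is the BYTE_ZSTD branch above: a compressed blob is decompressed through a write-side ZstdDecoder into a reusable scratch Vec, the result is appended to the end of the same BytesMut that holds the raw read, and start/end are recorded so callers address decompressed blobs exactly like uncompressed ones. A simplified sketch of that bookkeeping, not code from the patch (the helper name and signature are made up; error handling trimmed):

use async_compression::tokio::write::ZstdDecoder;
use bytes::BytesMut;
use tokio::io::AsyncWriteExt;

/// Hypothetical helper mirroring the BYTE_ZSTD branch above: decompress
/// `buf[start_raw..end_raw]`, append the output to `buf`, and return the
/// range where the decompressed bytes now live.
async fn append_decompressed(
    buf: &mut BytesMut,
    scratch: &mut Vec<u8>,
    start_raw: usize,
    end_raw: usize,
) -> std::io::Result<(usize, usize)> {
    let mut decoder = ZstdDecoder::new(&mut *scratch);
    decoder.write_all(&buf[start_raw..end_raw]).await?;
    decoder.flush().await?;
    drop(decoder); // release the borrow on the scratch buffer

    let start = buf.len();
    buf.extend_from_slice(scratch.as_slice());
    let end = buf.len();
    scratch.clear(); // keep the allocation around for the next blob
    Ok((start, end))
}

In read_blobs this logic runs inside the per-blob loop, with decompressed_vec as the reused scratch buffer. One consequence, visible in the new test below, is that the caller's buffer needs headroom beyond the raw read size, since decompressed data is appended past the raw bytes.
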
@@ -471,6 +503,13 @@ impl StreamingVectoredReadPlanner {

 #[cfg(test)]
 mod tests {
+    use anyhow::Error;
+
+    use crate::context::DownloadBehavior;
+    use crate::page_cache::PAGE_SZ;
+    use crate::task_mgr::TaskKind;
+
+    use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     use super::*;

     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
@@ -687,4 +726,64 @@ mod tests {
             );
         }
     }
+
+    async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
+        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
+        let (_temp_dir, pathbuf, offsets) =
+            write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
+
+        let file = VirtualFile::open(&pathbuf, &ctx).await?;
+        let file_len = std::fs::metadata(&pathbuf)?.len();
+
+        // Multiply by two (compressed data might need more space), and add a few bytes for the header
+        let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
+        let mut buf = BytesMut::with_capacity(reserved_bytes);
+
+        let vectored_blob_reader = VectoredBlobReader::new(&file);
+        let meta = BlobMeta {
+            key: Key::MIN,
+            lsn: Lsn(0),
+        };
+
+        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
+            let end = offsets.get(idx + 1).unwrap_or(&file_len);
+            if idx + 1 == offsets.len() {
+                continue;
+            }
+            let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
+            let read = read_builder.build();
+            let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
+            assert_eq!(result.blobs.len(), 1);
+            let read_blob = &result.blobs[0];
+            let read_buf = &result.buf[read_blob.start..read_blob.end];
+            assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
+            buf = result.buf;
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_really_big_array() -> Result<(), Error> {
+        let blobs = &[
+            b"test".to_vec(),
+            random_array(10 * PAGE_SZ),
+            b"hello".to_vec(),
+            random_array(66 * PAGE_SZ),
+            vec![0xf3; 24 * PAGE_SZ],
+            b"foobar".to_vec(),
+        ];
+        round_trip_test_compressed(blobs, false).await?;
+        round_trip_test_compressed(blobs, true).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_arrays_inc() -> Result<(), Error> {
+        let blobs = (0..PAGE_SZ / 8)
+            .map(|v| random_array(v * 16))
+            .collect::<Vec<_>>();
+        round_trip_test_compressed(&blobs, false).await?;
+        round_trip_test_compressed(&blobs, true).await?;
+        Ok(())
+    }
 }
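
The new tests also double as usage documentation for the reader: read_blobs hands back one shared buffer plus per-blob start/end ranges, and the caller slices that buffer rather than receiving per-blob allocations, which is why the test reserves the maximum blob size times two plus a few header bytes up front. A stand-in sketch of that consumer pattern, with hypothetical types in place of VectoredBlob and VectoredBlobsBuf:

use bytes::BytesMut;

// Stand-ins for VectoredBlob / VectoredBlobsBuf, for illustration only.
struct Blob {
    start: usize,
    end: usize,
}
struct BlobsBuf {
    buf: BytesMut,
    blobs: Vec<Blob>,
}

fn main() {
    // Pretend this came back from a vectored read: blob 0 is raw (uncompressed)
    // bytes, blob 1 was decompressed and appended behind the raw read data.
    let mut buf = BytesMut::with_capacity(64);
    buf.extend_from_slice(b"\x05hello"); // 1-byte header + blob 0
    let decompressed_at = buf.len();
    buf.extend_from_slice(b"decompressed blob 1");
    let result = BlobsBuf {
        blobs: vec![
            Blob { start: 1, end: 6 },
            Blob { start: decompressed_at, end: decompressed_at + 19 },
        ],
        buf,
    };

    // Callers only ever slice the shared buffer, as the new test does.
    for (idx, blob) in result.blobs.iter().enumerate() {
        let bytes = &result.buf[blob.start..blob.end];
        println!("blob {idx}: {}", String::from_utf8_lossy(bytes));
    }
}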