From 96f697bf16c6a510e8a20bfeab54335d90fbb143 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Sat, 6 Jul 2024 02:16:14 +0200
Subject: [PATCH 1/6] Implement decompression for vectored reads

---
 pageserver/src/tenant/blob_io.rs          |  6 +--
 pageserver/src/tenant/vectored_blob_io.rs | 60 +++++++++++++++++------
 2 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 0705182d5db2..525d8dc6ff97 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -137,14 +137,14 @@ impl<'a> BlockCursor<'a> {
 }
 
 /// Reserved bits for length and compression
-const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
+pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
 
 /// The maximum size of blobs we support. The highest few bits
 /// are reserved for compression and other further uses.
 const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
 
-const BYTE_UNCOMPRESSED: u8 = 0x80;
-const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
+pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
+pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
 
 /// A wrapper of `VirtualFile` that allows users to write blobs.
 ///
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 7ad8446e0411..4b91d0ce7be6 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -20,11 +20,13 @@ use std::num::NonZeroUsize;
 
 use bytes::BytesMut;
 use pageserver_api::key::Key;
+use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::BoundedBuf;
 use utils::lsn::Lsn;
 use utils::vec_map::VecMap;
 
 use crate::context::RequestContext;
+use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
 use crate::virtual_file::VirtualFile;
 
 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -315,7 +317,7 @@ impl<'a> VectoredBlobReader<'a> {
             read.size(),
             buf.capacity()
         );
-        let buf = self
+        let mut buf = self
             .file
             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
             .await?
@@ -337,38 +339,68 @@ impl<'a> VectoredBlobReader<'a> {
                 .chain(std::iter::once(None)),
         );
 
+        // Some scratch space, put here for reusing the allocation
+        let mut decompressed_vec = Vec::new();
+
         for ((offset, meta), next) in pairs {
             let offset_in_buf = offset - start_offset;
             let first_len_byte = buf[offset_in_buf as usize];
-            // Each blob is prefixed by a header containing it's size.
+            // Each blob is prefixed by a header containing its size and compression information.
             // Extract the size and skip that header to find the start of the data.
             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
             // 1 byte case and 1 in the 4 byte case.
-            let (size_length, blob_size) = if first_len_byte < 0x80 {
-                (1, first_len_byte as u64)
+            let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
+                (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
             } else {
                 let mut blob_size_buf = [0u8; 4];
                 let offset_in_buf = offset_in_buf as usize;
                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
-                blob_size_buf[0] &= 0x7f;
-                (4, u32::from_be_bytes(blob_size_buf) as u64)
+                blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
+
+                let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
+                (
+                    4,
+                    u32::from_be_bytes(blob_size_buf) as u64,
+                    compression_bits,
+                )
             };
-            let start = offset_in_buf + size_length;
-            let end = match next {
+            let start_raw = offset_in_buf + size_length;
+            let end_raw = match next {
                 Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
-                None => start + blob_size,
+                None => start_raw + blob_size,
             };
-
-            assert_eq!(end - start, blob_size);
+            assert_eq!(end_raw - start_raw, blob_size);
+            let (start, end);
+            if compression_bits == BYTE_UNCOMPRESSED {
+                start = start_raw as usize;
+                end = end_raw as usize;
+            } else if compression_bits == BYTE_ZSTD {
+                let mut decoder =
+                    async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
+                decoder
+                    .write_all(&buf[start_raw as usize..end_raw as usize])
+                    .await?;
+                decoder.flush().await?;
+                start = buf.len();
+                buf.extend_from_slice(&decompressed_vec);
+                end = buf.len();
+                decompressed_vec.clear();
+            } else {
+                let error = std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    format!("invalid compression byte {compression_bits:x}"),
+                );
+                return Err(error);
+            }
 
             metas.push(VectoredBlob {
-                start: start as usize,
-                end: end as usize,
+                start,
+                end,
                 meta: *meta,
-            })
+            });
         }
 
         Ok(VectoredBlobsBuf { buf, blobs: metas })
     }
 }
 

From 3e879be3a77032d98416f9be4ea6d170bf286e51 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Mon, 8 Jul 2024 14:45:32 +0200
Subject: [PATCH 2/6] Add tests

---
 pageserver/src/tenant/blob_io.rs          | 24 ++++++--
 pageserver/src/tenant/vectored_blob_io.rs | 74 +++++++++++++++++++++++
 2 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 525d8dc6ff97..1cb9ac28c411 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -395,22 +395,24 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use super::*;
     use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
+    use camino::Utf8PathBuf;
+    use camino_tempfile::Utf8TempDir;
     use rand::{Rng, SeedableRng};
 
     async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
         round_trip_test_compressed::<BUFFERED>(blobs, false).await
     }
 
-    async fn round_trip_test_compressed<const BUFFERED: bool>(
+    pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
         blobs: &[Vec<u8>],
         compression: bool,
-    ) -> Result<(), Error> {
+        ctx: &RequestContext,
+    ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
         let temp_dir = camino_tempfile::tempdir()?;
         let pathbuf = temp_dir.path().join("file");
-        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
 
         // Write part (in block to drop the file)
         let mut offsets = Vec::new();
@@ -438,8 +440,18 @@ mod tests {
             println!("Writing final blob at offs={offs}");
             wtr.flush_buffer(&ctx).await?;
         }
+        Ok((temp_dir, pathbuf, offsets))
+    }
+
+    async fn round_trip_test_compressed<const BUFFERED: bool>(
+        blobs: &[Vec<u8>],
+        compression: bool,
+    ) -> Result<(), Error> {
+        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
+        let (_temp_dir, pathbuf, offsets) =
+            write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
 
-        let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
+        let file = VirtualFile::open(pathbuf, &ctx).await?;
         let rdr = BlockReaderRef::VirtualFile(&file);
         let rdr = BlockCursor::new_with_compression(rdr, compression);
         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -452,7 +464,7 @@ mod tests {
         Ok(())
     }
 
-    fn random_array(len: usize) -> Vec<u8> {
+    pub(crate) fn random_array(len: usize) -> Vec<u8> {
         let mut rng = rand::thread_rng();
         (0..len).map(|_| rng.gen()).collect::<_>()
     }
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 4b91d0ce7be6..8aea43a0af42 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -490,6 +490,13 @@ impl StreamingVectoredReadPlanner {
 
 #[cfg(test)]
 mod tests {
+    use anyhow::Error;
+
+    use crate::context::DownloadBehavior;
+    use crate::page_cache::PAGE_SZ;
+    use crate::task_mgr::TaskKind;
+
+    use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     use super::*;
 
     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
@@ -580,4 +587,71 @@ mod tests {
             validate_read(read, ranges[idx]);
         }
     }
+
+    async fn round_trip_test_compressed<const BUFFERED: bool>(
+        blobs: &[Vec<u8>],
+        compression: bool,
+    ) -> Result<(), Error> {
+        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
+        let (_temp_dir, pathbuf, offsets) =
+            write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?; // COMP
+
+        let file = VirtualFile::open(&pathbuf, &ctx).await?;
+        let file_len = std::fs::metadata(&pathbuf)?.len();
+
+        // Multiply by two (compressed data might need more space), and add a few bytes for the header
+        let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
+        let mut buf = BytesMut::with_capacity(reserved_bytes);
+
+        let vectored_blob_reader = VectoredBlobReader::new(&file);
+        let meta = BlobMeta {
+            key: Key::MIN,
+            lsn: Lsn(0),
+        };
+
+        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
+            let end = offsets.get(idx + 1).unwrap_or_else(|| &file_len);
+            if idx + 1 == offsets.len() {
+                continue;
+            }
+            let read_builder = VectoredReadBuilder::new(*offset, *end, meta, None);
+            let read = read_builder.build();
+            let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
+            assert_eq!(result.blobs.len(), 1);
+            let read_blob = &result.blobs[0];
+            let read_buf = &result.buf[read_blob.start..read_blob.end];
+            assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
+            buf = result.buf;
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_really_big_array() -> Result<(), Error> {
+        let blobs = &[
+            b"test".to_vec(),
+            random_array(10 * PAGE_SZ),
+            b"hello".to_vec(),
+            random_array(66 * PAGE_SZ),
+            vec![0xf3; 24 * PAGE_SZ],
+            b"foobar".to_vec(),
+        ];
+        round_trip_test_compressed::<false>(blobs, false).await?;
+        round_trip_test_compressed::<true>(blobs, false).await?;
+        round_trip_test_compressed::<false>(blobs, true).await?;
+        round_trip_test_compressed::<true>(blobs, true).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_arrays_inc() -> Result<(), Error> {
+        let blobs = (0..PAGE_SZ / 8)
+            .map(|v| random_array(v * 16))
+            .collect::<Vec<_>>();
+        round_trip_test_compressed::<false>(&blobs, false).await?;
+        round_trip_test_compressed::<true>(&blobs, false).await?;
+        round_trip_test_compressed::<false>(&blobs, true).await?;
+        round_trip_test_compressed::<true>(&blobs, true).await?;
+        Ok(())
+    }
 }

From 8ca93b281fc433539d255b26b1198493b5cf7f86 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Tue, 9 Jul 2024 16:17:28 +0200
Subject: [PATCH 3/6] Remove const param

---
 pageserver/src/tenant/vectored_blob_io.rs | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 8aea43a0af42..3e50d6d4dc28 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -588,13 +588,13 @@ mod tests {
         }
     }
 
-    async fn round_trip_test_compressed<const BUFFERED: bool>(
+    async fn round_trip_test_compressed(
         blobs: &[Vec<u8>],
         compression: bool,
     ) -> Result<(), Error> {
         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
         let (_temp_dir, pathbuf, offsets) =
-            write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?; // COMP
+            write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
 
         let file = VirtualFile::open(&pathbuf, &ctx).await?;
         let file_len = std::fs::metadata(&pathbuf)?.len();
@@ -636,10 +636,8 @@ mod tests {
             vec![0xf3; 24 * PAGE_SZ],
             b"foobar".to_vec(),
         ];
-        round_trip_test_compressed::<false>(blobs, false).await?;
-        round_trip_test_compressed::<true>(blobs, false).await?;
-        round_trip_test_compressed::<false>(blobs, true).await?;
-        round_trip_test_compressed::<true>(blobs, true).await?;
+        round_trip_test_compressed(blobs, false).await?;
+        round_trip_test_compressed(blobs, true).await?;
         Ok(())
     }
 
@@ -648,10 +646,8 @@ mod tests {
         let blobs = (0..PAGE_SZ / 8)
             .map(|v| random_array(v * 16))
             .collect::<Vec<_>>();
-        round_trip_test_compressed::<false>(&blobs, false).await?;
-        round_trip_test_compressed::<true>(&blobs, false).await?;
-        round_trip_test_compressed::<false>(&blobs, true).await?;
-        round_trip_test_compressed::<true>(&blobs, true).await?;
+        round_trip_test_compressed(&blobs, false).await?;
+        round_trip_test_compressed(&blobs, true).await?;
         Ok(())
     }
 }

From 57029e5a1ee07f2811248535d0ba1024a1018925 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 11 Jul 2024 20:26:50 +0200
Subject: [PATCH 4/6] fmt

---
 pageserver/src/tenant/vectored_blob_io.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 3e50d6d4dc28..77081f787c20 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -588,10 +588,7 @@ mod tests {
         }
     }
 
-    async fn round_trip_test_compressed(
-        blobs: &[Vec<u8>],
-        compression: bool,
-    ) -> Result<(), Error> {
+    async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
         let (_temp_dir, pathbuf, offsets) =
             write_maybe_compressed::<true>(blobs, compression, &ctx).await?;

From 708dcc9aa1824e829eeecfafaa1b520dccc1f32d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 11 Jul 2024 22:00:38 +0200
Subject: [PATCH 5/6] clippy

---
 pageserver/src/tenant/blob_io.rs          | 10 +++++-----
 pageserver/src/tenant/vectored_blob_io.rs |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 1cb9ac28c411..a0cf0414f62c 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -417,28 +417,28 @@ pub(crate) mod tests {
         // Write part (in block to drop the file)
         let mut offsets = Vec::new();
         {
-            let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
+            let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
             let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
             for blob in blobs.iter() {
                 let (_, res) = if compression {
                     wtr.write_blob_maybe_compressed(
                         blob.clone(),
-                        &ctx,
+                        ctx,
                         ImageCompressionAlgorithm::Zstd { level: Some(1) },
                     )
                     .await
                 } else {
-                    wtr.write_blob(blob.clone(), &ctx).await
+                    wtr.write_blob(blob.clone(), ctx).await
                 };
                 let offs = res?;
                 offsets.push(offs);
             }
             // Write out one page worth of zeros so that we can
             // read again with read_blk
-            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
+            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
             let offs = res?;
             println!("Writing final blob at offs={offs}");
-            wtr.flush_buffer(&ctx).await?;
+            wtr.flush_buffer(ctx).await?;
         }
         Ok((temp_dir, pathbuf, offsets))
     }
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 77081f787c20..602a115ac985 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -607,7 +607,7 @@ mod tests {
         };
 
         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
-            let end = offsets.get(idx + 1).unwrap_or_else(|| &file_len);
+            let end = offsets.get(idx + 1).unwrap_or(&file_len);
             if idx + 1 == offsets.len() {
                 continue;
             }

From c7361233b76e9e8a5e617e43470d49f2caef58e4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Fri, 12 Jul 2024 02:26:31 +0200
Subject: [PATCH 6/6] fix

---
 pageserver/src/tenant/vectored_blob_io.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 50412c602854..cb81f1d76d6a 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -750,7 +750,7 @@ mod tests {
             if idx + 1 == offsets.len() {
                 continue;
             }
-            let read_builder = VectoredReadBuilder::new(*offset, *end, meta, None);
+            let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
             let read = read_builder.build();
             let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
            assert_eq!(result.blobs.len(), 1);
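
Note (appended, not part of the patch series): the blob header layout that patch 1 teaches the vectored reader to parse can be illustrated standalone. Below is a minimal sketch assuming only the constants quoted from blob_io.rs above; parse_header is a hypothetical helper for illustration, not a pageserver function.

    // Standalone sketch of the blob header decoding implemented in patch 1.
    // Constants mirror pageserver/src/tenant/blob_io.rs.
    const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
    const BYTE_UNCOMPRESSED: u8 = 0x80;
    const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;

    /// Hypothetical helper: returns (header_len, blob_len, compression_bits).
    fn parse_header(buf: &[u8]) -> (usize, u64, u8) {
        let first_len_byte = buf[0];
        if first_len_byte < 0x80 {
            // 1-byte header: MSB is 0, length < 0x80, implicitly uncompressed.
            (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
        } else {
            // 4-byte header: MSB is 1; the top nibble carries the compression
            // bits and the remaining 28 bits the big-endian length.
            let mut len_buf = [0u8; 4];
            len_buf.copy_from_slice(&buf[0..4]);
            len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
            let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
            (4, u32::from_be_bytes(len_buf) as u64, compression_bits)
        }
    }

    fn main() {
        // Short form: one byte, no compression possible.
        assert_eq!(parse_header(&[0x05, 0xaa]), (1, 5, BYTE_UNCOMPRESSED));
        // Long form, uncompressed: 0x80 in the top nibble, length 256.
        assert_eq!(parse_header(&[0x80, 0x00, 0x01, 0x00]), (4, 256, BYTE_UNCOMPRESSED));
        // Long form, zstd: 0x90 in the top nibble.
        assert_eq!(parse_header(&[0x90, 0x00, 0x01, 0x00]), (4, 256, BYTE_ZSTD));
    }

Packing the algorithm into the reserved top nibble keeps the 1-byte short form intact for small blobs and leaves room for further algorithms, which is why MAX_SUPPORTED_LEN is capped at 0x0fff_ffff (28 bits).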
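The decompression path in patch 1 drives async-compression's write-side zstd decoder and appends its output to the end of the shared read buffer. A minimal round-trip sketch of that mechanism follows; the encoder half here is an assumption standing in for the existing write path (the patches compress via BlobWriter::write_blob_maybe_compressed, not via this encoder).

    // Round trip through async-compression's write-side zstd codec, mirroring
    // how read_blobs writes compressed bytes into a ZstdDecoder that emits the
    // decompressed output into a scratch Vec.
    // Assumes async-compression with features ["tokio", "zstd"] and tokio with
    // features ["macros", "rt"].
    use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
    use tokio::io::AsyncWriteExt;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Highly compressible payload, like the test's vec![0xf3; 24 * PAGE_SZ].
        let payload = vec![0xf3u8; 24 * 8192];

        // Compress (stand-in for the write path).
        let mut compressed = Vec::new();
        let mut encoder = ZstdEncoder::new(&mut compressed);
        encoder.write_all(&payload).await?;
        encoder.shutdown().await?; // finalize the zstd frame

        // Decompress the way the vectored reader does: write the compressed
        // bytes into the decoder, then flush into the scratch buffer.
        let mut decompressed_vec = Vec::new();
        let mut decoder = ZstdDecoder::new(&mut decompressed_vec);
        decoder.write_all(&compressed).await?;
        decoder.flush().await?;

        assert_eq!(payload, decompressed_vec);
        Ok(())
    }

Using the write-side decoder lets read_blobs push a borrowed slice of the read buffer into it without wrapping the buffer in an AsyncRead; the decompressed bytes are then appended to the tail of the same buffer, so each VectoredBlob's start/end offsets keep pointing into a single allocation.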