Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement decompression for vectored reads #8302

Merged
merged 7 commits into from
Jul 12, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
@@ -137,14 +137,14 @@ impl<'a> BlockCursor<'a> {
}

/// Reserved bits for length and compression
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other further uses.
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;

const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;

/// A wrapper of `VirtualFile` that allows users to write blobs.
///
@@ -395,22 +395,24 @@ impl BlobWriter<false> {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use rand::{Rng, SeedableRng};

/// Runs the blob round-trip check for `blobs` with compression disabled.
///
/// Thin wrapper that delegates to `round_trip_test_compressed`, keeping the
/// `BUFFERED` const parameter and passing `false` for `compression`.
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
    let compression = false;
    round_trip_test_compressed::<BUFFERED>(blobs, compression).await
}

async fn round_trip_test_compressed<const BUFFERED: bool>(
pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
ctx: &RequestContext,
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

// Write part (in block to drop the file)
let mut offsets = Vec::new();
@@ -438,8 +440,18 @@ mod tests {
println!("Writing final blob at offs={offs}");
wtr.flush_buffer(&ctx).await?;
}
Ok((temp_dir, pathbuf, offsets))
}

async fn round_trip_test_compressed<const BUFFERED: bool>(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;

let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let file = VirtualFile::open(pathbuf, &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new_with_compression(rdr, compression);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -452,7 +464,7 @@ mod tests {
Ok(())
}

fn random_array(len: usize) -> Vec<u8> {
/// Returns a vector of `len` bytes, each generated by `rand::thread_rng()`.
pub(crate) fn random_array(len: usize) -> Vec<u8> {
    let mut rng = rand::thread_rng();
    let mut bytes = Vec::with_capacity(len);
    for _ in 0..len {
        bytes.push(rng.gen());
    }
    bytes
}
130 changes: 116 additions & 14 deletions pageserver/src/tenant/vectored_blob_io.rs
Original file line number Diff line number Diff line change
@@ -20,11 +20,13 @@ use std::num::NonZeroUsize;

use bytes::BytesMut;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;

use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::VirtualFile;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -315,7 +317,7 @@ impl<'a> VectoredBlobReader<'a> {
read.size(),
buf.capacity()
);
let buf = self
let mut buf = self
.file
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
.await?
@@ -337,38 +339,68 @@ impl<'a> VectoredBlobReader<'a> {
.chain(std::iter::once(None)),
);

// Some scratch space, put here for reusing the allocation
let mut decompressed_vec = Vec::new();

for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];

// Each blob is prefixed by a header containing it's size.
// Each blob is prefixed by a header containing its size and compression information.
// Extract the size and skip that header to find the start of the data.
// The size can be 1 or 4 bytes. The most significant bit is 0 in the
// 1 byte case and 1 in the 4 byte case.
let (size_length, blob_size) = if first_len_byte < 0x80 {
(1, first_len_byte as u64)
let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
(1, first_len_byte as u64, BYTE_UNCOMPRESSED)
} else {
let mut blob_size_buf = [0u8; 4];
let offset_in_buf = offset_in_buf as usize;

blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
blob_size_buf[0] &= 0x7f;
(4, u32::from_be_bytes(blob_size_buf) as u64)
blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
arpad-m marked this conversation as resolved.
Show resolved Hide resolved

let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
(
4,
u32::from_be_bytes(blob_size_buf) as u64,
compression_bits,
)
};

let start = offset_in_buf + size_length;
let end = match next {
let start_raw = offset_in_buf + size_length;
let end_raw = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
None => start + blob_size,
None => start_raw + blob_size,
};

assert_eq!(end - start, blob_size);
assert_eq!(end_raw - start_raw, blob_size);
let (start, end);
if compression_bits == BYTE_UNCOMPRESSED {
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
start = start_raw as usize;
end = end_raw as usize;
} else if compression_bits == BYTE_ZSTD {
let mut decoder =
async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
decoder
.write_all(&buf[start_raw as usize..end_raw as usize])
.await?;
decoder.flush().await?;
start = buf.len();
buf.extend_from_slice(&decompressed_vec);
end = buf.len();
decompressed_vec.clear();
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
}

metas.push(VectoredBlob {
start: start as usize,
end: end as usize,
start,
end,
meta: *meta,
})
});
}

Ok(VectoredBlobsBuf { buf, blobs: metas })
@@ -458,6 +490,13 @@ impl StreamingVectoredReadPlanner {

#[cfg(test)]
mod tests {
use anyhow::Error;

use crate::context::DownloadBehavior;
use crate::page_cache::PAGE_SZ;
use crate::task_mgr::TaskKind;

use super::super::blob_io::tests::{random_array, write_maybe_compressed};
use super::*;

fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
@@ -548,4 +587,67 @@ mod tests {
validate_read(read, ranges[idx]);
}
}

/// Writes `blobs` to a temporary file via `write_maybe_compressed` and reads them
/// back one at a time through `VectoredBlobReader::read_blobs`, asserting that the
/// bytes returned equal the bytes written.
///
/// `compression` is forwarded unchanged to the writer. The blob at the last index
/// is written but never read back (the loop skips it).
///
/// # Errors
///
/// Propagates any error from writing the file, opening it, querying its metadata,
/// or reading the blobs.
async fn round_trip_test_compressed(
    blobs: &[Vec<u8>],
    compression: bool,
) -> Result<(), Error> {
    let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
    // `_temp_dir` is held until the end of the function so the file outlives the reads.
    let (_temp_dir, pathbuf, offsets) =
        write_maybe_compressed::<true>(blobs, compression, &ctx).await?;

    let file = VirtualFile::open(&pathbuf, &ctx).await?;
    let file_len = std::fs::metadata(&pathbuf)?.len();

    // Multiply by two (compressed data might need more space), and add a few bytes for the header.
    // With no blobs at all, fall back to a header-sized reservation instead of panicking.
    let largest_blob = blobs.iter().map(Vec::len).max().unwrap_or(0);
    let reserved_bytes = largest_blob * 2 + 16;
    let mut buf = BytesMut::with_capacity(reserved_bytes);

    let vectored_blob_reader = VectoredBlobReader::new(&file);
    let meta = BlobMeta {
        key: Key::MIN,
        lsn: Lsn(0),
    };

    for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
        // A blob ends where its successor starts.
        let end = offsets.get(idx + 1).unwrap_or(&file_len);
        // The final blob is skipped.
        // NOTE(review): presumably `file_len` also covers data written after the last
        // blob, so it is not a usable end offset; confirm against `write_maybe_compressed`.
        if idx + 1 == offsets.len() {
            continue;
        }
        let read_builder = VectoredReadBuilder::new(*offset, *end, meta, None);
        let read = read_builder.build();
        let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
        assert_eq!(result.blobs.len(), 1);
        let read_blob = &result.blobs[0];
        let read_buf = &result.buf[read_blob.start..read_blob.end];
        assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
        // Take the buffer back so the next iteration can pass it to `read_blobs` again.
        buf = result.buf;
    }
    Ok(())
}

/// Round-trips a mix of tiny literal blobs and multi-page blobs through
/// `round_trip_test_compressed`, first without and then with compression.
#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
    let blobs = [
        b"test".to_vec(),
        random_array(10 * PAGE_SZ),
        b"hello".to_vec(),
        random_array(66 * PAGE_SZ),
        vec![0xf3; 24 * PAGE_SZ],
        b"foobar".to_vec(),
    ];
    for compression in [false, true] {
        round_trip_test_compressed(&blobs, compression).await?;
    }
    Ok(())
}

/// Round-trips `PAGE_SZ / 8` random blobs whose lengths grow in 16-byte steps
/// (0, 16, 32, ...), first without and then with compression.
#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
    let mut blobs = Vec::with_capacity(PAGE_SZ / 8);
    for step in 0..PAGE_SZ / 8 {
        blobs.push(random_array(step * 16));
    }
    for compression in [false, true] {
        round_trip_test_compressed(&blobs, compression).await?;
    }
    Ok(())
}
}