Implement decompression for vectored reads #8302

Merged 7 commits on Jul 12, 2024
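For orientation: a blob on disk is prefixed by a small header. If the first byte is below 0x80, that byte is the blob length and the blob is uncompressed; otherwise the header is a 4-byte big-endian length whose top nibble carries the compression bits (0x8_ for uncompressed, 0x9_ for zstd). The following standalone sketch of that decoding is illustrative only; the constants are mirrored from blob_io.rs in the diff below, and decode_blob_header is a hypothetical helper, not code from this PR.

// Constants mirrored from pageserver/src/tenant/blob_io.rs (see the diff below).
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;

/// Hypothetical helper: decode a blob header from the start of `buf`.
/// Returns (header length in bytes, blob length, compression byte).
/// The caller must supply at least 4 bytes for the long form.
fn decode_blob_header(buf: &[u8]) -> std::io::Result<(usize, u64, u8)> {
    let first = buf[0];
    if first < 0x80 {
        // Short form: one header byte holding the length; never compressed.
        return Ok((1, first as u64, BYTE_UNCOMPRESSED));
    }
    // Long form: 4-byte big-endian length, top nibble reserved for compression.
    let mut len_buf = [0u8; 4];
    len_buf.copy_from_slice(&buf[0..4]);
    len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
    let compression = first & LEN_COMPRESSION_BIT_MASK;
    if compression != BYTE_UNCOMPRESSED && compression != BYTE_ZSTD {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("invalid compression byte {compression:x}"),
        ));
    }
    Ok((4, u32::from_be_bytes(len_buf) as u64, compression))
}

The read_blobs hunk in vectored_blob_io.rs performs essentially this decoding inline before deciding whether to use the bytes directly or route them through a zstd decoder.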
40 changes: 26 additions & 14 deletions pageserver/src/tenant/blob_io.rs
@@ -137,14 +137,14 @@ impl<'a> BlockCursor<'a> {
}

/// Reserved bits for length and compression
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;

/// The maximum size of blobs we support. The highest few bits
/// are reserved for compression and other further uses.
const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;

const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;

/// A wrapper of `VirtualFile` that allows users to write blobs.
///
@@ -390,51 +390,63 @@ impl BlobWriter<false> {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use rand::{Rng, SeedableRng};

async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
}

async fn round_trip_test_compressed<const BUFFERED: bool>(
pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
ctx: &RequestContext,
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = if compression {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
ctx,
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await
} else {
wtr.write_blob(blob.clone(), &ctx).await
wtr.write_blob(blob.clone(), ctx).await
};
let offs = res?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer(&ctx).await?;
wtr.flush_buffer(ctx).await?;
}
Ok((temp_dir, pathbuf, offsets))
}

async fn round_trip_test_compressed<const BUFFERED: bool>(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;

let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let file = VirtualFile::open(pathbuf, &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new_with_compression(rdr, compression);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -447,7 +459,7 @@ mod tests {
Ok(())
}

fn random_array(len: usize) -> Vec<u8> {
pub(crate) fn random_array(len: usize) -> Vec<u8> {
let mut rng = rand::thread_rng();
(0..len).map(|_| rng.gen()).collect::<_>()
}
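The refactor above extracts the write half of the old round-trip test into write_maybe_compressed so the new tests in vectored_blob_io.rs can reuse it. For readers who want to see the write side in isolation, here is a hedged standalone sketch of compressing one blob body with the same async_compression crate the read path uses; compress_blob is a hypothetical helper for illustration, not the actual write_blob_maybe_compressed implementation, and it assumes the crate's tokio and zstd features are enabled.

use async_compression::tokio::write::ZstdEncoder;
use tokio::io::AsyncWriteExt;

// Illustrative only: compress `data` into a fresh Vec<u8> as a single zstd frame.
// The real writer additionally encodes the 4-byte length header with BYTE_ZSTD set.
async fn compress_blob(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = ZstdEncoder::new(Vec::new());
    encoder.write_all(data).await?;
    // shutdown() finishes the zstd frame; a plain flush() is not enough when encoding.
    encoder.shutdown().await?;
    Ok(encoder.into_inner())
}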
127 changes: 113 additions & 14 deletions pageserver/src/tenant/vectored_blob_io.rs
@@ -20,11 +20,13 @@ use std::num::NonZeroUsize;

use bytes::BytesMut;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;

use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::VirtualFile;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -301,7 +303,7 @@ impl<'a> VectoredBlobReader<'a> {
read.size(),
buf.capacity()
);
let buf = self
let mut buf = self
.file
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
.await?
@@ -323,38 +325,68 @@ impl<'a> VectoredBlobReader<'a> {
.chain(std::iter::once(None)),
);

// Some scratch space, put here for reusing the allocation
let mut decompressed_vec = Vec::new();

for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];

// Each blob is prefixed by a header containing it's size.
// Each blob is prefixed by a header containing its size and compression information.
// Extract the size and skip that header to find the start of the data.
// The size can be 1 or 4 bytes. The most significant bit is 0 in the
// 1 byte case and 1 in the 4 byte case.
let (size_length, blob_size) = if first_len_byte < 0x80 {
(1, first_len_byte as u64)
let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
(1, first_len_byte as u64, BYTE_UNCOMPRESSED)
} else {
let mut blob_size_buf = [0u8; 4];
let offset_in_buf = offset_in_buf as usize;

blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
blob_size_buf[0] &= 0x7f;
(4, u32::from_be_bytes(blob_size_buf) as u64)
blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;

let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
(
4,
u32::from_be_bytes(blob_size_buf) as u64,
compression_bits,
)
};

let start = offset_in_buf + size_length;
let end = match next {
let start_raw = offset_in_buf + size_length;
let end_raw = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
None => start + blob_size,
None => start_raw + blob_size,
};

assert_eq!(end - start, blob_size);
assert_eq!(end_raw - start_raw, blob_size);
let (start, end);
if compression_bits == BYTE_UNCOMPRESSED {
start = start_raw as usize;
end = end_raw as usize;
} else if compression_bits == BYTE_ZSTD {
let mut decoder =
async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
decoder
.write_all(&buf[start_raw as usize..end_raw as usize])
.await?;
decoder.flush().await?;
start = buf.len();
buf.extend_from_slice(&decompressed_vec);
end = buf.len();
decompressed_vec.clear();
} else {
let error = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid compression byte {compression_bits:x}"),
);
return Err(error);
}

metas.push(VectoredBlob {
start: start as usize,
end: end as usize,
start,
end,
meta: *meta,
})
});
}

Ok(VectoredBlobsBuf { buf, blobs: metas })
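The branch above is the heart of this PR: when the header's compression bits equal BYTE_ZSTD, the compressed slice is written through an async_compression zstd decoder into a scratch Vec, and the decompressed bytes are appended to the end of buf, so each VectoredBlob's start/end still index into a single buffer. Below is a standalone sketch of that decode step, for illustration only; decompress_blob is a hypothetical helper mirroring the write_all + flush pattern used in read_blobs.

use async_compression::tokio::write::ZstdDecoder;
use tokio::io::AsyncWriteExt;

// Illustrative only: decompress one zstd-framed blob into its own Vec.
async fn decompress_blob(compressed: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut decompressed = Vec::new();
    let mut decoder = ZstdDecoder::new(&mut decompressed);
    decoder.write_all(compressed).await?;
    decoder.flush().await?;
    drop(decoder); // end the mutable borrow before handing the Vec back
    Ok(decompressed)
}

In read_blobs the scratch vector is reused across blobs: its contents are copied into buf with extend_from_slice and then cleared, which is why start and end for compressed blobs point past the range that was originally read from disk.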
@@ -471,6 +503,13 @@ impl StreamingVectoredReadPlanner {

#[cfg(test)]
mod tests {
use anyhow::Error;

use crate::context::DownloadBehavior;
use crate::page_cache::PAGE_SZ;
use crate::task_mgr::TaskKind;

use super::super::blob_io::tests::{random_array, write_maybe_compressed};
use super::*;

fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
@@ -687,4 +726,64 @@ mod tests {
);
}
}

async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<true>(blobs, compression, &ctx).await?;

let file = VirtualFile::open(&pathbuf, &ctx).await?;
let file_len = std::fs::metadata(&pathbuf)?.len();

// Multiply by two (compressed data might need more space), and add a few bytes for the header
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);

let vectored_blob_reader = VectoredBlobReader::new(&file);
let meta = BlobMeta {
key: Key::MIN,
lsn: Lsn(0),
};

for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let end = offsets.get(idx + 1).unwrap_or(&file_len);
if idx + 1 == offsets.len() {
continue;
}
let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
let read = read_builder.build();
let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
assert_eq!(result.blobs.len(), 1);
let read_blob = &result.blobs[0];
let read_buf = &result.buf[read_blob.start..read_blob.end];
assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
buf = result.buf;
}
Ok(())
}

#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
b"hello".to_vec(),
random_array(66 * PAGE_SZ),
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test_compressed(blobs, false).await?;
round_trip_test_compressed(blobs, true).await?;
Ok(())
}

#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();
round_trip_test_compressed(&blobs, false).await?;
round_trip_test_compressed(&blobs, true).await?;
Ok(())
}
}