Only support compressed reads if the compression setting is present (#8238)

PR #8106 was created with the assumption that no blob is larger than
`256 MiB`. Due to #7852 we check *writes* of blobs larger than that
limit, but we had no corresponding check for *reads*: in theory we could
already be reading such large blobs every day and simply never happen to
write new ones.

Therefore, we now also emit a warning for *reads* of such large blobs.

To make deploying compression less dangerous, we only treat a blob as
compressed if the compression setting is present in the config. This
also means that we cannot back out of compression once we have enabled
it.

Part of #5431
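
As a rough illustration of the read-path behavior this change introduces, here is a minimal, self-contained sketch of the length-header handling. The constant values and the function name are assumptions made for this example; the real definitions live in `pageserver/src/tenant/blob_io.rs`.

```rust
// Sketch only (assumed constant values): how a reader could interpret the
// 4-byte big-endian blob length header depending on whether compressed
// reads are enabled by the config.
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0; // assumed: top nibble of the first byte marks compression
const BYTE_UNCOMPRESSED: u8 = 0x80; // assumed: marker for a "long", uncompressed blob

fn decode_len(mut len_buf: [u8; 4], read_compressed: bool) -> (usize, u8) {
    let compression_bits = len_buf[0] & LEN_COMPRESSION_BIT_MASK;
    // With compression enabled, the whole marker nibble is stripped from the
    // length; otherwise only the top bit is, so existing on-disk data keeps
    // all remaining length bits and is read back verbatim.
    len_buf[0] &= if read_compressed {
        !LEN_COMPRESSION_BIT_MASK
    } else {
        0x7f
    };
    (u32::from_be_bytes(len_buf) as usize, compression_bits)
}

fn main() {
    // A "long" header: uncompressed marker set, length 0x0123_4567.
    let header = [BYTE_UNCOMPRESSED | 0x01, 0x23, 0x45, 0x67];
    assert_eq!(decode_len(header, true), (0x0123_4567, BYTE_UNCOMPRESSED));
    // Without the config setting only the top bit is masked, which gives the
    // same length here, but the caller will never attempt decompression.
    assert_eq!(decode_len(header, false), (0x0123_4567, BYTE_UNCOMPRESSED));
}
```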
arpad-m authored and VladLazar committed Jul 8, 2024
1 parent f6c830e commit 44a3800
Showing 4 changed files with 75 additions and 30 deletions.
45 changes: 27 additions & 18 deletions pageserver/src/tenant/blob_io.rs
@@ -19,6 +19,7 @@ use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tracing::warn;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -72,14 +73,22 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
let bit_mask = if self.read_compressed {
!LEN_COMPRESSION_BIT_MASK
} else {
0x7f
};
len_buf[0] &= bit_mask;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;

let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
if compression_bits > BYTE_UNCOMPRESSED {
warn!("reading key above future limit ({len} bytes)");
}
buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD {
@@ -384,10 +393,10 @@ mod tests {
use rand::{Rng, SeedableRng};

async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED, 0>(blobs).await
round_trip_test_compressed::<BUFFERED, false>(blobs).await
}

async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: bool>(
blobs: &[Vec<u8>],
) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
@@ -400,17 +409,15 @@
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = match COMPRESSION {
0 => wtr.write_blob(blob.clone(), &ctx).await,
1 => {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
)
.await
}
_ => unreachable!("Invalid compression {COMPRESSION}"),
let (_, res) = if COMPRESSION {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
)
.await
} else {
wtr.write_blob(blob.clone(), &ctx).await
};
let offs = res?;
offsets.push(offs);
@@ -425,7 +432,7 @@

let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
let rdr = BlockCursor::new_with_compression(rdr, COMPRESSION);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset, &ctx).await?;
assert_eq!(
@@ -459,6 +466,8 @@
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, true>(blobs).await?;
round_trip_test_compressed::<true, true>(blobs).await?;
Ok(())
}

@@ -474,8 +483,8 @@
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, 1>(blobs).await?;
round_trip_test_compressed::<true, 1>(blobs).await?;
round_trip_test_compressed::<false, true>(blobs).await?;
round_trip_test_compressed::<true, true>(blobs).await?;
Ok(())
}

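
To summarize the branch added above in `blob_io.rs`, here is a hedged, standalone sketch of the decision the read path makes for each blob header. The enum, function name, and marker values are assumptions for illustration; only the branching mirrors the diff.

```rust
// Sketch only: the per-blob decision mirrored from the branch in blob_io.rs.
// Enum, names, and marker values are assumed for illustration.
const BYTE_UNCOMPRESSED: u8 = 0x80; // assumed marker values
const BYTE_ZSTD: u8 = 0x90;

#[derive(Debug, PartialEq)]
enum ReadPlan {
    Plain,        // copy the bytes as-is
    PlainButWarn, // copy as-is, but warn: the header exceeds the expected limit
    Zstd,         // decompress with zstd
}

fn plan_read(compression_bits: u8, read_compressed: bool) -> ReadPlan {
    if compression_bits <= BYTE_UNCOMPRESSED || !read_compressed {
        if compression_bits > BYTE_UNCOMPRESSED {
            // The real code logs: "reading key above future limit ({len} bytes)"
            ReadPlan::PlainButWarn
        } else {
            ReadPlan::Plain
        }
    } else if compression_bits == BYTE_ZSTD {
        ReadPlan::Zstd
    } else {
        unreachable!("unknown compression marker {compression_bits:#x}")
    }
}

fn main() {
    // Compression disabled in the config: even a zstd-marked header is read
    // verbatim (with a warning) rather than decompressed.
    assert_eq!(plan_read(BYTE_ZSTD, false), ReadPlan::PlainButWarn);
    // Compression enabled: the same header is decompressed.
    assert_eq!(plan_read(BYTE_ZSTD, true), ReadPlan::Zstd);
    assert_eq!(plan_read(BYTE_UNCOMPRESSED, true), ReadPlan::Plain);
}
```

This is also why the description above notes that compression cannot be rolled back once enabled: with the setting removed, a genuinely zstd-compressed blob would be handed back verbatim instead of being decompressed.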
31 changes: 28 additions & 3 deletions pageserver/src/tenant/block_io.rs
@@ -149,16 +149,24 @@ impl<'a> BlockReaderRef<'a> {
/// ```
///
pub struct BlockCursor<'a> {
pub(super) read_compressed: bool,
reader: BlockReaderRef<'a>,
}

impl<'a> BlockCursor<'a> {
pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
BlockCursor { reader }
Self::new_with_compression(reader, false)
}
pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
BlockCursor {
read_compressed,
reader,
}
}
// Needed by cli
pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
BlockCursor {
read_compressed: false,
reader: BlockReaderRef::FileBlockReader(reader),
}
}
@@ -188,11 +196,25 @@ pub struct FileBlockReader<'a> {

/// Unique ID of this file, used as key in the page cache.
file_id: page_cache::FileId,

compressed_reads: bool,
}

impl<'a> FileBlockReader<'a> {
pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
FileBlockReader { file_id, file }
Self::new_with_compression(file, file_id, false)
}

pub fn new_with_compression(
file: &'a VirtualFile,
file_id: FileId,
compressed_reads: bool,
) -> Self {
FileBlockReader {
file_id,
file,
compressed_reads,
}
}

/// Read a page from the underlying file into given buffer.
@@ -239,7 +261,10 @@ impl<'a> FileBlockReader<'a> {

impl BlockReader for FileBlockReader<'_> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReader(self))
BlockCursor::new_with_compression(
BlockReaderRef::FileBlockReader(self),
self.compressed_reads,
)
}
}

28 changes: 19 additions & 9 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -165,6 +165,7 @@ pub struct ImageLayerInner {
file_id: FileId,

max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
compressed_reads: bool,
}

impl std::fmt::Debug for ImageLayerInner {
@@ -178,7 +179,8 @@ impl std::fmt::Debug for ImageLayerInner {

impl ImageLayerInner {
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
@@ -266,9 +268,10 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();

let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
.await
.and_then(|res| res)?;
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, false, ctx)
.await
.and_then(|res| res)?;

// not production code
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
@@ -377,6 +380,7 @@ impl ImageLayerInner {
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
support_compressed_reads: bool,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
@@ -420,6 +424,7 @@ impl ImageLayerInner {
file,
file_id,
max_vectored_read_bytes,
compressed_reads: support_compressed_reads,
key_range: actual_summary.key_range,
}))
}
@@ -430,7 +435,8 @@
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);

@@ -490,12 +496,14 @@ impl ImageLayerInner {
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
let mut result = Vec::new();
let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let cursor = block_reader.block_cursor();
while let Some(item) = stream.next().await {
// TODO: dedup code with get_reconstruct_value
@@ -530,7 +538,8 @@ impl ImageLayerInner {
.into(),
);

let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);

@@ -691,7 +700,8 @@ impl ImageLayerInner {

#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
ImageLayerIterator {
1 change: 1 addition & 0 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -1685,6 +1685,7 @@ impl DownloadedLayer {
lsn,
summary,
Some(owner.conf.max_vectored_read_bytes),
owner.conf.image_compression.is_some(),
ctx,
)
.await
