Skip to content

Commit

Permalink
Flatten compression algorithm setting (#8265)
Browse files Browse the repository at this point in the history
This flattens the compression algorithm setting, removing the
`Option<_>` wrapping layer and making handling of the setting easier.

It also adds a specific setting for *disabled* compression with the
continued ability to read copmressed data, giving us the option to
more easily back out of a compression rollout, should the need arise,
which was one of the limitations of #8238.

Implements my suggestion from
#8238 (comment) ,
inspired by Christian's review in
#8238 (review) .

Part of #5431
  • Loading branch information
arpad-m authored and VladLazar committed Jul 8, 2024
1 parent 32828cd commit e770aee
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 14 deletions.
15 changes: 14 additions & 1 deletion libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,22 @@ pub enum CompactionAlgorithm {
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
/// Disabled for writes, and never decompress during reading.
/// Never set this after you've enabled compression once!
DisabledNoDecompress,
// Disabled for writes, support decompressing during read path
Disabled,
/// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
/// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
Zstd { level: Option<i8> },
Zstd {
level: Option<i8>,
},
}

impl ImageCompressionAlgorithm {
pub fn allow_decompression(&self) -> bool {
!matches!(self, ImageCompressionAlgorithm::DisabledNoDecompress)
}
}

#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
Expand Down
11 changes: 6 additions & 5 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ pub mod defaults {

pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB

pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::DisabledNoDecompress;

pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;

Expand Down Expand Up @@ -288,7 +289,7 @@ pub struct PageServerConf {

pub validate_vectored_get: bool,

pub image_compression: Option<ImageCompressionAlgorithm>,
pub image_compression: ImageCompressionAlgorithm,

/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
Expand Down Expand Up @@ -402,7 +403,7 @@ struct PageServerConfigBuilder {

validate_vectored_get: BuilderValue<bool>,

image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
image_compression: BuilderValue<ImageCompressionAlgorithm>,

ephemeral_bytes_per_memory_kb: BuilderValue<usize>,

Expand Down Expand Up @@ -680,7 +681,7 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}

pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
pub fn get_image_compression(&mut self, value: ImageCompressionAlgorithm) {
self.image_compression = BuilderValue::Set(value);
}

Expand Down Expand Up @@ -1028,7 +1029,7 @@ impl PageServerConf {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"image_compression" => {
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
builder.get_image_compression(parse_toml_from_str("image_compression", item)?)
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
Expand Down
18 changes: 13 additions & 5 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
self.write_blob_maybe_compressed(srcbuf, ctx, None).await
self.write_blob_maybe_compressed(
srcbuf,
ctx,
ImageCompressionAlgorithm::DisabledNoDecompress,
)
.await
}

/// Write a blob of data. Returns the offset that it was written to,
Expand All @@ -282,7 +287,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: B,
ctx: &RequestContext,
algorithm: Option<ImageCompressionAlgorithm>,
algorithm: ImageCompressionAlgorithm,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;

Expand Down Expand Up @@ -314,7 +319,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
);
}
let (high_bit_mask, len_written, srcbuf) = match algorithm {
Some(ImageCompressionAlgorithm::Zstd { level }) => {
ImageCompressionAlgorithm::Zstd { level } => {
let mut encoder = if let Some(level) = level {
async_compression::tokio::write::ZstdEncoder::with_quality(
Vec::new(),
Expand All @@ -335,7 +340,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
(BYTE_UNCOMPRESSED, len, slice.into_inner())
}
}
None => (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()),
ImageCompressionAlgorithm::Disabled
| ImageCompressionAlgorithm::DisabledNoDecompress => {
(BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
}
};
let mut len_buf = (len_written as u32).to_be_bytes();
assert_eq!(len_buf[0] & 0xf0, 0);
Expand Down Expand Up @@ -414,7 +422,7 @@ mod tests {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await
} else {
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -453,7 +453,7 @@ impl DeltaLayerWriterInner {
) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn);
// We don't want to use compression in delta layer creation
let compression = None;
let compression = ImageCompressionAlgorithm::DisabledNoDecompress;
let (val, res) = self
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1685,7 +1685,7 @@ impl DownloadedLayer {
lsn,
summary,
Some(owner.conf.max_vectored_read_bytes),
owner.conf.image_compression.is_some(),
owner.conf.image_compression.allow_decompression(),
ctx,
)
.await
Expand Down

0 comments on commit e770aee

Please sign in to comment.