From 8745c0d6f22f7a1b91ed5e6141c8653b3955c4ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 24 May 2024 03:00:06 +0200 Subject: [PATCH 01/36] Add a pagectl tool to recompress image layers --- Cargo.lock | 10 +++ Cargo.toml | 1 + libs/pageserver_api/src/models.rs | 18 ++++ pageserver/Cargo.toml | 1 + pageserver/ctl/src/layers.rs | 21 +++++ pageserver/src/config.rs | 19 +++- pageserver/src/tenant/blob_io.rs | 84 +++++++++++++---- .../src/tenant/storage_layer/image_layer.rs | 90 ++++++++++++++++++- 8 files changed, 226 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c8a8b0c0fff..9403044b3a76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2946,6 +2946,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -3612,6 +3621,7 @@ dependencies = [ "hyper 0.14.26", "itertools", "leaky-bucket", + "lz4_flex", "md5", "metrics", "nix 0.27.1", diff --git a/Cargo.toml b/Cargo.toml index 8fddaaef12dd..830bd7cb800c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ jsonwebtoken = "9" lasso = "0.7" leaky-bucket = "1.0.1" libc = "0.2" +lz4_flex = "0.11" md5 = "0.7.0" measured = { version = "0.0.21", features=["lasso"] } measured-process = { version = "0.0.21" } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 9311dab33cd0..318dd307d7f4 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -455,6 +455,24 @@ pub enum CompactionAlgorithm { Tiered, } +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Serialize, + Deserialize, + strum_macros::FromRepr, + strum_macros::EnumString, + enum_map::Enum, +)] +#[strum(serialize_all = "kebab-case")] +pub enum ImageCompressionAlgorithm { + Zstd, + LZ4, +} + #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub struct CompactionAlgorithmSettings { pub kind: CompactionAlgorithm, diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 4335f38f1e7f..2c0dede89c72 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,6 +37,7 @@ humantime-serde.workspace = true hyper.workspace = true itertools.workspace = true leaky-bucket.workspace = true +lz4_flex.workspace = true md5.workspace = true nix.workspace = true # hack to get the number of worker threads tokio uses diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 3611b0baab2c..340718ef1a06 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -55,6 +55,10 @@ pub(crate) enum LayerCmd { #[clap(long)] new_timeline_id: Option, }, + Compress { + dest_path: Utf8PathBuf, + layer_file_path: Utf8PathBuf, + }, } async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { @@ -240,5 +244,22 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { anyhow::bail!("not an image or delta layer: {layer_file_path}"); } + LayerCmd::Compress { + dest_path, + layer_file_path, + } => { + pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + pageserver::page_cache::init(100); + + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + + let stats = + ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?; + println!( + "Statistics: {stats:#?}\n{}", + serde_json::to_string(&stats).unwrap() + ); + return Ok(()); + } } } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index b4a0d1ac0235..a7da4f55be24 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -5,7 +5,7 @@ //! See also `settings.md` for better description on every parameter. use anyhow::{anyhow, bail, ensure, Context, Result}; -use pageserver_api::shard::TenantShardId; +use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId}; use remote_storage::{RemotePath, RemoteStorageConfig}; use serde; use serde::de::IntoDeserializer; @@ -55,6 +55,7 @@ pub mod defaults { DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_PG_LISTEN_PORT, }; + use pageserver_api::models::ImageCompressionAlgorithm; pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT; pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s"; @@ -95,6 +96,8 @@ pub mod defaults { pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB + pub const DEFAULT_IMAGE_COMPRESSION: Option = None; + pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true; pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; @@ -290,6 +293,8 @@ pub struct PageServerConf { pub validate_vectored_get: bool, + pub image_compression: Option, + /// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this /// is exceeded, we start proactively closing ephemeral layers to limit the total amount /// of ephemeral data. @@ -400,6 +405,8 @@ struct PageServerConfigBuilder { validate_vectored_get: BuilderValue, + image_compression: BuilderValue>, + ephemeral_bytes_per_memory_kb: BuilderValue, } @@ -487,6 +494,7 @@ impl PageServerConfigBuilder { max_vectored_read_bytes: Set(MaxVectoredReadBytes( NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(), )), + image_compression: Set(DEFAULT_IMAGE_COMPRESSION), validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET), ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB), } @@ -672,6 +680,10 @@ impl PageServerConfigBuilder { self.validate_vectored_get = BuilderValue::Set(value); } + pub fn get_image_compression(&mut self, value: Option) { + self.image_compression = BuilderValue::Set(value); + } + pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) { self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value); } @@ -732,6 +744,7 @@ impl PageServerConfigBuilder { get_impl, max_vectored_read_bytes, validate_vectored_get, + image_compression, ephemeral_bytes_per_memory_kb, } CUSTOM LOGIC @@ -1026,6 +1039,9 @@ impl PageServerConf { "validate_vectored_get" => { builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?) } + "image_compression" => { + builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?)) + } "ephemeral_bytes_per_memory_kb" => { builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize) } @@ -1110,6 +1126,7 @@ impl PageServerConf { NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES) .expect("Invalid default constant"), ), + image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 2be8816cefbd..919b1a516274 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -12,6 +12,8 @@ //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! use bytes::{BufMut, BytesMut}; +use pageserver_api::models::ImageCompressionAlgorithm; +use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; use crate::context::RequestContext; @@ -219,6 +221,17 @@ impl BlobWriter { &mut self, srcbuf: B, ctx: &RequestContext, + ) -> (B::Buf, Result) { + self.write_blob_compressed(srcbuf, ctx, None).await + } + + /// Write a blob of data. Returns the offset that it was written to, + /// which can be used to retrieve the data later. + pub async fn write_blob_compressed, Buf: IoBuf + Send>( + &mut self, + srcbuf: B, + ctx: &RequestContext, + algorithm: Option, ) -> (B::Buf, Result) { let offset = self.offset; @@ -226,29 +239,65 @@ impl BlobWriter { let mut io_buf = self.io_buf.take().expect("we always put it back below"); io_buf.clear(); - let (io_buf, hdr_res) = async { + let mut compressed_buf = None; + let ((io_buf, hdr_res), srcbuf) = async { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - self.write_all(io_buf, ctx).await + ( + self.write_all(io_buf, ctx).await, + srcbuf.slice(..).into_inner(), + ) } else { // Write a 4-byte length header - if len > 0x7fff_ffff { + if len > 0x0fff_ffff { return ( - io_buf, - Err(Error::new( - ErrorKind::Other, - format!("blob too large ({len} bytes)"), - )), + ( + io_buf, + Err(Error::new( + ErrorKind::Other, + format!("blob too large ({len} bytes)"), + )), + ), + srcbuf.slice(..).into_inner(), ); } - if len > 0x0fff_ffff { - tracing::warn!("writing blob above future limit ({len} bytes)"); - } - let mut len_buf = (len as u32).to_be_bytes(); - len_buf[0] |= 0x80; + const UNCOMPRESSED: u8 = 0x80; + const ZSTD: u8 = UNCOMPRESSED | 0x10; + const LZ4: u8 = UNCOMPRESSED | 0x20; + let (high_bit_mask, len_written, srcbuf) = match algorithm { + Some(ImageCompressionAlgorithm::Zstd) => { + let mut encoder = + async_compression::tokio::write::ZstdEncoder::new(Vec::new()); + let slice = srcbuf.slice(..); + encoder.write_all(&slice[..]).await.unwrap(); + encoder.flush().await.unwrap(); + let compressed = encoder.into_inner(); + if compressed.len() < len { + let compressed_len = len; + compressed_buf = Some(compressed); + (ZSTD, compressed_len, slice.into_inner()) + } else { + (0x80, len, slice.into_inner()) + } + } + Some(ImageCompressionAlgorithm::LZ4) => { + let slice = srcbuf.slice(..); + let compressed = lz4_flex::block::compress(&slice[..]); + if compressed.len() < len { + let compressed_len = len; + compressed_buf = Some(compressed); + (LZ4, compressed_len, slice.into_inner()) + } else { + (0x80, len, slice.into_inner()) + } + } + None => (0x80, len, srcbuf.slice(..).into_inner()), + }; + let mut len_buf = (len_written as u32).to_be_bytes(); + len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - self.write_all(io_buf, ctx).await + (self.write_all(io_buf, ctx).await, srcbuf) } } .await; @@ -257,7 +306,12 @@ impl BlobWriter { Ok(_) => (), Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), } - let (srcbuf, res) = self.write_all(srcbuf, ctx).await; + let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf { + let (_buf, res) = self.write_all(compressed_buf, ctx).await; + (Slice::into_inner(srcbuf.slice(..)), res) + } else { + self.write_all(srcbuf, ctx).await + }; (srcbuf, res.map(|_| offset)) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 06e2f0938437..193012d8fff8 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -46,7 +46,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use hex; use itertools::Itertools; use pageserver_api::keyspace::KeySpace; -use pageserver_api::models::LayerAccessKind; +use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind}; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; @@ -366,6 +366,83 @@ impl ImageLayer { res?; Ok(()) } + pub async fn compression_statistics( + dest_repo_path: &Utf8Path, + path: &Utf8Path, + ctx: &RequestContext, + ) -> anyhow::Result, u64)>> { + fn make_conf( + image_compression: Option, + dest_repo_path: &Utf8Path, + ) -> &'static PageServerConf { + let mut conf = PageServerConf::dummy_conf(dest_repo_path.to_owned()); + conf.image_compression = image_compression; + Box::leak(Box::new(conf)) + } + let image_compressions = [ + None, + Some(ImageCompressionAlgorithm::Zstd), + Some(ImageCompressionAlgorithm::LZ4), + ]; + let mut stats = Vec::new(); + for image_compression in image_compressions { + let size = Self::compressed_size_for_conf( + path, + ctx, + make_conf(image_compression, dest_repo_path), + ) + .await?; + stats.push((image_compression, size)); + } + Ok(stats) + } + + async fn compressed_size_for_conf( + path: &Utf8Path, + ctx: &RequestContext, + conf: &'static PageServerConf, + ) -> anyhow::Result { + let file = + VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx) + .await + .with_context(|| format!("Failed to open file '{}'", path))?; + + let file_id = page_cache::next_file_id(); + let block_reader = FileBlockReader::new(&file, file_id); + let summary_blk = block_reader.read_blk(0, ctx).await?; + let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; + if summary.magic != IMAGE_FILE_MAGIC { + anyhow::bail!("magic file mismatch"); + } + + let tree_reader = DiskBtreeReader::new( + summary.index_start_blk, + summary.index_root_blk, + &block_reader, + ); + + let mut key_offset_stream = + std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx)); + + let mut writer = ImageLayerWriter::new( + conf, + summary.timeline_id, + TenantShardId::unsharded(summary.tenant_id), + &summary.key_range, + summary.lsn, + ctx, + ) + .await?; + + let cursor = block_reader.block_cursor(); + while let Some(r) = key_offset_stream.next().await { + let (key, offset) = r?; + let key = Key::from_slice(&key); + let content = cursor.read_blob(offset, ctx).await?; + writer.put_image(key, content.into(), ctx).await?; + } + Ok(writer.size()) + } } impl ImageLayerInner { @@ -782,7 +859,10 @@ impl ImageLayerWriterInner { ctx: &RequestContext, ) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let (_img, res) = self.blob_writer.write_blob(img, ctx).await; + let (_img, res) = self + .blob_writer + .write_blob_compressed(img, ctx, self.conf.image_compression) + .await; // TODO: re-use the buffer for `img` further upstack let off = res?; @@ -923,6 +1003,12 @@ impl ImageLayerWriter { self.inner.as_mut().unwrap().put_image(key, img, ctx).await } + /// Obtains the current size of the file + pub(crate) fn size(&self) -> u64 { + let inner = self.inner.as_ref().unwrap(); + inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 + } + /// /// Finish writing the image layer. /// From 2d37db234ac73a05099edd20e96c6492ab8a4063 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 4 Jun 2024 15:53:15 +0200 Subject: [PATCH 02/36] Add mode to compare multiple files from a tenant --- pageserver/ctl/src/layers.rs | 81 ++++++++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 8 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 340718ef1a06..1ca396cbbe36 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -1,4 +1,6 @@ +use std::num::NonZeroU32; use std::path::{Path, PathBuf}; +use std::str::FromStr; use anyhow::Result; use camino::{Utf8Path, Utf8PathBuf}; @@ -8,7 +10,7 @@ use pageserver::task_mgr::TaskKind; use pageserver::tenant::block_io::BlockCursor; use pageserver::tenant::disk_btree::DiskBtreeReader; use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary}; -use pageserver::tenant::storage_layer::{delta_layer, image_layer}; +use pageserver::tenant::storage_layer::{delta_layer, image_layer, LayerName}; use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer}; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use pageserver::{page_cache, virtual_file}; @@ -20,7 +22,9 @@ use pageserver::{ }, virtual_file::VirtualFile, }; +use remote_storage::{ListingMode, RemotePath, RemoteStorageConfig}; use std::fs; +use tokio_util::sync::CancellationToken; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; @@ -57,7 +61,10 @@ pub(crate) enum LayerCmd { }, Compress { dest_path: Utf8PathBuf, - layer_file_path: Utf8PathBuf, + layer_file_path: Option, + tenant_remote_prefix: Option, + tenant_remote_config: Option, + layers_dir: Option, }, } @@ -247,18 +254,76 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { LayerCmd::Compress { dest_path, layer_file_path, + tenant_remote_prefix, + tenant_remote_config, + layers_dir, } => { pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); pageserver::page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); - let stats = - ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?; - println!( - "Statistics: {stats:#?}\n{}", - serde_json::to_string(&stats).unwrap() - ); + if let Some(layer_file_path) = layer_file_path { + let stats = + ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?; + println!( + "Statistics: {stats:#?}\n{}", + serde_json::to_string(&stats).unwrap() + ); + } else if let ( + Some(tenant_remote_prefix), + Some(tenant_remote_config), + Some(layers_dir), + ) = (tenant_remote_prefix, tenant_remote_config, layers_dir) + { + let toml_document = toml_edit::Document::from_str(&tenant_remote_config)?; + let toml_item = toml_document + .get("remote_storage") + .expect("need remote_storage"); + let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config"); + let storage = remote_storage::GenericRemoteStorage::from_config(&config)?; + + let cancel = CancellationToken::new(); + let path = RemotePath::from_string(&tenant_remote_prefix)?; + let max_files = NonZeroU32::new(8000); + let files_list = storage + .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) + .await?; + + for file_key in files_list.keys.iter() { + let Some(file_name) = file_key.object_name() else { + continue; + }; + let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) + else { + // Skipping because it's either not a layer or a delta layer + continue; + }; + let json_file_path = layers_dir.join(format!("{file_name}.json")); + if tokio::fs::try_exists(&json_file_path).await? { + // If we have already created a report for the layer, skip it. + continue; + } + let local_layer_path = layers_dir.join(file_name); + let download = storage.download(file_key, &cancel).await?; + let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?; + let mut body = tokio_util::io::StreamReader::new(download.download_stream); + let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; + let stats = ImageLayer::compression_statistics( + &local_layer_path, + &local_layer_path, + &ctx, + ) + .await?; + + let stats_str = serde_json::to_string(&stats).unwrap(); + tokio::fs::write(json_file_path, stats_str).await?; + println!("Statistics for {file_name}: {stats:#?}\n"); + } + } else { + anyhow::bail!("No tenant dir or remote config or layers dir specified"); + } + return Ok(()); } } From f5baac25790c1a816b5e43f5acad7e29e7637afe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 4 Jun 2024 16:27:14 +0200 Subject: [PATCH 03/36] clippy --- pageserver/ctl/src/layers.rs | 75 +++++++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 18 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 1ca396cbbe36..8f5997581ec3 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -1,6 +1,7 @@ use std::num::NonZeroU32; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::Arc; use anyhow::Result; use camino::{Utf8Path, Utf8PathBuf}; @@ -22,8 +23,11 @@ use pageserver::{ }, virtual_file::VirtualFile, }; -use remote_storage::{ListingMode, RemotePath, RemoteStorageConfig}; +use pageserver_api::models::ImageCompressionAlgorithm; +use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig}; use std::fs; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; @@ -65,6 +69,7 @@ pub(crate) enum LayerCmd { tenant_remote_prefix: Option, tenant_remote_config: Option, layers_dir: Option, + parallelism: Option, }, } @@ -257,6 +262,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { tenant_remote_prefix, tenant_remote_config, layers_dir, + parallelism, } => { pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); pageserver::page_cache::init(100); @@ -276,20 +282,24 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { Some(layers_dir), ) = (tenant_remote_prefix, tenant_remote_config, layers_dir) { - let toml_document = toml_edit::Document::from_str(&tenant_remote_config)?; + let toml_document = toml_edit::Document::from_str(tenant_remote_config)?; let toml_item = toml_document .get("remote_storage") .expect("need remote_storage"); let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config"); let storage = remote_storage::GenericRemoteStorage::from_config(&config)?; + let storage = Arc::new(storage); let cancel = CancellationToken::new(); - let path = RemotePath::from_string(&tenant_remote_prefix)?; - let max_files = NonZeroU32::new(8000); + let path = RemotePath::from_string(tenant_remote_prefix)?; + let max_files = NonZeroU32::new(16000); let files_list = storage .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) .await?; + let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); + + let mut tasks = JoinSet::new(); for file_key in files_list.keys.iter() { let Some(file_name) = file_key.object_name() else { continue; @@ -305,26 +315,55 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { continue; } let local_layer_path = layers_dir.join(file_name); - let download = storage.download(file_key, &cancel).await?; - let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?; - let mut body = tokio_util::io::StreamReader::new(download.download_stream); - let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; - let stats = ImageLayer::compression_statistics( - &local_layer_path, - &local_layer_path, - &ctx, - ) - .await?; + async fn stats( + semaphore: Arc, + local_layer_path: Utf8PathBuf, + json_file_path: Utf8PathBuf, + dest_path: Utf8PathBuf, + storage: Arc, + file_key: RemotePath, + ) -> Result, u64)>, anyhow::Error> + { + let _permit = semaphore.acquire().await?; + let cancel = CancellationToken::new(); + let download = storage.download(&file_key, &cancel).await?; + let mut dest_layer_file = + tokio::fs::File::create(&local_layer_path).await?; + let mut body = tokio_util::io::StreamReader::new(download.download_stream); + let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + let stats = + ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx) + .await?; - let stats_str = serde_json::to_string(&stats).unwrap(); - tokio::fs::write(json_file_path, stats_str).await?; - println!("Statistics for {file_name}: {stats:#?}\n"); + let stats_str = serde_json::to_string(&stats).unwrap(); + tokio::fs::write(json_file_path, stats_str).await?; + Ok(stats) + } + let semaphore = semaphore.clone(); + let file_key = file_key.to_owned(); + let storage = storage.clone(); + let dest_path = dest_path.to_owned(); + let file_name = file_name.to_owned(); + tasks.spawn(async move { + let stats = stats( + semaphore, + local_layer_path.to_owned(), + json_file_path.to_owned(), + dest_path, + storage, + file_key, + ) + .await; + println!("Statistics for {file_name}: {stats:#?}\n"); + }); } + while let Some(_res) = tasks.join_next().await {} } else { anyhow::bail!("No tenant dir or remote config or layers dir specified"); } - return Ok(()); + Ok(()) } } } From 9850794250a93e6cb87596e8641a76517b053374 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:06:05 +0200 Subject: [PATCH 04/36] Remote layer file after done --- pageserver/ctl/src/layers.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 8f5997581ec3..99f7e53528ca 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -338,6 +338,8 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let stats_str = serde_json::to_string(&stats).unwrap(); tokio::fs::write(json_file_path, stats_str).await?; + + tokio::fs::remove_file(&local_layer_path).await?; Ok(stats) } let semaphore = semaphore.clone(); From 554a6bd4a699f3cb613a039330ff0595c71386f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:11:02 +0200 Subject: [PATCH 05/36] Two separate commands More easy to have an overview --- pageserver/ctl/src/layers.rs | 191 +++++++++++++++++------------------ 1 file changed, 95 insertions(+), 96 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 99f7e53528ca..c386d4cca4ec 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -63,12 +63,15 @@ pub(crate) enum LayerCmd { #[clap(long)] new_timeline_id: Option, }, - Compress { + CompressOne { dest_path: Utf8PathBuf, - layer_file_path: Option, - tenant_remote_prefix: Option, - tenant_remote_config: Option, - layers_dir: Option, + layer_file_path: Utf8PathBuf, + }, + CompressMany { + dest_path: Utf8PathBuf, + tenant_remote_prefix: String, + tenant_remote_config: String, + layers_dir: Utf8PathBuf, parallelism: Option, }, } @@ -256,9 +259,25 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { anyhow::bail!("not an image or delta layer: {layer_file_path}"); } - LayerCmd::Compress { + LayerCmd::CompressOne { dest_path, layer_file_path, + } => { + pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + pageserver::page_cache::init(100); + + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + + let stats = + ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?; + println!( + "Statistics: {stats:#?}\n{}", + serde_json::to_string(&stats).unwrap() + ); + Ok(()) + } + LayerCmd::CompressMany { + dest_path, tenant_remote_prefix, tenant_remote_config, layers_dir, @@ -267,103 +286,83 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); pageserver::page_cache::init(100); - let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); - - if let Some(layer_file_path) = layer_file_path { - let stats = - ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?; - println!( - "Statistics: {stats:#?}\n{}", - serde_json::to_string(&stats).unwrap() - ); - } else if let ( - Some(tenant_remote_prefix), - Some(tenant_remote_config), - Some(layers_dir), - ) = (tenant_remote_prefix, tenant_remote_config, layers_dir) - { - let toml_document = toml_edit::Document::from_str(tenant_remote_config)?; - let toml_item = toml_document - .get("remote_storage") - .expect("need remote_storage"); - let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config"); - let storage = remote_storage::GenericRemoteStorage::from_config(&config)?; - let storage = Arc::new(storage); + let toml_document = toml_edit::Document::from_str(tenant_remote_config)?; + let toml_item = toml_document + .get("remote_storage") + .expect("need remote_storage"); + let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config"); + let storage = remote_storage::GenericRemoteStorage::from_config(&config)?; + let storage = Arc::new(storage); - let cancel = CancellationToken::new(); - let path = RemotePath::from_string(tenant_remote_prefix)?; - let max_files = NonZeroU32::new(16000); - let files_list = storage - .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) - .await?; + let cancel = CancellationToken::new(); + let path = RemotePath::from_string(tenant_remote_prefix)?; + let max_files = NonZeroU32::new(16000); + let files_list = storage + .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) + .await?; - let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); + let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); - let mut tasks = JoinSet::new(); - for file_key in files_list.keys.iter() { - let Some(file_name) = file_key.object_name() else { - continue; - }; - let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) - else { - // Skipping because it's either not a layer or a delta layer - continue; - }; - let json_file_path = layers_dir.join(format!("{file_name}.json")); - if tokio::fs::try_exists(&json_file_path).await? { - // If we have already created a report for the layer, skip it. - continue; - } - let local_layer_path = layers_dir.join(file_name); - async fn stats( - semaphore: Arc, - local_layer_path: Utf8PathBuf, - json_file_path: Utf8PathBuf, - dest_path: Utf8PathBuf, - storage: Arc, - file_key: RemotePath, - ) -> Result, u64)>, anyhow::Error> - { - let _permit = semaphore.acquire().await?; - let cancel = CancellationToken::new(); - let download = storage.download(&file_key, &cancel).await?; - let mut dest_layer_file = - tokio::fs::File::create(&local_layer_path).await?; - let mut body = tokio_util::io::StreamReader::new(download.download_stream); - let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; - let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); - let stats = - ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx) - .await?; + let mut tasks = JoinSet::new(); + for file_key in files_list.keys.iter() { + let Some(file_name) = file_key.object_name() else { + continue; + }; + let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) else { + // Skipping because it's either not a layer or a delta layer + continue; + }; + let json_file_path = layers_dir.join(format!("{file_name}.json")); + if tokio::fs::try_exists(&json_file_path).await? { + // If we have already created a report for the layer, skip it. + continue; + } + let local_layer_path = layers_dir.join(file_name); + async fn stats( + semaphore: Arc, + local_layer_path: Utf8PathBuf, + json_file_path: Utf8PathBuf, + dest_path: Utf8PathBuf, + storage: Arc, + file_key: RemotePath, + ) -> Result, u64)>, anyhow::Error> + { + let _permit = semaphore.acquire().await?; + let cancel = CancellationToken::new(); + let download = storage.download(&file_key, &cancel).await?; + let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?; + let mut body = tokio_util::io::StreamReader::new(download.download_stream); + let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + let stats = + ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx) + .await?; - let stats_str = serde_json::to_string(&stats).unwrap(); - tokio::fs::write(json_file_path, stats_str).await?; + let stats_str = serde_json::to_string(&stats).unwrap(); + tokio::fs::write(json_file_path, stats_str).await?; - tokio::fs::remove_file(&local_layer_path).await?; - Ok(stats) - } - let semaphore = semaphore.clone(); - let file_key = file_key.to_owned(); - let storage = storage.clone(); - let dest_path = dest_path.to_owned(); - let file_name = file_name.to_owned(); - tasks.spawn(async move { - let stats = stats( - semaphore, - local_layer_path.to_owned(), - json_file_path.to_owned(), - dest_path, - storage, - file_key, - ) - .await; - println!("Statistics for {file_name}: {stats:#?}\n"); - }); + tokio::fs::remove_file(&local_layer_path).await?; + Ok(stats) } - while let Some(_res) = tasks.join_next().await {} - } else { - anyhow::bail!("No tenant dir or remote config or layers dir specified"); + let semaphore = semaphore.clone(); + let file_key = file_key.to_owned(); + let storage = storage.clone(); + let dest_path = dest_path.to_owned(); + let file_name = file_name.to_owned(); + tasks.spawn(async move { + let stats = stats( + semaphore, + local_layer_path.to_owned(), + json_file_path.to_owned(), + dest_path, + storage, + file_key, + ) + .await; + println!("Statistics for {file_name}: {stats:#?}\n"); + }); } + while let Some(_res) = tasks.join_next().await {} Ok(()) } From d030cbffec5879fd6a007cca02c178616121b598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:24:54 +0200 Subject: [PATCH 06/36] Print number of keys --- pageserver/ctl/src/layers.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index c386d4cca4ec..6286a31dfb27 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -301,6 +301,8 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) .await?; + println!("Listing gave {} keys", files_list.keys.len()); + let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); let mut tasks = JoinSet::new(); From f132658bd97d7ef56a32211914d04cc8d2b37477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:28:09 +0200 Subject: [PATCH 07/36] Some prints --- pageserver/ctl/src/layers.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 6286a31dfb27..9097b1944b93 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -312,10 +312,12 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { }; let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) else { // Skipping because it's either not a layer or a delta layer + println!("object {file_name}: not a delta layer"); continue; }; let json_file_path = layers_dir.join(format!("{file_name}.json")); if tokio::fs::try_exists(&json_file_path).await? { + println!("object {file_name}: report already created"); // If we have already created a report for the layer, skip it. continue; } From fce252fb2cdadee5ea211def810b831175ba0d04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:29:49 +0200 Subject: [PATCH 08/36] Rename dest_path to tmp_dir --- pageserver/ctl/src/layers.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 9097b1944b93..89015ce26fc7 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -68,7 +68,7 @@ pub(crate) enum LayerCmd { layer_file_path: Utf8PathBuf, }, CompressMany { - dest_path: Utf8PathBuf, + tmp_dir: Utf8PathBuf, tenant_remote_prefix: String, tenant_remote_config: String, layers_dir: Utf8PathBuf, @@ -277,7 +277,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { Ok(()) } LayerCmd::CompressMany { - dest_path, + tmp_dir, tenant_remote_prefix, tenant_remote_config, layers_dir, @@ -326,7 +326,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { semaphore: Arc, local_layer_path: Utf8PathBuf, json_file_path: Utf8PathBuf, - dest_path: Utf8PathBuf, + tmp_dir: Utf8PathBuf, storage: Arc, file_key: RemotePath, ) -> Result, u64)>, anyhow::Error> @@ -339,7 +339,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); let stats = - ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx) + ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx) .await?; let stats_str = serde_json::to_string(&stats).unwrap(); @@ -351,14 +351,14 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let semaphore = semaphore.clone(); let file_key = file_key.to_owned(); let storage = storage.clone(); - let dest_path = dest_path.to_owned(); + let tmp_dir = tmp_dir.to_owned(); let file_name = file_name.to_owned(); tasks.spawn(async move { let stats = stats( semaphore, local_layer_path.to_owned(), json_file_path.to_owned(), - dest_path, + tmp_dir, storage, file_key, ) From 9b74d554b4938980d2fcd664cba80c762ecdb69b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:35:58 +0200 Subject: [PATCH 09/36] Remove generation suffix --- pageserver/ctl/src/layers.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 89015ce26fc7..29f12a7172f0 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -310,7 +310,12 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let Some(file_name) = file_key.object_name() else { continue; }; - let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) else { + let Some(file_without_generation) = file_name.rsplit_once('-') else { + continue; + }; + let Ok(LayerName::Image(_layer_file_name)) = + LayerName::from_str(file_without_generation.0) + else { // Skipping because it's either not a layer or a delta layer println!("object {file_name}: not a delta layer"); continue; From 80803ff0985f63fd9b3ee310cff2824e7f6dbe9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:40:46 +0200 Subject: [PATCH 10/36] Printing tweaks --- pageserver/ctl/src/layers.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 29f12a7172f0..86cc6c069726 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -317,12 +317,12 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { LayerName::from_str(file_without_generation.0) else { // Skipping because it's either not a layer or a delta layer - println!("object {file_name}: not a delta layer"); + //println!("object {file_name}: not a delta layer"); continue; }; let json_file_path = layers_dir.join(format!("{file_name}.json")); if tokio::fs::try_exists(&json_file_path).await? { - println!("object {file_name}: report already created"); + //println!("object {file_name}: report already created"); // If we have already created a report for the layer, skip it. continue; } @@ -342,6 +342,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?; let mut body = tokio_util::io::StreamReader::new(download.download_stream); let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; + println!("Downloaded file to {local_layer_path}"); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); let stats = ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx) From 3182c3361a585715bdbf2942d577af2fa222341a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:42:37 +0200 Subject: [PATCH 11/36] Corrections --- pageserver/ctl/src/layers.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 86cc6c069726..f811bba6815a 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -310,14 +310,15 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let Some(file_name) = file_key.object_name() else { continue; }; + // Split off the final part. Sometimes this cuts off actually important pieces in case of legacy layer files, but usually it doesn't. let Some(file_without_generation) = file_name.rsplit_once('-') else { continue; }; let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_without_generation.0) else { - // Skipping because it's either not a layer or a delta layer - //println!("object {file_name}: not a delta layer"); + // Skipping because it's either not a layer or an image layer + //println!("object {file_name}: not an image layer"); continue; }; let json_file_path = layers_dir.join(format!("{file_name}.json")); From 0c500450fe48f66e9ecaa429dddecc4354a41bbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 6 Jun 2024 23:49:44 +0200 Subject: [PATCH 12/36] Print error better --- pageserver/ctl/src/layers.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index f811bba6815a..cf19cb7c8b5a 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -370,7 +370,10 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { file_key, ) .await; - println!("Statistics for {file_name}: {stats:#?}\n"); + match stats { + Ok(stats) => println!("Statistics for {file_name}: {stats:#?}\n"), + Err(e) => eprintln!("Error for {file_name}: {e:?}"), + }; }); } while let Some(_res) = tasks.join_next().await {} From a9963db8c369e6488e6ba0351b689a76e947c35a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 00:21:39 +0200 Subject: [PATCH 13/36] Create timeline dir in temp location if not existent --- pageserver/src/tenant/storage_layer/image_layer.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 193012d8fff8..afd1965676fc 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -411,6 +411,7 @@ impl ImageLayer { let block_reader = FileBlockReader::new(&file, file_id); let summary_blk = block_reader.read_blk(0, ctx).await?; let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; + let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id); if summary.magic != IMAGE_FILE_MAGIC { anyhow::bail!("magic file mismatch"); } @@ -424,10 +425,13 @@ impl ImageLayer { let mut key_offset_stream = std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx)); + let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id); + tokio::fs::create_dir_all(timeline_path).await?; + let mut writer = ImageLayerWriter::new( conf, summary.timeline_id, - TenantShardId::unsharded(summary.tenant_id), + tenant_shard_id, &summary.key_range, summary.lsn, ctx, From 8fcb236783607f38fd3000541f2da296a4b77f39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 00:26:02 +0200 Subject: [PATCH 14/36] Increase listing limit --- pageserver/ctl/src/layers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index cf19cb7c8b5a..4523321e4f87 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -296,7 +296,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let cancel = CancellationToken::new(); let path = RemotePath::from_string(tenant_remote_prefix)?; - let max_files = NonZeroU32::new(16000); + let max_files = NonZeroU32::new(128_000); let files_list = storage .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) .await?; From 14447b98ced79f6e29a02cd4585fe18eaf227105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 01:03:57 +0200 Subject: [PATCH 15/36] Yield in between --- pageserver/ctl/src/layers.rs | 2 +- pageserver/src/tenant/storage_layer/image_layer.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 4523321e4f87..39e0d36eaa3f 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -371,7 +371,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { ) .await; match stats { - Ok(stats) => println!("Statistics for {file_name}: {stats:#?}\n"), + Ok(stats) => println!("Statistics for {file_name}: {stats:?}\n"), Err(e) => eprintln!("Error for {file_name}: {e:?}"), }; }); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index afd1965676fc..2ec5e3a81b5d 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -393,6 +393,7 @@ impl ImageLayer { ) .await?; stats.push((image_compression, size)); + tokio::task::yield_now().await; } Ok(stats) } From 0e667dcd93ab94227a5a508588bb2ef0c8f6a80e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 01:14:00 +0200 Subject: [PATCH 16/36] more yielding --- pageserver/src/tenant/storage_layer/image_layer.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2ec5e3a81b5d..c1cb328bad92 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -440,11 +440,16 @@ impl ImageLayer { .await?; let cursor = block_reader.block_cursor(); + let mut counter = 0u32; while let Some(r) = key_offset_stream.next().await { let (key, offset) = r?; let key = Key::from_slice(&key); let content = cursor.read_blob(offset, ctx).await?; writer.put_image(key, content.into(), ctx).await?; + counter += 1; + if counter % 2048 == 0 { + tokio::task::yield_now().await; + } } Ok(writer.size()) } From dadbd87ac1e9a876500ae0a51134c0472d581399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 01:57:45 +0200 Subject: [PATCH 17/36] Add percent to output --- pageserver/ctl/src/layers.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 39e0d36eaa3f..7373d7efe5a9 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -306,7 +306,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); let mut tasks = JoinSet::new(); - for file_key in files_list.keys.iter() { + for (file_idx, file_key) in files_list.keys.iter().enumerate() { let Some(file_name) = file_key.object_name() else { continue; }; @@ -360,6 +360,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let storage = storage.clone(); let tmp_dir = tmp_dir.to_owned(); let file_name = file_name.to_owned(); + let percent = (file_idx * 100) / files_list.keys.len(); tasks.spawn(async move { let stats = stats( semaphore, @@ -371,7 +372,9 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { ) .await; match stats { - Ok(stats) => println!("Statistics for {file_name}: {stats:?}\n"), + Ok(stats) => { + println!("Statistics for {file_name} ({percent}%): {stats:?}\n") + } Err(e) => eprintln!("Error for {file_name}: {e:?}"), }; }); From c824ffe1dc1f7274e9869a25e2573675067fd59e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 16:32:56 +0200 Subject: [PATCH 18/36] Add ZstdHigh compression mode --- libs/pageserver_api/src/models.rs | 1 + pageserver/ctl/src/layers.rs | 2 ++ pageserver/src/tenant/blob_io.rs | 12 ++++++++++-- pageserver/src/tenant/storage_layer/image_layer.rs | 1 + 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 318dd307d7f4..608459c4c364 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -470,6 +470,7 @@ pub enum CompactionAlgorithm { #[strum(serialize_all = "kebab-case")] pub enum ImageCompressionAlgorithm { Zstd, + ZstdHigh, LZ4, } diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 7373d7efe5a9..d0735cb1b6f7 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -303,6 +303,8 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { println!("Listing gave {} keys", files_list.keys.len()); + tokio::fs::create_dir_all(&layers_dir).await?; + let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); let mut tasks = JoinSet::new(); diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 919b1a516274..99ddcf6f7cbf 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -11,6 +11,7 @@ //! len < 128: 0XXXXXXX //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! +use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; use tokio::io::AsyncWriteExt; @@ -266,9 +267,16 @@ impl BlobWriter { const ZSTD: u8 = UNCOMPRESSED | 0x10; const LZ4: u8 = UNCOMPRESSED | 0x20; let (high_bit_mask, len_written, srcbuf) = match algorithm { - Some(ImageCompressionAlgorithm::Zstd) => { + Some(ImageCompressionAlgorithm::Zstd | ImageCompressionAlgorithm::ZstdHigh) => { let mut encoder = - async_compression::tokio::write::ZstdEncoder::new(Vec::new()); + if matches!(algorithm, Some(ImageCompressionAlgorithm::ZstdHigh)) { + async_compression::tokio::write::ZstdEncoder::with_quality( + Vec::new(), + Level::Precise(6), + ) + } else { + async_compression::tokio::write::ZstdEncoder::new(Vec::new()) + }; let slice = srcbuf.slice(..); encoder.write_all(&slice[..]).await.unwrap(); encoder.flush().await.unwrap(); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index c1cb328bad92..b337a141609a 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -382,6 +382,7 @@ impl ImageLayer { let image_compressions = [ None, Some(ImageCompressionAlgorithm::Zstd), + Some(ImageCompressionAlgorithm::ZstdHigh), Some(ImageCompressionAlgorithm::LZ4), ]; let mut stats = Vec::new(); From 843d996cb128487fe193fb011f230afdcc3225c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 16:41:06 +0200 Subject: [PATCH 19/36] More precise printing --- pageserver/ctl/src/layers.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index d0735cb1b6f7..b25430537669 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -362,7 +362,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let storage = storage.clone(); let tmp_dir = tmp_dir.to_owned(); let file_name = file_name.to_owned(); - let percent = (file_idx * 100) / files_list.keys.len(); + let percent = (file_idx * 100) as f64 / files_list.keys.len() as f64; tasks.spawn(async move { let stats = stats( semaphore, @@ -375,7 +375,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { .await; match stats { Ok(stats) => { - println!("Statistics for {file_name} ({percent}%): {stats:?}\n") + println!("Statistics for {file_name} ({percent:.1}%): {stats:?}\n") } Err(e) => eprintln!("Error for {file_name}: {e:?}"), }; From e6a0e7ec614511ff0f122825a1dd83c614c59ca7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 17:35:41 +0200 Subject: [PATCH 20/36] Add zstd with low compression quality --- libs/pageserver_api/src/models.rs | 1 + pageserver/src/tenant/blob_io.rs | 10 ++++++++-- pageserver/src/tenant/storage_layer/image_layer.rs | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 608459c4c364..be412e665965 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -469,6 +469,7 @@ pub enum CompactionAlgorithm { )] #[strum(serialize_all = "kebab-case")] pub enum ImageCompressionAlgorithm { + ZstdLow, Zstd, ZstdHigh, LZ4, diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 99ddcf6f7cbf..f3b33a6c1cd8 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -266,10 +266,16 @@ impl BlobWriter { const UNCOMPRESSED: u8 = 0x80; const ZSTD: u8 = UNCOMPRESSED | 0x10; const LZ4: u8 = UNCOMPRESSED | 0x20; + use ImageCompressionAlgorithm::*; let (high_bit_mask, len_written, srcbuf) = match algorithm { - Some(ImageCompressionAlgorithm::Zstd | ImageCompressionAlgorithm::ZstdHigh) => { + Some(ZstdLow | Zstd | ZstdHigh) => { let mut encoder = - if matches!(algorithm, Some(ImageCompressionAlgorithm::ZstdHigh)) { + if matches!(algorithm, Some(ZstdLow)) { + async_compression::tokio::write::ZstdEncoder::with_quality( + Vec::new(), + Level::Precise(1), + ) + } else if matches!(algorithm, Some(ZstdHigh)) { async_compression::tokio::write::ZstdEncoder::with_quality( Vec::new(), Level::Precise(6), diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b337a141609a..6c5a4f2b7e45 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -381,6 +381,7 @@ impl ImageLayer { } let image_compressions = [ None, + Some(ImageCompressionAlgorithm::ZstdLow), Some(ImageCompressionAlgorithm::Zstd), Some(ImageCompressionAlgorithm::ZstdHigh), Some(ImageCompressionAlgorithm::LZ4), From 2eb8b428cc51dea11d9578e30b0144a0027a05d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 22:12:04 +0200 Subject: [PATCH 21/36] Also support the generation-less legacy naming scheme --- pageserver/ctl/src/layers.rs | 29 ++++++++++++++++++----------- pageserver/src/tenant/blob_io.rs | 27 +++++++++++++-------------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index b25430537669..3c7bc935a012 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -312,17 +312,24 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { let Some(file_name) = file_key.object_name() else { continue; }; - // Split off the final part. Sometimes this cuts off actually important pieces in case of legacy layer files, but usually it doesn't. - let Some(file_without_generation) = file_name.rsplit_once('-') else { - continue; - }; - let Ok(LayerName::Image(_layer_file_name)) = - LayerName::from_str(file_without_generation.0) - else { - // Skipping because it's either not a layer or an image layer - //println!("object {file_name}: not an image layer"); - continue; - }; + match LayerName::from_str(file_name) { + Ok(LayerName::Delta(_)) => continue, + Ok(LayerName::Image(_)) => (), + Err(_e) => { + // Split off the final part. We ensured above that this is not turning a + // generation-less delta layer file name into an image layer file name. + let Some(file_without_generation) = file_name.rsplit_once('-') else { + continue; + }; + let Ok(LayerName::Image(_layer_file_name)) = + LayerName::from_str(file_without_generation.0) + else { + // Skipping because it's either not a layer or an image layer + //println!("object {file_name}: not an image layer"); + continue; + }; + } + } let json_file_path = layers_dir.join(format!("{file_name}.json")); if tokio::fs::try_exists(&json_file_path).await? { //println!("object {file_name}: report already created"); diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index f3b33a6c1cd8..ec22114c7377 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -269,20 +269,19 @@ impl BlobWriter { use ImageCompressionAlgorithm::*; let (high_bit_mask, len_written, srcbuf) = match algorithm { Some(ZstdLow | Zstd | ZstdHigh) => { - let mut encoder = - if matches!(algorithm, Some(ZstdLow)) { - async_compression::tokio::write::ZstdEncoder::with_quality( - Vec::new(), - Level::Precise(1), - ) - } else if matches!(algorithm, Some(ZstdHigh)) { - async_compression::tokio::write::ZstdEncoder::with_quality( - Vec::new(), - Level::Precise(6), - ) - } else { - async_compression::tokio::write::ZstdEncoder::new(Vec::new()) - }; + let mut encoder = if matches!(algorithm, Some(ZstdLow)) { + async_compression::tokio::write::ZstdEncoder::with_quality( + Vec::new(), + Level::Precise(1), + ) + } else if matches!(algorithm, Some(ZstdHigh)) { + async_compression::tokio::write::ZstdEncoder::with_quality( + Vec::new(), + Level::Precise(6), + ) + } else { + async_compression::tokio::write::ZstdEncoder::new(Vec::new()) + }; let slice = srcbuf.slice(..); encoder.write_all(&slice[..]).await.unwrap(); encoder.flush().await.unwrap(); From 8fcdc22283a5b04fe673cd7a050a63d9fcc16f8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 7 Jun 2024 23:13:53 +0200 Subject: [PATCH 22/36] Add stats info --- pageserver/ctl/src/layers.rs | 2 +- pageserver/src/tenant/storage_layer/image_layer.rs | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 3c7bc935a012..ffcb97261328 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -344,7 +344,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { tmp_dir: Utf8PathBuf, storage: Arc, file_key: RemotePath, - ) -> Result, u64)>, anyhow::Error> + ) -> Result, u64, u64)>, anyhow::Error> { let _permit = semaphore.acquire().await?; let cancel = CancellationToken::new(); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 6c5a4f2b7e45..7d811f500e02 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -57,6 +57,7 @@ use std::os::unix::prelude::FileExt; use std::str::FromStr; use std::sync::Arc; use tokio::sync::OnceCell; +use tokio::time::Instant; use tokio_stream::StreamExt; use tracing::*; @@ -370,7 +371,7 @@ impl ImageLayer { dest_repo_path: &Utf8Path, path: &Utf8Path, ctx: &RequestContext, - ) -> anyhow::Result, u64)>> { + ) -> anyhow::Result, u64, u64)>> { fn make_conf( image_compression: Option, dest_repo_path: &Utf8Path, @@ -388,13 +389,15 @@ impl ImageLayer { ]; let mut stats = Vec::new(); for image_compression in image_compressions { + let start = Instant::now(); let size = Self::compressed_size_for_conf( path, ctx, make_conf(image_compression, dest_repo_path), ) .await?; - stats.push((image_compression, size)); + let elapsed_ms = start.elapsed().as_millis() as u64; + stats.push((image_compression, size, elapsed_ms)); tokio::task::yield_now().await; } Ok(stats) @@ -442,16 +445,11 @@ impl ImageLayer { .await?; let cursor = block_reader.block_cursor(); - let mut counter = 0u32; while let Some(r) = key_offset_stream.next().await { let (key, offset) = r?; let key = Key::from_slice(&key); let content = cursor.read_blob(offset, ctx).await?; writer.put_image(key, content.into(), ctx).await?; - counter += 1; - if counter % 2048 == 0 { - tokio::task::yield_now().await; - } } Ok(writer.size()) } From 88b24e159322a9648a565680828754b97555e72d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 10 Jun 2024 17:30:56 +0200 Subject: [PATCH 23/36] Move constants out into file --- pageserver/src/tenant/blob_io.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index ec22114c7377..8ace666503fb 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -96,6 +96,10 @@ impl<'a> BlockCursor<'a> { } } +const BYTE_UNCOMPRESSED: u8 = 0x80; +const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; +const BYTE_LZ4: u8 = BYTE_UNCOMPRESSED | 0x20; + /// A wrapper of `VirtualFile` that allows users to write blobs. /// /// If a `BlobWriter` is dropped, the internal buffer will be @@ -263,9 +267,6 @@ impl BlobWriter { srcbuf.slice(..).into_inner(), ); } - const UNCOMPRESSED: u8 = 0x80; - const ZSTD: u8 = UNCOMPRESSED | 0x10; - const LZ4: u8 = UNCOMPRESSED | 0x20; use ImageCompressionAlgorithm::*; let (high_bit_mask, len_written, srcbuf) = match algorithm { Some(ZstdLow | Zstd | ZstdHigh) => { @@ -289,9 +290,9 @@ impl BlobWriter { if compressed.len() < len { let compressed_len = len; compressed_buf = Some(compressed); - (ZSTD, compressed_len, slice.into_inner()) + (BYTE_ZSTD, compressed_len, slice.into_inner()) } else { - (0x80, len, slice.into_inner()) + (BYTE_UNCOMPRESSED, len, slice.into_inner()) } } Some(ImageCompressionAlgorithm::LZ4) => { @@ -300,12 +301,12 @@ impl BlobWriter { if compressed.len() < len { let compressed_len = len; compressed_buf = Some(compressed); - (LZ4, compressed_len, slice.into_inner()) + (BYTE_LZ4, compressed_len, slice.into_inner()) } else { - (0x80, len, slice.into_inner()) + (BYTE_UNCOMPRESSED, len, slice.into_inner()) } } - None => (0x80, len, srcbuf.slice(..).into_inner()), + None => (BYTE_UNCOMPRESSED, len, srcbuf.slice(..).into_inner()), }; let mut len_buf = (len_written as u32).to_be_bytes(); len_buf[0] |= high_bit_mask; From 40e79712ebb66d9a9e2aa3acafb4cb88ce78d8ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 12 Jun 2024 16:04:39 +0200 Subject: [PATCH 24/36] Add decompression --- pageserver/src/tenant/blob_io.rs | 45 +++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 8ace666503fb..a0328951f35d 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -14,6 +14,7 @@ use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; +use postgres_ffi::BLCKSZ; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; @@ -69,12 +70,29 @@ impl<'a> BlockCursor<'a> { len_buf.copy_from_slice(&buf[off..off + 4]); off += 4; } - len_buf[0] &= 0x7f; + len_buf[0] &= 0x0f; u32::from_be_bytes(len_buf) as usize }; + let compression_bits = first_len_byte & 0xf0; + + let mut tmp_buf = Vec::new(); + let buf_to_write; + let compression = if compression_bits <= BYTE_UNCOMPRESSED { + buf_to_write = dstbuf; + None + } else if compression_bits == BYTE_ZSTD || compression_bits == BYTE_LZ4 { + buf_to_write = &mut tmp_buf; + Some(dstbuf) + } else { + let error = std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid compression byte {compression_bits:x}"), + ); + return Err(error); + }; - dstbuf.clear(); - dstbuf.reserve(len); + buf_to_write.clear(); + buf_to_write.reserve(len); // Read the payload let mut remain = len; @@ -88,10 +106,29 @@ impl<'a> BlockCursor<'a> { page_remain = PAGE_SZ; } let this_blk_len = min(remain, page_remain); - dstbuf.extend_from_slice(&buf[off..off + this_blk_len]); + buf_to_write.extend_from_slice(&buf[off..off + this_blk_len]); remain -= this_blk_len; off += this_blk_len; } + + if let Some(dstbuf) = compression { + if compression_bits == BYTE_ZSTD { + let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf); + decoder.write_all(buf_to_write).await?; + } else if compression_bits == BYTE_LZ4 { + let decompressed = lz4_flex::block::decompress(&buf_to_write, 128 as usize) + .map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "lz4 decompression error", + ) + })?; + dstbuf.extend_from_slice(&decompressed); + } else { + unreachable!("already checked above") + } + } + Ok(()) } } From 07bd0ce69e4960e36a29631607abaab87b0af488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 14 Jun 2024 05:18:38 +0200 Subject: [PATCH 25/36] Also measure decompression time --- pageserver/ctl/src/layers.rs | 2 +- pageserver/src/tenant/blob_io.rs | 1 - .../src/tenant/storage_layer/image_layer.rs | 119 +++++++++++++++--- 3 files changed, 101 insertions(+), 21 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index ffcb97261328..5d6133377452 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -344,7 +344,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { tmp_dir: Utf8PathBuf, storage: Arc, file_key: RemotePath, - ) -> Result, u64, u64)>, anyhow::Error> + ) -> Result, u64, u64, u64)>, anyhow::Error> { let _permit = semaphore.acquire().await?; let cancel = CancellationToken::new(); diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index a0328951f35d..d084c7c8779a 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -14,7 +14,6 @@ use async_compression::Level; use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; -use postgres_ffi::BLCKSZ; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 7d811f500e02..fb09df3edc90 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -53,6 +53,7 @@ use serde::{Deserialize, Serialize}; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; +use std::os::unix::fs::MetadataExt; use std::os::unix::prelude::FileExt; use std::str::FromStr; use std::sync::Arc; @@ -371,7 +372,7 @@ impl ImageLayer { dest_repo_path: &Utf8Path, path: &Utf8Path, ctx: &RequestContext, - ) -> anyhow::Result, u64, u64)>> { + ) -> anyhow::Result, u64, u64, u64)>> { fn make_conf( image_compression: Option, dest_repo_path: &Utf8Path, @@ -387,27 +388,35 @@ impl ImageLayer { Some(ImageCompressionAlgorithm::ZstdHigh), Some(ImageCompressionAlgorithm::LZ4), ]; + let confs = image_compressions + .clone() + .map(|compression| make_conf(compression, dest_repo_path)); let mut stats = Vec::new(); - for image_compression in image_compressions { - let start = Instant::now(); - let size = Self::compressed_size_for_conf( - path, - ctx, - make_conf(image_compression, dest_repo_path), - ) - .await?; - let elapsed_ms = start.elapsed().as_millis() as u64; - stats.push((image_compression, size, elapsed_ms)); + for (image_compression, conf) in image_compressions.into_iter().zip(confs) { + let start_compression = Instant::now(); + let compressed_path = Self::compress_for_conf(path, ctx, conf).await?; + let size = path.metadata()?.size(); + let elapsed_ms = start_compression.elapsed().as_millis() as u64; + let start_decompression = Instant::now(); + Self::compare_are_equal(path, &compressed_path, ctx).await?; + let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64; + std::fs::remove_file(compressed_path)?; + stats.push(( + image_compression, + size, + elapsed_ms, + elapsed_decompression_ms, + )); tokio::task::yield_now().await; } Ok(stats) } - async fn compressed_size_for_conf( + async fn compress_for_conf( path: &Utf8Path, ctx: &RequestContext, conf: &'static PageServerConf, - ) -> anyhow::Result { + ) -> anyhow::Result { let file = VirtualFile::open_with_options(path, virtual_file::OpenOptions::new().read(true), ctx) .await @@ -417,7 +426,6 @@ impl ImageLayer { let block_reader = FileBlockReader::new(&file, file_id); let summary_blk = block_reader.read_blk(0, ctx).await?; let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; - let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id); if summary.magic != IMAGE_FILE_MAGIC { anyhow::bail!("magic file mismatch"); } @@ -431,6 +439,7 @@ impl ImageLayer { let mut key_offset_stream = std::pin::pin!(tree_reader.get_stream_from(&[0u8; KEY_SIZE], ctx)); + let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id); let timeline_path = conf.timeline_path(&tenant_shard_id, &summary.timeline_id); tokio::fs::create_dir_all(timeline_path).await?; @@ -451,7 +460,72 @@ impl ImageLayer { let content = cursor.read_blob(offset, ctx).await?; writer.put_image(key, content.into(), ctx).await?; } - Ok(writer.size()) + let path = writer.inner.take().unwrap().finish_inner(ctx).await?.2; + Ok(path) + } + + async fn compare_are_equal( + path_a: &Utf8Path, + path_b: &Utf8Path, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let mut files = Vec::new(); + for path in [path_a, path_b] { + let file = VirtualFile::open_with_options( + path, + virtual_file::OpenOptions::new().read(true), + ctx, + ) + .await + .with_context(|| format!("Failed to open file '{}'", path))?; + files.push(file); + } + + let mut readers_summaries = Vec::new(); + for file in files.iter() { + let file_id = page_cache::next_file_id(); + let block_reader = FileBlockReader::new(&file, file_id); + let summary_blk = block_reader.read_blk(0, ctx).await?; + let summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; + if summary.magic != IMAGE_FILE_MAGIC { + anyhow::bail!("magic file mismatch"); + } + readers_summaries.push((block_reader, summary)); + } + + let mut tree_readers_cursors = Vec::new(); + for (block_reader, summary) in readers_summaries.iter() { + let tree_reader = DiskBtreeReader::new( + summary.index_start_blk, + summary.index_root_blk, + block_reader, + ); + let cursor = block_reader.block_cursor(); + tree_readers_cursors.push((tree_reader, cursor)); + } + + let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0] + .0 + .get_stream_from(&[0u8; KEY_SIZE], ctx)); + let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[0] + .0 + .get_stream_from(&[0u8; KEY_SIZE], ctx)); + while let Some(r) = key_offset_stream_a.next().await { + let (key_a, offset_a): (Vec, _) = r?; + let Some(r) = key_offset_stream_b.next().await else { + panic!("second file at {path_b} has fewer keys than {path_a}"); + }; + let (key_b, offset_b): (Vec, _) = r?; + assert_eq!(key_a, key_b, "mismatch of keys for {path_a}:{path_b}"); + let key = Key::from_slice(&key_a); + let content_a = tree_readers_cursors[0].1.read_blob(offset_a, ctx).await?; + let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?; + assert_eq!( + content_a, content_b, + "mismatch for key={key} and {path_a}:{path_b}" + ); + } + Ok(()) } } @@ -886,11 +960,10 @@ impl ImageLayerWriterInner { /// /// Finish writing the image layer. /// - async fn finish( + async fn finish_inner( self, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result<(&'static PageServerConf, PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -944,8 +1017,16 @@ impl ImageLayerWriterInner { // fsync the file file.sync_all().await?; + Ok((self.conf, desc, self.path)) + } + async fn finish( + self, + timeline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result { + let (conf, desc, path) = self.finish_inner(ctx).await?; // FIXME: why not carry the virtualfile here, it supports renaming? - let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?; + let layer = Layer::finish_creating(conf, timeline, desc, &path)?; info!("created image layer {}", layer.local_path()); From 2f70221503530498bf5bdd4538da3fdc3d107fe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 14 Jun 2024 05:26:51 +0200 Subject: [PATCH 26/36] Don't forget the flush --- pageserver/src/tenant/blob_io.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index d084c7c8779a..5dd1371b285b 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -114,6 +114,7 @@ impl<'a> BlockCursor<'a> { if compression_bits == BYTE_ZSTD { let mut decoder = async_compression::tokio::write::ZstdDecoder::new(dstbuf); decoder.write_all(buf_to_write).await?; + decoder.flush().await?; } else if compression_bits == BYTE_LZ4 { let decompressed = lz4_flex::block::decompress(&buf_to_write, 128 as usize) .map_err(|_| { From 2ea8d1b15153762d8b96ba58d7c3c57405555c46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 14 Jun 2024 13:58:45 +0200 Subject: [PATCH 27/36] Shutdown instead of flush --- pageserver/src/tenant/blob_io.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 5dd1371b285b..534bdf6f7c5d 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -322,7 +322,7 @@ impl BlobWriter { }; let slice = srcbuf.slice(..); encoder.write_all(&slice[..]).await.unwrap(); - encoder.flush().await.unwrap(); + encoder.shutdown().await.unwrap(); let compressed = encoder.into_inner(); if compressed.len() < len { let compressed_len = len; From 983972f81248b471d0420977600349a05bdcbdad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 14 Jun 2024 18:03:50 +0200 Subject: [PATCH 28/36] Add tests for compression --- pageserver/src/config.rs | 2 ++ pageserver/src/tenant/blob_io.rs | 20 +++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index a7da4f55be24..22c521850f42 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -1367,6 +1367,7 @@ background_task_maximum_delay = '334 s' .expect("Invalid default constant") ), validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, + image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, }, "Correct defaults should be used when no config values are provided" @@ -1440,6 +1441,7 @@ background_task_maximum_delay = '334 s' .expect("Invalid default constant") ), validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, + image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, }, "Should be able to parse all basic config values correctly" diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 534bdf6f7c5d..6a3733308419 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -400,6 +400,12 @@ mod tests { use rand::{Rng, SeedableRng}; async fn round_trip_test(blobs: &[Vec]) -> Result<(), Error> { + round_trip_test_compressed::(blobs).await + } + + async fn round_trip_test_compressed( + blobs: &[Vec], + ) -> Result<(), Error> { let temp_dir = camino_tempfile::tempdir()?; let pathbuf = temp_dir.path().join("file"); let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); @@ -410,7 +416,12 @@ mod tests { let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { - let (_, res) = wtr.write_blob(blob.clone(), &ctx).await; + let (_, res) = match COMPRESSION { + 0 => wtr.write_blob(blob.clone(), &ctx).await, + 1 => wtr.write_blob_compressed(blob.clone(), &ctx, Some(ImageCompressionAlgorithm::ZstdLow)).await, + 2 => wtr.write_blob_compressed(blob.clone(), &ctx, Some(ImageCompressionAlgorithm::LZ4)).await, + _ => unreachable!("Invalid compression {COMPRESSION}"), + }; let offs = res?; offsets.push(offs); } @@ -466,10 +477,17 @@ mod tests { let blobs = &[ b"test".to_vec(), random_array(10 * PAGE_SZ), + b"hello".to_vec(), + random_array(66 * PAGE_SZ), + vec![0xf3; 24 * PAGE_SZ], b"foobar".to_vec(), ]; round_trip_test::(blobs).await?; round_trip_test::(blobs).await?; + round_trip_test_compressed::(blobs).await?; + round_trip_test_compressed::(blobs).await?; + round_trip_test_compressed::(blobs).await?; + round_trip_test_compressed::(blobs).await?; Ok(()) } From f1bebda7132771fd6d6fc16782667460734260e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 14 Jun 2024 23:57:33 +0200 Subject: [PATCH 29/36] Fix failing test --- pageserver/src/tenant/blob_io.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 6a3733308419..4c2cb4c23a25 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -325,7 +325,7 @@ impl BlobWriter { encoder.shutdown().await.unwrap(); let compressed = encoder.into_inner(); if compressed.len() < len { - let compressed_len = len; + let compressed_len = compressed.len(); compressed_buf = Some(compressed); (BYTE_ZSTD, compressed_len, slice.into_inner()) } else { @@ -336,7 +336,7 @@ impl BlobWriter { let slice = srcbuf.slice(..); let compressed = lz4_flex::block::compress(&slice[..]); if compressed.len() < len { - let compressed_len = len; + let compressed_len = compressed.len(); compressed_buf = Some(compressed); (BYTE_LZ4, compressed_len, slice.into_inner()) } else { @@ -418,8 +418,22 @@ mod tests { for blob in blobs.iter() { let (_, res) = match COMPRESSION { 0 => wtr.write_blob(blob.clone(), &ctx).await, - 1 => wtr.write_blob_compressed(blob.clone(), &ctx, Some(ImageCompressionAlgorithm::ZstdLow)).await, - 2 => wtr.write_blob_compressed(blob.clone(), &ctx, Some(ImageCompressionAlgorithm::LZ4)).await, + 1 => { + wtr.write_blob_compressed( + blob.clone(), + &ctx, + Some(ImageCompressionAlgorithm::ZstdLow), + ) + .await + } + 2 => { + wtr.write_blob_compressed( + blob.clone(), + &ctx, + Some(ImageCompressionAlgorithm::LZ4), + ) + .await + } _ => unreachable!("Invalid compression {COMPRESSION}"), }; let offs = res?; From 0fdeca882c6a5619b70e259844170bfb6032a68f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 15 Jun 2024 00:24:25 +0200 Subject: [PATCH 30/36] Fix tests --- pageserver/src/tenant/blob_io.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 4c2cb4c23a25..0fb7587d5dd5 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -116,11 +116,11 @@ impl<'a> BlockCursor<'a> { decoder.write_all(buf_to_write).await?; decoder.flush().await?; } else if compression_bits == BYTE_LZ4 { - let decompressed = lz4_flex::block::decompress(&buf_to_write, 128 as usize) - .map_err(|_| { + let decompressed = lz4_flex::block::decompress_size_prepended(&buf_to_write) + .map_err(|e| { std::io::Error::new( std::io::ErrorKind::InvalidData, - "lz4 decompression error", + format!("lz4 decompression error: {e:?}"), ) })?; dstbuf.extend_from_slice(&decompressed); @@ -334,7 +334,7 @@ impl BlobWriter { } Some(ImageCompressionAlgorithm::LZ4) => { let slice = srcbuf.slice(..); - let compressed = lz4_flex::block::compress(&slice[..]); + let compressed = lz4_flex::block::compress_prepend_size(&slice[..]); if compressed.len() < len { let compressed_len = compressed.len(); compressed_buf = Some(compressed); From d2533c06a5eb6bcca64e172bcd79dea16ccba3c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 15 Jun 2024 02:28:32 +0200 Subject: [PATCH 31/36] Always delete the file, even on error --- pageserver/src/tenant/storage_layer/image_layer.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index fb09df3edc90..fc06e85dea9d 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -395,12 +395,14 @@ impl ImageLayer { for (image_compression, conf) in image_compressions.into_iter().zip(confs) { let start_compression = Instant::now(); let compressed_path = Self::compress_for_conf(path, ctx, conf).await?; + scopeguard::defer!({ + let _ = std::fs::remove_file(compressed_path); + }); let size = path.metadata()?.size(); let elapsed_ms = start_compression.elapsed().as_millis() as u64; let start_decompression = Instant::now(); Self::compare_are_equal(path, &compressed_path, ctx).await?; let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64; - std::fs::remove_file(compressed_path)?; stats.push(( image_compression, size, From c5034f0e456da8efeaf2372a18b92ac5507d1209 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 17 Jun 2024 21:58:12 +0200 Subject: [PATCH 32/36] Fix build --- pageserver/src/tenant/storage_layer/image_layer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index fc06e85dea9d..79abacb744e5 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -395,8 +395,9 @@ impl ImageLayer { for (image_compression, conf) in image_compressions.into_iter().zip(confs) { let start_compression = Instant::now(); let compressed_path = Self::compress_for_conf(path, ctx, conf).await?; + let path_to_delete = compressed_path.clone(); scopeguard::defer!({ - let _ = std::fs::remove_file(compressed_path); + let _ = std::fs::remove_file(path_to_delete); }); let size = path.metadata()?.size(); let elapsed_ms = start_compression.elapsed().as_millis() as u64; From e749e73ad6e3e65f8ea2ba19fe9900b191478680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 17 Jun 2024 22:25:07 +0200 Subject: [PATCH 33/36] Add the compression algo name --- pageserver/src/tenant/storage_layer/image_layer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 79abacb744e5..ac61f72bf177 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -402,7 +402,7 @@ impl ImageLayer { let size = path.metadata()?.size(); let elapsed_ms = start_compression.elapsed().as_millis() as u64; let start_decompression = Instant::now(); - Self::compare_are_equal(path, &compressed_path, ctx).await?; + Self::compare_are_equal(path, &compressed_path, ctx, &image_compression).await?; let elapsed_decompression_ms = start_decompression.elapsed().as_millis() as u64; stats.push(( image_compression, @@ -471,6 +471,7 @@ impl ImageLayer { path_a: &Utf8Path, path_b: &Utf8Path, ctx: &RequestContext, + cmp: &Option, ) -> anyhow::Result<()> { let mut files = Vec::new(); for path in [path_a, path_b] { @@ -525,7 +526,7 @@ impl ImageLayer { let content_b = tree_readers_cursors[1].1.read_blob(offset_b, ctx).await?; assert_eq!( content_a, content_b, - "mismatch for key={key} and {path_a}:{path_b}" + "mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}" ); } Ok(()) From 66d3bef9476d4a672a43b8581091d27019726e2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 18 Jun 2024 01:04:52 +0200 Subject: [PATCH 34/36] More printing and assertions --- pageserver/src/tenant/blob_io.rs | 1 + pageserver/src/tenant/storage_layer/image_layer.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 0fb7587d5dd5..f0b32996cc53 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -346,6 +346,7 @@ impl BlobWriter { None => (BYTE_UNCOMPRESSED, len, srcbuf.slice(..).into_inner()), }; let mut len_buf = (len_written as u32).to_be_bytes(); + assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); (self.write_all(io_buf, ctx).await, srcbuf) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index ac61f72bf177..547b095aed83 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -528,6 +528,7 @@ impl ImageLayer { content_a, content_b, "mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}" ); + println!("match for key={key} cmp={cmp:?} from {path_a}"); } Ok(()) } From 6b67135bd3cb67668bf47a2b07aca7221d90bb17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 18 Jun 2024 01:08:33 +0200 Subject: [PATCH 35/36] Fix offset stream issue --- pageserver/src/tenant/storage_layer/image_layer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 547b095aed83..4714ecedce85 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -511,7 +511,7 @@ impl ImageLayer { let mut key_offset_stream_a = std::pin::pin!(tree_readers_cursors[0] .0 .get_stream_from(&[0u8; KEY_SIZE], ctx)); - let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[0] + let mut key_offset_stream_b = std::pin::pin!(tree_readers_cursors[1] .0 .get_stream_from(&[0u8; KEY_SIZE], ctx)); while let Some(r) = key_offset_stream_a.next().await { @@ -528,7 +528,7 @@ impl ImageLayer { content_a, content_b, "mismatch for key={key} cmp={cmp:?} and {path_a}:{path_b}" ); - println!("match for key={key} cmp={cmp:?} from {path_a}"); + //println!("match for key={key} cmp={cmp:?} from {path_a}"); } Ok(()) } From 5e32cce2d92e3bd802dec58eaa7f032154824343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 22 Jun 2024 18:14:05 +0200 Subject: [PATCH 36/36] Add script for plots --- scripts/plot-compression-report.py | 63 ++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100755 scripts/plot-compression-report.py diff --git a/scripts/plot-compression-report.py b/scripts/plot-compression-report.py new file mode 100755 index 000000000000..ab625c649ea7 --- /dev/null +++ b/scripts/plot-compression-report.py @@ -0,0 +1,63 @@ +#!/usr/bin/env -S python3 -u + +import argparse +import json +import os +from pprint import pprint + +import matplotlib.pyplot as plt + +parser = argparse.ArgumentParser(prog="compression-report") +parser.add_argument("dir") +args = parser.parse_args() + +files = [] +for file_name in os.listdir(args.dir): + if not file_name.endswith(".json"): + continue + file_path = os.path.join(args.dir, file_name) + with open(file_path) as json_str: + json_data = json.load(json_str) + files.append((file_name, json_data)) +#pprint(files) + +extra_zstd_lines = True +dc = 2 # data column to use (1 for sizes, 2 for time) +sort_by = "ZstdHigh" +files.sort(key=lambda file_data: [x for x in file_data[1] if x[0] == sort_by][0][dc]) + + +x_axis = [] +data_baseline = [] +data_lz4 = [] +data_zstd = [] +data_zstd_low = [] +data_zstd_high = [] + +for idx, f in enumerate(files): + file_data = f[1] + #pprint(file_data) + + x_axis.append(idx) + data_baseline.append([x for x in file_data if x[0] is None][0][dc]) + data_lz4.append([x for x in file_data if x[0] == "LZ4"][0][dc]) + data_zstd.append([x for x in file_data if x[0] == "Zstd"][0][dc]) + if extra_zstd_lines: + data_zstd_low.append([x for x in file_data if x[0] == "ZstdLow"][0][dc]) + data_zstd_high.append([x for x in file_data if x[0] == "ZstdHigh"][0][dc]) + +plt.plot(x_axis, data_baseline, "x", markeredgewidth=2, label="baseline") +plt.plot(x_axis, data_lz4, "x", markeredgewidth=2, label="lz4") +plt.plot(x_axis, data_zstd, "x", markeredgewidth=2, label="Zstd") +if extra_zstd_lines: + plt.plot(x_axis, data_zstd_low, "x", markeredgewidth=2, label="ZstdLow") + plt.plot(x_axis, data_zstd_high, "x", markeredgewidth=2, label="ZstdHigh") + +# plt.style.use('_mpl-gallery') +plt.ylim(bottom=0) +plt.legend(loc="upper left") + +figure_path = os.path.join(args.dir, "figure.png") +print(f"saving figure to {figure_path}") +plt.savefig(figure_path) +plt.show()