From 1dc496a2c9a8dd8a9a7aa4f08a08555b9005e64c Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." Date: Mon, 5 Aug 2024 13:55:36 +0800 Subject: [PATCH] feat(pageserver): support auto split layers based on size (#8574) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit part of https://github.com/neondatabase/neon/issues/8002 ## Summary of changes Add a `SplitImageLayerWriter` that automatically splits image layers based on the estimated target image layer size. This does not consider compression and we might need a better metric. --------- Signed-off-by: Alex Chi Z Co-authored-by: Arpad Müller --- pageserver/src/tenant/storage_layer.rs | 3 + .../src/tenant/storage_layer/image_layer.rs | 58 ++++- .../src/tenant/storage_layer/split_writer.rs | 244 ++++++++++++++++++ 3 files changed, 303 insertions(+), 2 deletions(-) create mode 100644 pageserver/src/tenant/storage_layer/split_writer.rs diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 4fd110359bba..59d3e1ce099c 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -8,6 +8,9 @@ mod layer_desc; mod layer_name; pub mod merge_iterator; +#[cfg(test)] +pub mod split_writer; + use crate::context::{AccessStatsBehavior, RequestContext}; use crate::repository::Value; use crate::walrecord::NeonWalRecord; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 08db27514a21..aa308ba3c148 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -742,8 +742,14 @@ struct ImageLayerWriterInner { // where we have chosen their compressed form uncompressed_bytes_chosen: u64, + // Number of keys in the layer. 
+ num_keys: usize, + blob_writer: BlobWriter, tree: DiskBtreeBuilder, + + #[cfg_attr(not(feature = "testing"), allow(dead_code))] + last_written_key: Key, } impl ImageLayerWriterInner { @@ -800,6 +806,8 @@ impl ImageLayerWriterInner { uncompressed_bytes: 0, uncompressed_bytes_eligible: 0, uncompressed_bytes_chosen: 0, + num_keys: 0, + last_written_key: Key::MIN, }; Ok(writer) @@ -820,6 +828,7 @@ impl ImageLayerWriterInner { let compression = self.conf.image_compression; let uncompressed_len = img.len() as u64; self.uncompressed_bytes += uncompressed_len; + self.num_keys += 1; let (_img, res) = self .blob_writer .write_blob_maybe_compressed(img, ctx, compression) @@ -839,6 +848,11 @@ impl ImageLayerWriterInner { key.write_to_byte_slice(&mut keybuf); self.tree.append(&keybuf, off)?; + #[cfg(feature = "testing")] + { + self.last_written_key = key; + } + Ok(()) } @@ -849,6 +863,7 @@ impl ImageLayerWriterInner { self, timeline: &Arc, ctx: &RequestContext, + end_key: Option, ) -> anyhow::Result { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -899,11 +914,23 @@ impl ImageLayerWriterInner { let desc = PersistentLayerDesc::new_img( self.tenant_shard_id, self.timeline_id, - self.key_range.clone(), + if let Some(end_key) = end_key { + self.key_range.start..end_key + } else { + self.key_range.clone() + }, self.lsn, metadata.len(), ); + #[cfg(feature = "testing")] + if let Some(end_key) = end_key { + assert!( + self.last_written_key < end_key, + "written key violates end_key range" + ); + } + // Note: Because we open the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't // set inner.file here. The first read will have to re-open it. @@ -980,6 +1007,18 @@ impl ImageLayerWriter { self.inner.as_mut().unwrap().put_image(key, img, ctx).await } + #[cfg(test)] + /// Estimated size of the image layer. 
+ pub(crate) fn estimated_size(&self) -> u64 { + let inner = self.inner.as_ref().unwrap(); + inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 + } + + #[cfg(test)] + pub(crate) fn num_keys(&self) -> usize { + self.inner.as_ref().unwrap().num_keys + } + /// /// Finish writing the image layer. /// @@ -988,7 +1027,22 @@ impl ImageLayerWriter { timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { - self.inner.take().unwrap().finish(timeline, ctx).await + self.inner.take().unwrap().finish(timeline, ctx, None).await + } + + #[cfg(test)] + /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive. + pub(super) async fn finish_with_end_key( + mut self, + timeline: &Arc, + end_key: Key, + ctx: &RequestContext, + ) -> anyhow::Result { + self.inner + .take() + .unwrap() + .finish(timeline, ctx, Some(end_key)) + .await } } diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs new file mode 100644 index 000000000000..a4091a890c7e --- /dev/null +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -0,0 +1,244 @@ +use std::sync::Arc; + +use bytes::Bytes; +use pageserver_api::key::{Key, KEY_SIZE}; +use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; + +use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline}; + +use super::{ImageLayerWriter, ResidentLayer}; + +/// An image writer that takes images and produces multiple image layers. 
The interface does not +/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files +/// to be cleaned up) +#[must_use] +pub struct SplitImageLayerWriter { + inner: ImageLayerWriter, + target_layer_size: u64, + generated_layers: Vec, + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + lsn: Lsn, +} + +impl SplitImageLayerWriter { + pub async fn new( + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_shard_id: TenantShardId, + start_key: Key, + lsn: Lsn, + target_layer_size: u64, + ctx: &RequestContext, + ) -> anyhow::Result { + Ok(Self { + target_layer_size, + inner: ImageLayerWriter::new( + conf, + timeline_id, + tenant_shard_id, + &(start_key..Key::MAX), + lsn, + ctx, + ) + .await?, + generated_layers: Vec::new(), + conf, + timeline_id, + tenant_shard_id, + lsn, + }) + } + + pub async fn put_image( + &mut self, + key: Key, + img: Bytes, + tline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + // The current estimation is an upper bound of the space that the key/image could take + // because we did not consider compression in this estimation. The resulting image layer + // could be smaller than the target size. 
+ let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64; + if self.inner.num_keys() >= 1 + && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + { + let next_image_writer = ImageLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + &(key..Key::MAX), + self.lsn, + ctx, + ) + .await?; + let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); + self.generated_layers.push( + prev_image_writer + .finish_with_end_key(tline, key, ctx) + .await?, + ); + } + self.inner.put_image(key, img, ctx).await + } + + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + let Self { + mut generated_layers, + inner, + .. + } = self; + generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?); + Ok(generated_layers) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + tenant::{ + harness::{TenantHarness, TIMELINE_ID}, + storage_layer::AsLayerDesc, + }, + DEFAULT_PG_VERSION, + }; + + use super::*; + + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + fn get_img(id: u32) -> Bytes { + format!("{id:064}").into() + } + + fn get_large_img() -> Bytes { + vec![0; 8192].into() + } + + #[tokio::test] + async fn write_one_image() { + let harness = TenantHarness::create("split_writer_write_one_image") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let mut writer = SplitImageLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + + writer + .put_image(get_key(0), get_img(0), &tline, &ctx) + .await + .unwrap(); + let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap(); + 
assert_eq!(layers.len(), 1); + } + + #[tokio::test] + async fn write_split() { + let harness = TenantHarness::create("split_writer_write_split") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let mut writer = SplitImageLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + const N: usize = 2000; + for i in 0..N { + let i = i as u32; + writer + .put_image(get_key(i), get_large_img(), &tline, &ctx) + .await + .unwrap(); + } + let layers = writer + .finish(&tline, &ctx, get_key(N as u32)) + .await + .unwrap(); + assert_eq!(layers.len(), N / 512 + 1); + for idx in 0..layers.len() { + assert_ne!(layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(layers[idx].layer_desc().key_range.end, Key::MAX); + if idx > 0 { + assert_eq!( + layers[idx - 1].layer_desc().key_range.end, + layers[idx].layer_desc().key_range.start + ); + } + } + } + + #[tokio::test] + async fn write_large_img() { + let harness = TenantHarness::create("split_writer_write_large_img") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + let mut writer = SplitImageLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x18), + 4 * 1024, + &ctx, + ) + .await + .unwrap(); + + writer + .put_image(get_key(0), get_img(0), &tline, &ctx) + .await + .unwrap(); + writer + .put_image(get_key(1), get_large_img(), &tline, &ctx) + .await + .unwrap(); + let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap(); + assert_eq!(layers.len(), 2); + } +}