Skip to content

Commit

Permalink
feat(pageserver): support auto split layers based on size (#8574)
Browse files Browse the repository at this point in the history
part of #8002

## Summary of changes

Add a `SplitImageWriter` that automatically splits image layer based on
estimated target image layer size. This does not consider compression
and we might need a better metrics.

---------

Signed-off-by: Alex Chi Z <[email protected]>
Co-authored-by: Arpad Müller <[email protected]>
  • Loading branch information
skyzh and arpad-m authored Aug 5, 2024
1 parent 6814bdd commit 1dc496a
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ mod layer_desc;
mod layer_name;
pub mod merge_iterator;

#[cfg(test)]
pub mod split_writer;

use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::walrecord::NeonWalRecord;
Expand Down
58 changes: 56 additions & 2 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,14 @@ struct ImageLayerWriterInner {
// where we have chosen their compressed form
uncompressed_bytes_chosen: u64,

// Number of keys in the layer.
num_keys: usize,

blob_writer: BlobWriter<false>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,

#[cfg_attr(not(feature = "testing"), allow(dead_code))]
last_written_key: Key,
}

impl ImageLayerWriterInner {
Expand Down Expand Up @@ -800,6 +806,8 @@ impl ImageLayerWriterInner {
uncompressed_bytes: 0,
uncompressed_bytes_eligible: 0,
uncompressed_bytes_chosen: 0,
num_keys: 0,
last_written_key: Key::MIN,
};

Ok(writer)
Expand All @@ -820,6 +828,7 @@ impl ImageLayerWriterInner {
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
self.num_keys += 1;
let (_img, res) = self
.blob_writer
.write_blob_maybe_compressed(img, ctx, compression)
Expand All @@ -839,6 +848,11 @@ impl ImageLayerWriterInner {
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;

#[cfg(feature = "testing")]
{
self.last_written_key = key;
}

Ok(())
}

Expand All @@ -849,6 +863,7 @@ impl ImageLayerWriterInner {
self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<ResidentLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
Expand Down Expand Up @@ -899,11 +914,23 @@ impl ImageLayerWriterInner {
let desc = PersistentLayerDesc::new_img(
self.tenant_shard_id,
self.timeline_id,
self.key_range.clone(),
if let Some(end_key) = end_key {
self.key_range.start..end_key
} else {
self.key_range.clone()
},
self.lsn,
metadata.len(),
);

#[cfg(feature = "testing")]
if let Some(end_key) = end_key {
assert!(
self.last_written_key < end_key,
"written key violates end_key range"
);
}

// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
Expand Down Expand Up @@ -980,6 +1007,18 @@ impl ImageLayerWriter {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}

#[cfg(test)]
/// Estimated size of the image layer.
pub(crate) fn estimated_size(&self) -> u64 {
let inner = self.inner.as_ref().unwrap();
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
}

#[cfg(test)]
pub(crate) fn num_keys(&self) -> usize {
self.inner.as_ref().unwrap().num_keys
}

///
/// Finish writing the image layer.
///
Expand All @@ -988,7 +1027,22 @@ impl ImageLayerWriter {
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline, ctx).await
self.inner.take().unwrap().finish(timeline, ctx, None).await
}

#[cfg(test)]
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,
timeline: &Arc<Timeline>,
end_key: Key,
ctx: &RequestContext,
) -> anyhow::Result<super::ResidentLayer> {
self.inner
.take()
.unwrap()
.finish(timeline, ctx, Some(end_key))
.await
}
}

Expand Down
244 changes: 244 additions & 0 deletions pageserver/src/tenant/storage_layer/split_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
use std::sync::Arc;

use bytes::Bytes;
use pageserver_api::key::{Key, KEY_SIZE};
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};

use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};

use super::{ImageLayerWriter, ResidentLayer};

/// An image writer that takes images and produces multiple image layers. The interface does not
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
/// to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layers: Vec<ResidentLayer>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn: Lsn,
}

impl SplitImageLayerWriter {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: ImageLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
&(start_key..Key::MAX),
lsn,
ctx,
)
.await?,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn,
})
}

pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
let next_image_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
ctx,
)
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.generated_layers.push(
prev_image_writer
.finish_with_end_key(tline, key, ctx)
.await?,
);
}
self.inner.put_image(key, img, ctx).await
}

pub(crate) async fn finish(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<ResidentLayer>> {
let Self {
mut generated_layers,
inner,
..
} = self;
generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
Ok(generated_layers)
}
}

#[cfg(test)]
mod tests {
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::AsLayerDesc,
},
DEFAULT_PG_VERSION,
};

use super::*;

fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}

fn get_img(id: u32) -> Bytes {
format!("{id:064}").into()
}

fn get_large_img() -> Bytes {
vec![0; 8192].into()
}

#[tokio::test]
async fn write_one_image() {
let harness = TenantHarness::create("split_writer_write_one_image")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;

let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

let mut writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();

writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap();
assert_eq!(layers.len(), 1);
}

#[tokio::test]
async fn write_split() {
let harness = TenantHarness::create("split_writer_write_split")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;

let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

let mut writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
const N: usize = 2000;
for i in 0..N {
let i = i as u32;
writer
.put_image(get_key(i), get_large_img(), &tline, &ctx)
.await
.unwrap();
}
let layers = writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
assert_eq!(layers.len(), N / 512 + 1);
for idx in 0..layers.len() {
assert_ne!(layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(layers[idx].layer_desc().key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
layers[idx - 1].layer_desc().key_range.end,
layers[idx].layer_desc().key_range.start
);
}
}
}

#[tokio::test]
async fn write_large_img() {
let harness = TenantHarness::create("split_writer_write_large_img")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;

let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

let mut writer = SplitImageLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18),
4 * 1024,
&ctx,
)
.await
.unwrap();

writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
writer
.put_image(get_key(1), get_large_img(), &tline, &ctx)
.await
.unwrap();
let layers = writer.finish(&tline, &ctx, get_key(10)).await.unwrap();
assert_eq!(layers.len(), 2);
}
}

1 comment on commit 1dc496a

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2196 tests run: 2115 passed, 0 failed, 81 skipped (full report)


Code coverage* (full report)

  • functions: 32.9% (7147 of 21716 functions)
  • lines: 50.5% (57582 of 114046 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
1dc496a at 2024-08-05T07:53:54.966Z :recycle:

Please sign in to comment.