From e158df4e86318fa3fd5ee9516f3e7ac91dd14283 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 10 Sep 2024 05:03:27 +0800 Subject: [PATCH] feat(pageserver): split delta writer automatically determines key range (#8850) close https://github.com/neondatabase/neon/issues/8838 ## Summary of changes This patch modifies the split delta layer writer to avoid taking start_key and end_key when creating/finishing the layer writer. The start_key for the delta layers will be the first key provided to the layer writer, and the end_key will be `last_key.next()`. This simplifies the delta layer writer API. With that, the layer key hack is removed. Image layers now use the full key range, and delta layers use the first/last key provided by the user. --------- Signed-off-by: Alex Chi Z --- libs/pageserver_api/src/key.rs | 9 - pageserver/src/tenant.rs | 6 +- .../src/tenant/storage_layer/split_writer.rs | 158 ++++++++++++------ pageserver/src/tenant/timeline/compaction.rs | 7 +- 4 files changed, 109 insertions(+), 71 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 77d744e4da21..8929ccb41d6f 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -263,15 +263,6 @@ impl Key { field5: u8::MAX, field6: u32::MAX, }; - /// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers - pub const NON_L0_MAX: Key = Key { - field1: u8::MAX, - field2: u32::MAX, - field3: u32::MAX, - field4: u32::MAX, - field5: u8::MAX, - field6: u32::MAX - 1, - }; pub fn from_hex(s: &str) -> Result { if s.len() != 36 { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index fd2520a42eb3..c6f0e481017e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -7091,13 +7091,13 @@ mod tests { vec![ // Image layer at GC horizon PersistentLayerKey { - key_range: Key::MIN..Key::NON_L0_MAX, + key_range: 
Key::MIN..Key::MAX, lsn_range: Lsn(0x30)..Lsn(0x31), is_delta: false }, - // The delta layer covers the full range (with the layer key hack to avoid being recognized as L0) + // The delta layer below the horizon PersistentLayerKey { - key_range: Key::MIN..Key::NON_L0_MAX, + key_range: get_key(3)..get_key(4), lsn_range: Lsn(0x30)..Lsn(0x48), is_delta: true }, diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index 7c1ac863bf2e..40a6a77a5013 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -188,7 +188,7 @@ impl SplitImageLayerWriter { .await } - /// When split writer fails, the caller should call this function and handle partially generated layers. + /// This function will be deprecated with #8841. pub(crate) fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { Ok((self.generated_layers, self.inner)) } @@ -204,7 +204,7 @@ impl SplitImageLayerWriter { /// will split them into multiple files based on size. 
#[must_use] pub struct SplitDeltaLayerWriter { - inner: DeltaLayerWriter, + inner: Option<(Key, DeltaLayerWriter)>, target_layer_size: u64, generated_layers: Vec, conf: &'static PageServerConf, @@ -212,7 +212,6 @@ pub struct SplitDeltaLayerWriter { tenant_shard_id: TenantShardId, lsn_range: Range, last_key_written: Key, - start_key: Key, } impl SplitDeltaLayerWriter { @@ -220,29 +219,18 @@ impl SplitDeltaLayerWriter { conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, - start_key: Key, lsn_range: Range, target_layer_size: u64, - ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { target_layer_size, - inner: DeltaLayerWriter::new( - conf, - timeline_id, - tenant_shard_id, - start_key, - lsn_range.clone(), - ctx, - ) - .await?, + inner: None, generated_layers: Vec::new(), conf, timeline_id, tenant_shard_id, lsn_range, last_key_written: Key::MIN, - start_key, }) } @@ -265,9 +253,26 @@ impl SplitDeltaLayerWriter { // // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction // strategy. 
https://github.com/neondatabase/neon/issues/8837 + + if self.inner.is_none() { + self.inner = Some(( + key, + DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + key, + self.lsn_range.clone(), + ctx, + ) + .await?, + )); + } + let (_, inner) = self.inner.as_mut().unwrap(); + let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; - if self.inner.num_keys() >= 1 - && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + if inner.num_keys() >= 1 + && inner.estimated_size() + addition_size_estimation >= self.target_layer_size { if key != self.last_key_written { let next_delta_writer = DeltaLayerWriter::new( @@ -279,13 +284,13 @@ impl SplitDeltaLayerWriter { ctx, ) .await?; - let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); + let (start_key, prev_delta_writer) = + std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap(); let layer_key = PersistentLayerKey { - key_range: self.start_key..key, + key_range: start_key..key, lsn_range: self.lsn_range.clone(), is_delta: true, }; - self.start_key = key; if discard(&layer_key).await { drop(prev_delta_writer); self.generated_layers @@ -296,17 +301,18 @@ impl SplitDeltaLayerWriter { self.generated_layers .push(SplitWriterResult::Produced(delta_layer)); } - } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT { + } else if inner.estimated_size() >= S3_UPLOAD_LIMIT { // We have to produce a very large file b/c a key is updated too often. 
anyhow::bail!( "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced", key, - self.inner.estimated_size() + inner.estimated_size() ); } } self.last_key_written = key; - self.inner.put_value(key, lsn, val, ctx).await + let (_, inner) = self.inner.as_mut().unwrap(); + inner.put_value(key, lsn, val, ctx).await } pub async fn put_value( @@ -325,7 +331,6 @@ impl SplitDeltaLayerWriter { self, tline: &Arc, ctx: &RequestContext, - end_key: Key, discard: D, ) -> anyhow::Result> where @@ -337,11 +342,15 @@ impl SplitDeltaLayerWriter { inner, .. } = self; + let Some((start_key, inner)) = inner else { + return Ok(generated_layers); + }; if inner.num_keys() == 0 { return Ok(generated_layers); } + let end_key = self.last_key_written.next(); let layer_key = PersistentLayerKey { - key_range: self.start_key..end_key, + key_range: start_key..end_key, lsn_range: self.lsn_range.clone(), is_delta: true, }; @@ -360,15 +369,14 @@ impl SplitDeltaLayerWriter { self, tline: &Arc, ctx: &RequestContext, - end_key: Key, ) -> anyhow::Result> { - self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) + self.finish_with_discard_fn(tline, ctx, |_| async { false }) .await } - /// When split writer fails, the caller should call this function and handle partially generated layers. - pub(crate) fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { - Ok((self.generated_layers, self.inner)) + /// This function will be deprecated with #8841. 
+ pub(crate) fn take(self) -> anyhow::Result<(Vec, Option)> { + Ok((self.generated_layers, self.inner.map(|x| x.1))) } } @@ -432,10 +440,8 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, - &ctx, ) .await .unwrap(); @@ -460,11 +466,22 @@ mod tests { ) .await .unwrap(); - let layers = delta_writer - .finish(&tline, &ctx, get_key(10)) - .await - .unwrap(); + let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); assert_eq!(layers.len(), 1); + assert_eq!( + layers + .into_iter() + .next() + .unwrap() + .into_resident_layer() + .layer_desc() + .key(), + PersistentLayerKey { + key_range: get_key(0)..get_key(1), + lsn_range: Lsn(0x18)..Lsn(0x20), + is_delta: true + } + ); } #[tokio::test] @@ -501,10 +518,8 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, - &ctx, ) .await .unwrap(); @@ -533,10 +548,7 @@ mod tests { .finish(&tline, &ctx, get_key(N as u32)) .await .unwrap(); - let delta_layers = delta_writer - .finish(&tline, &ctx, get_key(N as u32)) - .await - .unwrap(); + let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap(); if discard { for layer in image_layers { layer.into_discarded_layer(); @@ -555,6 +567,14 @@ mod tests { .collect_vec(); assert_eq!(image_layers.len(), N / 512 + 1); assert_eq!(delta_layers.len(), N / 512 + 1); + assert_eq!( + delta_layers.first().unwrap().layer_desc().key_range.start, + get_key(0) + ); + assert_eq!( + delta_layers.last().unwrap().layer_desc().key_range.end, + get_key(N as u32) + ); for idx in 0..image_layers.len() { assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); @@ -602,10 +622,8 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x18)..Lsn(0x20), 4 * 1024, - &ctx, ) .await .unwrap(); @@ -644,11 +662,35 @@ mod tests { ) .await .unwrap(); 
- let layers = delta_writer - .finish(&tline, &ctx, get_key(10)) - .await - .unwrap(); + let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); assert_eq!(layers.len(), 2); + let mut layers_iter = layers.into_iter(); + assert_eq!( + layers_iter + .next() + .unwrap() + .into_resident_layer() + .layer_desc() + .key(), + PersistentLayerKey { + key_range: get_key(0)..get_key(1), + lsn_range: Lsn(0x18)..Lsn(0x20), + is_delta: true + } + ); + assert_eq!( + layers_iter + .next() + .unwrap() + .into_resident_layer() + .layer_desc() + .key(), + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x18)..Lsn(0x20), + is_delta: true + } + ); } #[tokio::test] @@ -668,10 +710,8 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), 4 * 1024 * 1024, - &ctx, ) .await .unwrap(); @@ -689,10 +729,20 @@ mod tests { .await .unwrap(); } - let delta_layers = delta_writer - .finish(&tline, &ctx, get_key(N as u32)) - .await - .unwrap(); + let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap(); assert_eq!(delta_layers.len(), 1); + let delta_layer = delta_layers + .into_iter() + .next() + .unwrap() + .into_resident_layer(); + assert_eq!( + delta_layer.layer_desc().key(), + PersistentLayerKey { + key_range: get_key(0)..get_key(1), + lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), + is_delta: true + } + ); } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index a87b502cd607..0b5c520ba7ad 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1809,7 +1809,6 @@ impl Timeline { .unwrap(); // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized // as an L0 layer. 
- let hack_end_key = Key::NON_L0_MAX; let mut delta_layers = Vec::new(); let mut image_layers = Vec::new(); let mut downloaded_layers = Vec::new(); @@ -1855,10 +1854,8 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - Key::MIN, lowest_retain_lsn..end_lsn, self.get_compaction_target_size(), - ctx, ) .await?; @@ -1965,7 +1962,7 @@ impl Timeline { let produced_image_layers = if let Some(writer) = image_layer_writer { if !dry_run { writer - .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .finish_with_discard_fn(self, ctx, Key::MAX, discard) .await? } else { let (layers, _) = writer.take()?; @@ -1978,7 +1975,7 @@ impl Timeline { let produced_delta_layers = if !dry_run { delta_layer_writer - .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .finish_with_discard_fn(self, ctx, discard) .await? } else { let (layers, _) = delta_layer_writer.take()?;