From 23ce1782a2d589d7b63220f6df86d29eac740762 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 27 Aug 2024 13:13:52 -0400 Subject: [PATCH 1/2] feat(pageserver): split delta writer automatically determines key range Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 6 +- .../src/tenant/storage_layer/split_writer.rs | 151 ++++++++++++------ pageserver/src/tenant/timeline/compaction.rs | 6 +- 3 files changed, 108 insertions(+), 55 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index fd2520a42eb3..c6f0e481017e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -7091,13 +7091,13 @@ mod tests { vec![ // Image layer at GC horizon PersistentLayerKey { - key_range: Key::MIN..Key::NON_L0_MAX, + key_range: Key::MIN..Key::MAX, lsn_range: Lsn(0x30)..Lsn(0x31), is_delta: false }, - // The delta layer covers the full range (with the layer key hack to avoid being recognized as L0) + // The delta layer below the horizon PersistentLayerKey { - key_range: Key::MIN..Key::NON_L0_MAX, + key_range: get_key(3)..get_key(4), lsn_range: Lsn(0x30)..Lsn(0x48), is_delta: true }, diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index 7c1ac863bf2e..43e34526be0e 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -204,7 +204,7 @@ impl SplitImageLayerWriter { /// will split them into multiple files based on size. #[must_use] pub struct SplitDeltaLayerWriter { - inner: DeltaLayerWriter, + inner: Option<(Key, DeltaLayerWriter)>, target_layer_size: u64, generated_layers: Vec, conf: &'static PageServerConf, @@ -212,7 +212,6 @@ pub struct SplitDeltaLayerWriter { tenant_shard_id: TenantShardId, lsn_range: Range, last_key_written: Key, - start_key: Key, } impl SplitDeltaLayerWriter { @@ -220,29 +219,19 @@ impl SplitDeltaLayerWriter { conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, - start_key: Key, lsn_range: Range, target_layer_size: u64, - ctx: &RequestContext, + #[allow(unused)] ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { target_layer_size, - inner: DeltaLayerWriter::new( - conf, - timeline_id, - tenant_shard_id, - start_key, - lsn_range.clone(), - ctx, - ) - .await?, + inner: None, generated_layers: Vec::new(), conf, timeline_id, tenant_shard_id, lsn_range, last_key_written: Key::MIN, - start_key, }) } @@ -265,9 +254,26 @@ impl SplitDeltaLayerWriter { // // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction // strategy. https://github.com/neondatabase/neon/issues/8837 + + if self.inner.is_none() { + self.inner = Some(( + key, + DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + key, + self.lsn_range.clone(), + ctx, + ) + .await?, + )); + } + let (_, inner) = self.inner.as_mut().unwrap(); + let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; - if self.inner.num_keys() >= 1 - && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + if inner.num_keys() >= 1 + && inner.estimated_size() + addition_size_estimation >= self.target_layer_size { if key != self.last_key_written { let next_delta_writer = DeltaLayerWriter::new( @@ -279,13 +285,13 @@ impl SplitDeltaLayerWriter { ctx, ) .await?; - let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); + let (start_key, prev_delta_writer) = + std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap(); let layer_key = PersistentLayerKey { - key_range: self.start_key..key, + key_range: start_key..key, lsn_range: self.lsn_range.clone(), is_delta: true, }; - self.start_key = key; if discard(&layer_key).await { drop(prev_delta_writer); self.generated_layers @@ -296,17 +302,18 @@ impl SplitDeltaLayerWriter { self.generated_layers .push(SplitWriterResult::Produced(delta_layer)); } - } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT { + } else if inner.estimated_size() >= S3_UPLOAD_LIMIT { // We have to produce a very large file b/c a key is updated too often. anyhow::bail!( "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced", key, - self.inner.estimated_size() + inner.estimated_size() ); } } self.last_key_written = key; - self.inner.put_value(key, lsn, val, ctx).await + let (_, inner) = self.inner.as_mut().unwrap(); + inner.put_value(key, lsn, val, ctx).await } pub async fn put_value( @@ -325,7 +332,6 @@ impl SplitDeltaLayerWriter { self, tline: &Arc, ctx: &RequestContext, - end_key: Key, discard: D, ) -> anyhow::Result> where @@ -337,11 +343,15 @@ impl SplitDeltaLayerWriter { inner, .. } = self; + let Some((start_key, inner)) = inner else { + return Ok(generated_layers); + }; if inner.num_keys() == 0 { return Ok(generated_layers); } + let end_key = self.last_key_written.next(); let layer_key = PersistentLayerKey { - key_range: self.start_key..end_key, + key_range: start_key..end_key, lsn_range: self.lsn_range.clone(), is_delta: true, }; @@ -360,15 +370,14 @@ impl SplitDeltaLayerWriter { self, tline: &Arc, ctx: &RequestContext, - end_key: Key, ) -> anyhow::Result> { - self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) + self.finish_with_discard_fn(tline, ctx, |_| async { false }) .await } /// When split writer fails, the caller should call this function and handle partially generated layers. - pub(crate) fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { - Ok((self.generated_layers, self.inner)) + pub(crate) fn take(self) -> anyhow::Result<(Vec, Option)> { + Ok((self.generated_layers, self.inner.map(|x| x.1))) } } @@ -432,7 +441,6 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, &ctx, @@ -460,11 +468,22 @@ mod tests { ) .await .unwrap(); - let layers = delta_writer - .finish(&tline, &ctx, get_key(10)) - .await - .unwrap(); + let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); assert_eq!(layers.len(), 1); + assert_eq!( + layers + .into_iter() + .next() + .unwrap() + .into_resident_layer() + .layer_desc() + .key(), + PersistentLayerKey { + key_range: get_key(0)..get_key(1), + lsn_range: Lsn(0x18)..Lsn(0x20), + is_delta: true + } + ); } #[tokio::test] @@ -501,7 +520,6 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, &ctx, @@ -533,10 +551,7 @@ mod tests { .finish(&tline, &ctx, get_key(N as u32)) .await .unwrap(); - let delta_layers = delta_writer - .finish(&tline, &ctx, get_key(N as u32)) - .await - .unwrap(); + let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap(); if discard { for layer in image_layers { layer.into_discarded_layer(); @@ -555,6 +570,14 @@ mod tests { .collect_vec(); assert_eq!(image_layers.len(), N / 512 + 1); assert_eq!(delta_layers.len(), N / 512 + 1); + assert_eq!( + delta_layers.first().unwrap().layer_desc().key_range.start, + get_key(0) + ); + assert_eq!( + delta_layers.last().unwrap().layer_desc().key_range.end, + get_key(N as u32) + ); for idx in 0..image_layers.len() { assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); @@ -602,7 +625,6 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x18)..Lsn(0x20), 4 * 1024, &ctx, @@ -644,11 +666,35 @@ mod tests { ) .await .unwrap(); - let layers = delta_writer - .finish(&tline, &ctx, get_key(10)) - .await - .unwrap(); + let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); assert_eq!(layers.len(), 2); + let mut layers_iter = layers.into_iter(); + assert_eq!( + layers_iter + .next() + .unwrap() + .into_resident_layer() + .layer_desc() + .key(), + PersistentLayerKey { + key_range: get_key(0)..get_key(1), + lsn_range: Lsn(0x18)..Lsn(0x20), + is_delta: true + } + ); + assert_eq!( + layers_iter + .next() + .unwrap() + .into_resident_layer() + .layer_desc() + .key(), + PersistentLayerKey { + key_range: get_key(1)..get_key(2), + lsn_range: Lsn(0x18)..Lsn(0x20), + is_delta: true + } + ); } #[tokio::test] @@ -668,7 +714,6 @@ mod tests { tenant.conf, tline.timeline_id, tenant.tenant_shard_id, - get_key(0), Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), 4 * 1024 * 1024, &ctx, @@ -689,10 +734,20 @@ mod tests { .await .unwrap(); } - let delta_layers = delta_writer - .finish(&tline, &ctx, get_key(N as u32)) - .await - .unwrap(); + let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap(); assert_eq!(delta_layers.len(), 1); + let delta_layer = delta_layers + .into_iter() + .next() + .unwrap() + .into_resident_layer(); + assert_eq!( + delta_layer.layer_desc().key(), + PersistentLayerKey { + key_range: get_key(0)..get_key(1), + lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), + is_delta: true + } + ); } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index a87b502cd607..d58c39557123 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1809,7 +1809,6 @@ impl Timeline { .unwrap(); // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized // as an L0 layer. - let hack_end_key = Key::NON_L0_MAX; let mut delta_layers = Vec::new(); let mut image_layers = Vec::new(); let mut downloaded_layers = Vec::new(); @@ -1855,7 +1854,6 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - Key::MIN, lowest_retain_lsn..end_lsn, self.get_compaction_target_size(), ctx, @@ -1965,7 +1963,7 @@ impl Timeline { let produced_image_layers = if let Some(writer) = image_layer_writer { if !dry_run { writer - .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .finish_with_discard_fn(self, ctx, Key::MAX, discard) .await? } else { let (layers, _) = writer.take()?; @@ -1978,7 +1976,7 @@ impl Timeline { let produced_delta_layers = if !dry_run { delta_layer_writer - .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .finish_with_discard_fn(self, ctx, discard) .await? } else { let (layers, _) = delta_layer_writer.take()?; From a2138c2c9088e7d87f4f96e450d2371d04eb9f6e Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Mon, 9 Sep 2024 14:56:52 +0100 Subject: [PATCH 2/2] resolve comments Signed-off-by: Alex Chi Z --- libs/pageserver_api/src/key.rs | 9 --------- pageserver/src/tenant/storage_layer/split_writer.rs | 9 ++------- pageserver/src/tenant/timeline/compaction.rs | 1 - 3 files changed, 2 insertions(+), 17 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 77d744e4da21..8929ccb41d6f 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -263,15 +263,6 @@ impl Key { field5: u8::MAX, field6: u32::MAX, }; - /// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers - pub const NON_L0_MAX: Key = Key { - field1: u8::MAX, - field2: u32::MAX, - field3: u32::MAX, - field4: u32::MAX, - field5: u8::MAX, - field6: u32::MAX - 1, - }; pub fn from_hex(s: &str) -> Result { if s.len() != 36 { diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index 43e34526be0e..40a6a77a5013 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -188,7 +188,7 @@ impl SplitImageLayerWriter { .await } - /// When split writer fails, the caller should call this function and handle partially generated layers. + /// This function will be deprecated with #8841. pub(crate) fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { Ok((self.generated_layers, self.inner)) } @@ -221,7 +221,6 @@ impl SplitDeltaLayerWriter { tenant_shard_id: TenantShardId, lsn_range: Range, target_layer_size: u64, - #[allow(unused)] ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { target_layer_size, @@ -375,7 +374,7 @@ impl SplitDeltaLayerWriter { .await } - /// When split writer fails, the caller should call this function and handle partially generated layers. + /// This function will be deprecated with #8841. pub(crate) fn take(self) -> anyhow::Result<(Vec, Option)> { Ok((self.generated_layers, self.inner.map(|x| x.1))) } @@ -443,7 +442,6 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, - &ctx, ) .await .unwrap(); @@ -522,7 +520,6 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, - &ctx, ) .await .unwrap(); @@ -627,7 +624,6 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024, - &ctx, ) .await .unwrap(); @@ -716,7 +712,6 @@ mod tests { tenant.tenant_shard_id, Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), 4 * 1024 * 1024, - &ctx, ) .await .unwrap(); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index d58c39557123..0b5c520ba7ad 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1856,7 +1856,6 @@ impl Timeline { self.tenant_shard_id, lowest_retain_lsn..end_lsn, self.get_compaction_target_size(), - ctx, ) .await?;