diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index df910b5ad9aa..8902f0c51028 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -41,7 +41,7 @@ impl SplitWriterResult { pub struct SplitImageLayerWriter { inner: ImageLayerWriter, target_layer_size: u64, - generated_layers: Vec, + generated_layer_writers: Vec<(ImageLayerWriter, Key)>, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, @@ -70,7 +70,7 @@ impl SplitImageLayerWriter { ctx, ) .await?, - generated_layers: Vec::new(), + generated_layer_writers: Vec::new(), conf, timeline_id, tenant_shard_id, @@ -79,18 +79,13 @@ impl SplitImageLayerWriter { }) } - pub async fn put_image_with_discard_fn( + pub async fn put_image( &mut self, key: Key, img: Bytes, - tline: &Arc, + #[allow(unused)] tline: &Arc, ctx: &RequestContext, - discard: D, - ) -> anyhow::Result<()> - where - D: FnOnce(&PersistentLayerKey) -> F, - F: Future, - { + ) -> anyhow::Result<()> { // The current estimation is an upper bound of the space that the key/image could take // because we did not consider compression in this estimation. The resulting image layer // could be smaller than the target size. @@ -108,74 +103,67 @@ impl SplitImageLayerWriter { ) .await?; let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); - let layer_key = PersistentLayerKey { - key_range: self.start_key..key, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), - is_delta: false, - }; self.start_key = key; - if discard(&layer_key).await { - drop(prev_image_writer); - self.generated_layers - .push(SplitWriterResult::Discarded(layer_key)); - } else { - self.generated_layers.push(SplitWriterResult::Produced( - prev_image_writer - .finish_with_end_key(tline, key, ctx) - .await?, - )); - } + self.generated_layer_writers.push((prev_image_writer, key)); } self.inner.put_image(key, img, ctx).await } - #[cfg(test)] - pub async fn put_image( - &mut self, - key: Key, - img: Bytes, - tline: &Arc, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false }) - .await - } - pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, end_key: Key, - discard: D, + discard_fn: D, ) -> anyhow::Result> where - D: FnOnce(&PersistentLayerKey) -> F, + D: Fn(&PersistentLayerKey) -> F, F: Future, { let Self { - mut generated_layers, + mut generated_layer_writers, inner, + lsn, .. } = self; - if inner.num_keys() == 0 { - return Ok(generated_layers); + if inner.num_keys() != 0 { + generated_layer_writers.push((inner, end_key)); } - let layer_key = PersistentLayerKey { - key_range: self.start_key..end_key, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), - is_delta: false, - }; - if discard(&layer_key).await { - generated_layers.push(SplitWriterResult::Discarded(layer_key)); - } else { - generated_layers.push(SplitWriterResult::Produced( - inner.finish_with_end_key(tline, end_key, ctx).await?, - )); + // BEGIN: catch every error and do the recovery in the below section + let mut generated_layers = Vec::new(); + for (inner, end_key) in generated_layer_writers { + let layer_key = PersistentLayerKey { + key_range: self.start_key..end_key, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn), + is_delta: false, + }; + if discard_fn(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + let layer = match inner.finish_with_end_key(tline, end_key, ctx).await { + Ok(layer) => layer, + Err(e) => { + for produced_layer in generated_layers { + if let SplitWriterResult::Produced(image_layer) = produced_layer { + let layer: Layer = image_layer.into(); + layer.delete_on_drop(); + } + } + return Err(e); + } + }; + generated_layers.push(SplitWriterResult::Produced(layer)); + } } + // END: catch every error and do the recovery in the above section Ok(generated_layers) } + pub fn discard(self) { + // do nothing + } + #[cfg(test)] pub(crate) async fn finish( self, @@ -186,11 +174,6 @@ impl SplitImageLayerWriter { self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) .await } - - /// When split writer fails, the caller should call this function and handle partially generated layers. - pub(crate) fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { - Ok((self.generated_layers, self.inner)) - } } /// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not @@ -510,9 +493,7 @@ mod tests { for i in 0..N { let i = i as u32; image_writer - .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async { - discard - }) + .put_image(get_key(i), get_large_img(), &tline, &ctx) .await .unwrap(); delta_writer diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index aad75ac59c57..29c6b3c9f3b1 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -136,9 +136,7 @@ impl KeyHistoryRetention { }; stat.produce_image_key(img); if let Some(image_writer) = image_writer.as_mut() { - image_writer - .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard) - .await?; + image_writer.put_image(key, img.clone(), tline, ctx).await?; } else { delta_writer .put_value_with_discard_fn( @@ -2127,8 +2125,7 @@ impl Timeline { .finish_with_discard_fn(self, ctx, hack_end_key, discard) .await? } else { - let (layers, _) = writer.take()?; - assert!(layers.is_empty(), "image layers produced in dry run mode?"); + writer.discard(); Vec::new() } } else {