diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index d1079876f8fe..6332d36dc343 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -515,8 +515,8 @@ impl DeltaLayerWriterInner { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let temp_path = self.path.clone(); let result = self.finish0(key_end, ctx).await; - if result.is_err() { - tracing::info!(%temp_path, "cleaning up temporary file after error during writing"); + if let Err(ref e) = result { + tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}"); if let Err(e) = std::fs::remove_file(&temp_path) { tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing"); } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 6c1a943470c2..b1f2557038d4 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -827,6 +827,25 @@ impl ImageLayerWriterInner { self, ctx: &RequestContext, end_key: Option, + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + let temp_path = self.path.clone(); + let result = self.finish0(ctx, end_key).await; + if let Err(ref e) = result { + tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}"); + if let Err(e) = std::fs::remove_file(&temp_path) { + tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing"); + } + } + result + } + + /// + /// Finish writing the image layer. + /// + async fn finish0( + self, + ctx: &RequestContext, + end_key: Option, ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index b499a0eef4f1..5bd9a47e2b35 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -42,7 +42,7 @@ impl SplitWriterResult { pub struct SplitImageLayerWriter { inner: ImageLayerWriter, target_layer_size: u64, - generated_layers: Vec, + generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, @@ -71,7 +71,7 @@ impl SplitImageLayerWriter { ctx, ) .await?, - generated_layers: Vec::new(), + generated_layer_writers: Vec::new(), conf, timeline_id, tenant_shard_id, @@ -80,18 +80,12 @@ impl SplitImageLayerWriter { }) } - pub async fn put_image_with_discard_fn( + pub async fn put_image( &mut self, key: Key, img: Bytes, - tline: &Arc, ctx: &RequestContext, - discard: D, - ) -> anyhow::Result<()> - where - D: FnOnce(&PersistentLayerKey) -> F, - F: Future, - { + ) -> anyhow::Result<()> { // The current estimation is an upper bound of the space that the key/image could take // because we did not consider compression in this estimation. The resulting image layer // could be smaller than the target size. @@ -108,72 +102,83 @@ impl SplitImageLayerWriter { ctx, ) .await?; - let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); let layer_key = PersistentLayerKey { key_range: self.start_key..key, lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), is_delta: false, }; + let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); self.start_key = key; - if discard(&layer_key).await { - drop(prev_image_writer); - self.generated_layers - .push(SplitWriterResult::Discarded(layer_key)); - } else { - let (desc, path) = prev_image_writer.finish_with_end_key(key, ctx).await?; - - let layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - self.generated_layers - .push(SplitWriterResult::Produced(layer)); - } + self.generated_layer_writers + .push((prev_image_writer, layer_key)); } self.inner.put_image(key, img, ctx).await } - #[cfg(test)] - pub async fn put_image( - &mut self, - key: Key, - img: Bytes, - tline: &Arc, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false }) - .await - } - pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, end_key: Key, - discard: D, + discard_fn: D, ) -> anyhow::Result> where - D: FnOnce(&PersistentLayerKey) -> F, + D: Fn(&PersistentLayerKey) -> F, F: Future, { let Self { - mut generated_layers, + mut generated_layer_writers, inner, .. } = self; - if inner.num_keys() == 0 { - return Ok(generated_layers); + if inner.num_keys() != 0 { + let layer_key = PersistentLayerKey { + key_range: self.start_key..end_key, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), + is_delta: false, + }; + generated_layer_writers.push((inner, layer_key)); } - let layer_key = PersistentLayerKey { - key_range: self.start_key..end_key, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), - is_delta: false, + let clean_up_layers = |generated_layers: Vec| { + for produced_layer in generated_layers { + if let SplitWriterResult::Produced(image_layer) = produced_layer { + let layer: Layer = image_layer.into(); + layer.delete_on_drop(); + } + } }; - if discard(&layer_key).await { - generated_layers.push(SplitWriterResult::Discarded(layer_key)); - } else { - let (desc, path) = inner.finish_with_end_key(end_key, ctx).await?; - let layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - generated_layers.push(SplitWriterResult::Produced(layer)); + // BEGIN: catch every error and do the recovery in the below section + let mut generated_layers = Vec::new(); + for (inner, layer_key) in generated_layer_writers { + if discard_fn(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + let layer = match inner + .finish_with_end_key(layer_key.key_range.end, ctx) + .await + { + Ok((desc, path)) => { + match Layer::finish_creating(self.conf, tline, desc, &path) { + Ok(layer) => layer, + Err(e) => { + tokio::fs::remove_file(&path).await.ok(); + clean_up_layers(generated_layers); + return Err(e); + } + } + } + Err(e) => { + // ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong, + // so we don't need to remove it by ourselves. + clean_up_layers(generated_layers); + return Err(e); + } + }; + generated_layers.push(SplitWriterResult::Produced(layer)); + } } + // END: catch every error and do the recovery in the above section Ok(generated_layers) } @@ -187,11 +192,6 @@ impl SplitImageLayerWriter { self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) .await } - - /// This function will be deprecated with #8841. - pub(crate) fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { - Ok((self.generated_layers, self.inner)) - } } /// A delta writer that takes key-lsn-values and produces multiple delta layers. @@ -296,8 +296,16 @@ impl SplitDeltaLayerWriter { self.generated_layers .push(SplitWriterResult::Discarded(layer_key)); } else { + // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary + // files for `finish_creating`. let (desc, path) = prev_delta_writer.finish(key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) { + Ok(layer) => layer, + Err(e) => { + tokio::fs::remove_file(&path).await.ok(); + return Err(e); + } + }; self.generated_layers .push(SplitWriterResult::Produced(delta_layer)); } @@ -357,8 +365,16 @@ impl SplitDeltaLayerWriter { if discard(&layer_key).await { generated_layers.push(SplitWriterResult::Discarded(layer_key)); } else { + // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary + // files for `finish_creating`. let (desc, path) = inner.finish(end_key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) { + Ok(layer) => layer, + Err(e) => { + tokio::fs::remove_file(&path).await.ok(); + return Err(e); + } + }; generated_layers.push(SplitWriterResult::Produced(delta_layer)); } Ok(generated_layers) @@ -447,7 +463,7 @@ mod tests { .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &tline, &ctx) + .put_image(get_key(0), get_img(0), &ctx) .await .unwrap(); let layers = image_writer @@ -486,14 +502,18 @@ mod tests { #[tokio::test] async fn write_split() { + // Test the split writer with retaining all the layers we have produced (discard=false) write_split_helper("split_writer_write_split", false).await; } #[tokio::test] async fn write_split_discard() { - write_split_helper("split_writer_write_split_discard", false).await; + // Test the split writer with discarding all the layers we have produced (discard=true) + write_split_helper("split_writer_write_split_discard", true).await; } + /// Test the image+delta writer by writing a large number of images and deltas. If discard is + /// set to true, all layers will be discarded. async fn write_split_helper(harness_name: &'static str, discard: bool) { let harness = TenantHarness::create(harness_name).await.unwrap(); let (tenant, ctx) = harness.load().await; @@ -527,9 +547,7 @@ mod tests { for i in 0..N { let i = i as u32; image_writer - .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async { - discard - }) + .put_image(get_key(i), get_large_img(), &ctx) .await .unwrap(); delta_writer @@ -545,51 +563,54 @@ mod tests { .unwrap(); } let image_layers = image_writer - .finish(&tline, &ctx, get_key(N as u32)) + .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard }) .await .unwrap(); - let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap(); - if discard { - for layer in image_layers { - layer.into_discarded_layer(); - } - for layer in delta_layers { - layer.into_discarded_layer(); - } - } else { - let image_layers = image_layers - .into_iter() - .map(|x| x.into_resident_layer()) - .collect_vec(); - let delta_layers = delta_layers - .into_iter() - .map(|x| x.into_resident_layer()) - .collect_vec(); - assert_eq!(image_layers.len(), N / 512 + 1); - assert_eq!(delta_layers.len(), N / 512 + 1); - assert_eq!( - delta_layers.first().unwrap().layer_desc().key_range.start, - get_key(0) - ); - assert_eq!( - delta_layers.last().unwrap().layer_desc().key_range.end, - get_key(N as u32) - ); - for idx in 0..image_layers.len() { - assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); - assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); - if idx > 0 { - assert_eq!( - image_layers[idx - 1].layer_desc().key_range.end, - image_layers[idx].layer_desc().key_range.start - ); - assert_eq!( - delta_layers[idx - 1].layer_desc().key_range.end, - delta_layers[idx].layer_desc().key_range.start - ); + let delta_layers = delta_writer + .finish_with_discard_fn(&tline, &ctx, |_| async { discard }) + .await + .unwrap(); + let image_layers = image_layers + .into_iter() + .map(|x| { + if discard { + x.into_discarded_layer() + } else { + x.into_resident_layer().layer_desc().key() + } + }) + .collect_vec(); + let delta_layers = delta_layers + .into_iter() + .map(|x| { + if discard { + x.into_discarded_layer() + } else { + x.into_resident_layer().layer_desc().key() } + }) + .collect_vec(); + assert_eq!(image_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0)); + assert_eq!( + delta_layers.last().unwrap().key_range.end, + get_key(N as u32) + ); + for idx in 0..image_layers.len() { + assert_ne!(image_layers[idx].key_range.start, Key::MIN); + assert_ne!(image_layers[idx].key_range.end, Key::MAX); + assert_ne!(delta_layers[idx].key_range.start, Key::MIN); + assert_ne!(delta_layers[idx].key_range.end, Key::MAX); + if idx > 0 { + assert_eq!( + image_layers[idx - 1].key_range.end, + image_layers[idx].key_range.start + ); + assert_eq!( + delta_layers[idx - 1].key_range.end, + delta_layers[idx].key_range.start + ); } } } @@ -629,11 +650,11 @@ mod tests { .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &tline, &ctx) + .put_image(get_key(0), get_img(0), &ctx) .await .unwrap(); image_writer - .put_image(get_key(1), get_large_img(), &tline, &ctx) + .put_image(get_key(1), get_large_img(), &ctx) .await .unwrap(); let layers = image_writer diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 5588363330d0..5cb1460b297e 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -141,9 +141,7 @@ impl KeyHistoryRetention { }; stat.produce_image_key(img); if let Some(image_writer) = image_writer.as_mut() { - image_writer - .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard) - .await?; + image_writer.put_image(key, img.clone(), ctx).await?; } else { delta_writer .put_value_with_discard_fn( @@ -2041,8 +2039,7 @@ impl Timeline { .finish_with_discard_fn(self, ctx, Key::MAX, discard) .await? } else { - let (layers, _) = writer.take()?; - assert!(layers.is_empty(), "image layers produced in dry run mode?"); + drop(writer); Vec::new() } } else {