From aca81f5fa4f3e0f882a9b0d55eef1cdee8ffc168 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Mon, 21 Oct 2024 10:59:48 -0400 Subject: [PATCH] fix(pageserver): make image split layer writer finish atomic (#8841) Part of https://github.com/neondatabase/neon/issues/8836 ## Summary of changes This pull request makes the image layer split writer atomic when finishing the layers. All the produced layers either finish at the same time, or discard at the same time. Note that this does not prevent atomicity when crash, but anyways, it will be cleaned up on pageserver restart. --------- Signed-off-by: Alex Chi Z Co-authored-by: Christian Schwarz --- .../src/tenant/storage_layer/delta_layer.rs | 4 +- .../src/tenant/storage_layer/image_layer.rs | 19 ++ .../src/tenant/storage_layer/split_writer.rs | 231 ++++++++++-------- pageserver/src/tenant/timeline/compaction.rs | 7 +- 4 files changed, 149 insertions(+), 112 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index d1079876f8fe..6332d36dc343 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -515,8 +515,8 @@ impl DeltaLayerWriterInner { ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let temp_path = self.path.clone(); let result = self.finish0(key_end, ctx).await; - if result.is_err() { - tracing::info!(%temp_path, "cleaning up temporary file after error during writing"); + if let Err(ref e) = result { + tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}"); if let Err(e) = std::fs::remove_file(&temp_path) { tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing"); } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 6c1a943470c2..b1f2557038d4 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -827,6 +827,25 @@ impl ImageLayerWriterInner { self, ctx: &RequestContext, end_key: Option, + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + let temp_path = self.path.clone(); + let result = self.finish0(ctx, end_key).await; + if let Err(ref e) = result { + tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}"); + if let Err(e) = std::fs::remove_file(&temp_path) { + tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing"); + } + } + result + } + + /// + /// Finish writing the image layer. + /// + async fn finish0( + self, + ctx: &RequestContext, + end_key: Option, ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index b499a0eef4f1..5bd9a47e2b35 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -42,7 +42,7 @@ impl SplitWriterResult { pub struct SplitImageLayerWriter { inner: ImageLayerWriter, target_layer_size: u64, - generated_layers: Vec, + generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, @@ -71,7 +71,7 @@ impl SplitImageLayerWriter { ctx, ) .await?, - generated_layers: Vec::new(), + generated_layer_writers: Vec::new(), conf, timeline_id, tenant_shard_id, @@ -80,18 +80,12 @@ impl SplitImageLayerWriter { }) } - pub async fn put_image_with_discard_fn( + pub async fn put_image( &mut self, key: Key, img: Bytes, - tline: &Arc, ctx: &RequestContext, - discard: D, - ) -> anyhow::Result<()> - where - D: FnOnce(&PersistentLayerKey) -> F, - F: Future, - { + ) -> anyhow::Result<()> { // The current estimation is an upper bound of the space that the key/image could take // because we did not consider compression in this estimation. The resulting image layer // could be smaller than the target size. @@ -108,72 +102,83 @@ impl SplitImageLayerWriter { ctx, ) .await?; - let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); let layer_key = PersistentLayerKey { key_range: self.start_key..key, lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), is_delta: false, }; + let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); self.start_key = key; - if discard(&layer_key).await { - drop(prev_image_writer); - self.generated_layers - .push(SplitWriterResult::Discarded(layer_key)); - } else { - let (desc, path) = prev_image_writer.finish_with_end_key(key, ctx).await?; - - let layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - self.generated_layers - .push(SplitWriterResult::Produced(layer)); - } + self.generated_layer_writers + .push((prev_image_writer, layer_key)); } self.inner.put_image(key, img, ctx).await } - #[cfg(test)] - pub async fn put_image( - &mut self, - key: Key, - img: Bytes, - tline: &Arc, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false }) - .await - } - pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, end_key: Key, - discard: D, + discard_fn: D, ) -> anyhow::Result> where - D: FnOnce(&PersistentLayerKey) -> F, + D: Fn(&PersistentLayerKey) -> F, F: Future, { let Self { - mut generated_layers, + mut generated_layer_writers, inner, .. } = self; - if inner.num_keys() == 0 { - return Ok(generated_layers); + if inner.num_keys() != 0 { + let layer_key = PersistentLayerKey { + key_range: self.start_key..end_key, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), + is_delta: false, + }; + generated_layer_writers.push((inner, layer_key)); } - let layer_key = PersistentLayerKey { - key_range: self.start_key..end_key, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), - is_delta: false, + let clean_up_layers = |generated_layers: Vec| { + for produced_layer in generated_layers { + if let SplitWriterResult::Produced(image_layer) = produced_layer { + let layer: Layer = image_layer.into(); + layer.delete_on_drop(); + } + } }; - if discard(&layer_key).await { - generated_layers.push(SplitWriterResult::Discarded(layer_key)); - } else { - let (desc, path) = inner.finish_with_end_key(end_key, ctx).await?; - let layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - generated_layers.push(SplitWriterResult::Produced(layer)); + // BEGIN: catch every error and do the recovery in the below section + let mut generated_layers = Vec::new(); + for (inner, layer_key) in generated_layer_writers { + if discard_fn(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + let layer = match inner + .finish_with_end_key(layer_key.key_range.end, ctx) + .await + { + Ok((desc, path)) => { + match Layer::finish_creating(self.conf, tline, desc, &path) { + Ok(layer) => layer, + Err(e) => { + tokio::fs::remove_file(&path).await.ok(); + clean_up_layers(generated_layers); + return Err(e); + } + } + } + Err(e) => { + // ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong, + // so we don't need to remove it by ourselves. + clean_up_layers(generated_layers); + return Err(e); + } + }; + generated_layers.push(SplitWriterResult::Produced(layer)); + } } + // END: catch every error and do the recovery in the above section Ok(generated_layers) } @@ -187,11 +192,6 @@ impl SplitImageLayerWriter { self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) .await } - - /// This function will be deprecated with #8841. - pub(crate) fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { - Ok((self.generated_layers, self.inner)) - } } /// A delta writer that takes key-lsn-values and produces multiple delta layers. @@ -296,8 +296,16 @@ impl SplitDeltaLayerWriter { self.generated_layers .push(SplitWriterResult::Discarded(layer_key)); } else { + // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary + // files for `finish_creating`. let (desc, path) = prev_delta_writer.finish(key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) { + Ok(layer) => layer, + Err(e) => { + tokio::fs::remove_file(&path).await.ok(); + return Err(e); + } + }; self.generated_layers .push(SplitWriterResult::Produced(delta_layer)); } @@ -357,8 +365,16 @@ impl SplitDeltaLayerWriter { if discard(&layer_key).await { generated_layers.push(SplitWriterResult::Discarded(layer_key)); } else { + // `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary + // files for `finish_creating`. let (desc, path) = inner.finish(end_key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) { + Ok(layer) => layer, + Err(e) => { + tokio::fs::remove_file(&path).await.ok(); + return Err(e); + } + }; generated_layers.push(SplitWriterResult::Produced(delta_layer)); } Ok(generated_layers) @@ -447,7 +463,7 @@ mod tests { .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &tline, &ctx) + .put_image(get_key(0), get_img(0), &ctx) .await .unwrap(); let layers = image_writer @@ -486,14 +502,18 @@ mod tests { #[tokio::test] async fn write_split() { + // Test the split writer with retaining all the layers we have produced (discard=false) write_split_helper("split_writer_write_split", false).await; } #[tokio::test] async fn write_split_discard() { - write_split_helper("split_writer_write_split_discard", false).await; + // Test the split writer with discarding all the layers we have produced (discard=true) + write_split_helper("split_writer_write_split_discard", true).await; } + /// Test the image+delta writer by writing a large number of images and deltas. If discard is + /// set to true, all layers will be discarded. async fn write_split_helper(harness_name: &'static str, discard: bool) { let harness = TenantHarness::create(harness_name).await.unwrap(); let (tenant, ctx) = harness.load().await; @@ -527,9 +547,7 @@ mod tests { for i in 0..N { let i = i as u32; image_writer - .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async { - discard - }) + .put_image(get_key(i), get_large_img(), &ctx) .await .unwrap(); delta_writer @@ -545,51 +563,54 @@ mod tests { .unwrap(); } let image_layers = image_writer - .finish(&tline, &ctx, get_key(N as u32)) + .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard }) .await .unwrap(); - let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap(); - if discard { - for layer in image_layers { - layer.into_discarded_layer(); - } - for layer in delta_layers { - layer.into_discarded_layer(); - } - } else { - let image_layers = image_layers - .into_iter() - .map(|x| x.into_resident_layer()) - .collect_vec(); - let delta_layers = delta_layers - .into_iter() - .map(|x| x.into_resident_layer()) - .collect_vec(); - assert_eq!(image_layers.len(), N / 512 + 1); - assert_eq!(delta_layers.len(), N / 512 + 1); - assert_eq!( - delta_layers.first().unwrap().layer_desc().key_range.start, - get_key(0) - ); - assert_eq!( - delta_layers.last().unwrap().layer_desc().key_range.end, - get_key(N as u32) - ); - for idx in 0..image_layers.len() { - assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); - assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); - if idx > 0 { - assert_eq!( - image_layers[idx - 1].layer_desc().key_range.end, - image_layers[idx].layer_desc().key_range.start - ); - assert_eq!( - delta_layers[idx - 1].layer_desc().key_range.end, - delta_layers[idx].layer_desc().key_range.start - ); + let delta_layers = delta_writer + .finish_with_discard_fn(&tline, &ctx, |_| async { discard }) + .await + .unwrap(); + let image_layers = image_layers + .into_iter() + .map(|x| { + if discard { + x.into_discarded_layer() + } else { + x.into_resident_layer().layer_desc().key() + } + }) + .collect_vec(); + let delta_layers = delta_layers + .into_iter() + .map(|x| { + if discard { + x.into_discarded_layer() + } else { + x.into_resident_layer().layer_desc().key() } + }) + .collect_vec(); + assert_eq!(image_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0)); + assert_eq!( + delta_layers.last().unwrap().key_range.end, + get_key(N as u32) + ); + for idx in 0..image_layers.len() { + assert_ne!(image_layers[idx].key_range.start, Key::MIN); + assert_ne!(image_layers[idx].key_range.end, Key::MAX); + assert_ne!(delta_layers[idx].key_range.start, Key::MIN); + assert_ne!(delta_layers[idx].key_range.end, Key::MAX); + if idx > 0 { + assert_eq!( + image_layers[idx - 1].key_range.end, + image_layers[idx].key_range.start + ); + assert_eq!( + delta_layers[idx - 1].key_range.end, + delta_layers[idx].key_range.start + ); } } } @@ -629,11 +650,11 @@ mod tests { .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &tline, &ctx) + .put_image(get_key(0), get_img(0), &ctx) .await .unwrap(); image_writer - .put_image(get_key(1), get_large_img(), &tline, &ctx) + .put_image(get_key(1), get_large_img(), &ctx) .await .unwrap(); let layers = image_writer diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 5588363330d0..5cb1460b297e 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -141,9 +141,7 @@ impl KeyHistoryRetention { }; stat.produce_image_key(img); if let Some(image_writer) = image_writer.as_mut() { - image_writer - .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard) - .await?; + image_writer.put_image(key, img.clone(), ctx).await?; } else { delta_writer .put_value_with_discard_fn( @@ -2041,8 +2039,7 @@ impl Timeline { .finish_with_discard_fn(self, ctx, Key::MAX, discard) .await? } else { - let (layers, _) = writer.take()?; - assert!(layers.is_empty(), "image layers produced in dry run mode?"); + drop(writer); Vec::new() } } else {