From 1378e8573f55a478f221ae42e377690624deda68 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 18 Sep 2024 11:07:11 -0400 Subject: [PATCH] fix(pageserver): make delta split layer writer finish atomic --- .../src/tenant/storage_layer/split_writer.rs | 136 +++++++----------- pageserver/src/tenant/timeline/compaction.rs | 34 +---- 2 files changed, 53 insertions(+), 117 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index 468dc9172a9f..7c2c94a0a321 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -196,7 +196,7 @@ impl SplitImageLayerWriter { pub struct SplitDeltaLayerWriter { inner: Option<(Key, DeltaLayerWriter)>, target_layer_size: u64, - generated_layers: Vec, + generated_layer_writers: Vec<(DeltaLayerWriter, PersistentLayerKey)>, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, @@ -215,7 +215,7 @@ impl SplitDeltaLayerWriter { Ok(Self { target_layer_size, inner: None, - generated_layers: Vec::new(), + generated_layer_writers: Vec::new(), conf, timeline_id, tenant_shard_id, @@ -224,20 +224,13 @@ impl SplitDeltaLayerWriter { }) } - /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end. - pub async fn put_value_with_discard_fn( + pub async fn put_value( &mut self, key: Key, lsn: Lsn, val: Value, - tline: &Arc, ctx: &RequestContext, - discard: D, - ) -> anyhow::Result<()> - where - D: FnOnce(&PersistentLayerKey) -> F, - F: Future, - { + ) -> anyhow::Result<()> { // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate // number, and therefore the final layer size could be a little bit larger or smaller than the target. // @@ -281,16 +274,8 @@ impl SplitDeltaLayerWriter { lsn_range: self.lsn_range.clone(), is_delta: true, }; - if discard(&layer_key).await { - drop(prev_delta_writer); - self.generated_layers - .push(SplitWriterResult::Discarded(layer_key)); - } else { - let (desc, path) = prev_delta_writer.finish(key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - self.generated_layers - .push(SplitWriterResult::Produced(delta_layer)); - } + self.generated_layer_writers + .push((prev_delta_writer, layer_key)); } else if inner.estimated_size() >= S3_UPLOAD_LIMIT { // We have to produce a very large file b/c a key is updated too often. anyhow::bail!( @@ -305,55 +290,61 @@ impl SplitDeltaLayerWriter { inner.put_value(key, lsn, val, ctx).await } - pub async fn put_value( - &mut self, - key: Key, - lsn: Lsn, - val: Value, - tline: &Arc, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false }) - .await - } - pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, - discard: D, + discard_fn: D, ) -> anyhow::Result> where - D: FnOnce(&PersistentLayerKey) -> F, + D: Fn(&PersistentLayerKey) -> F, F: Future, { let Self { - mut generated_layers, + mut generated_layer_writers, inner, .. } = self; - let Some((start_key, inner)) = inner else { - return Ok(generated_layers); - }; - if inner.num_keys() == 0 { - return Ok(generated_layers); + if let Some((start_key, writer)) = inner { + if writer.num_keys() != 0 { + let end_key = self.last_key_written.next(); + let layer_key = PersistentLayerKey { + key_range: start_key..end_key, + lsn_range: self.lsn_range.clone(), + is_delta: true, + }; + generated_layer_writers.push((writer, layer_key)); + } } - let end_key = self.last_key_written.next(); - let layer_key = PersistentLayerKey { - key_range: start_key..end_key, - lsn_range: self.lsn_range.clone(), - is_delta: true, - }; - if discard(&layer_key).await { - generated_layers.push(SplitWriterResult::Discarded(layer_key)); - } else { - let (desc, path) = inner.finish(end_key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - generated_layers.push(SplitWriterResult::Produced(delta_layer)); + // BEGIN: catch every error and do the recovery in the below section + let mut generated_layers = Vec::new(); + for (inner, layer_key) in generated_layer_writers { + if discard_fn(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + let layer = match inner.finish(layer_key.key_range.end, ctx).await { + Ok((desc, path)) => Layer::finish_creating(self.conf, tline, desc, &path)?, + Err(e) => { + for produced_layer in generated_layers { + if let SplitWriterResult::Produced(delta_layer) = produced_layer { + let layer: Layer = delta_layer.into(); + layer.delete_on_drop(); + } + } + return Err(e); + } + }; + generated_layers.push(SplitWriterResult::Produced(layer)); + } } + // END: catch every error and do the recovery in the above section Ok(generated_layers) } + pub fn discard(self) { + // The drop handler of each `generated_layer_writers` will clean up the `.tmp` files. + } + #[cfg(test)] pub(crate) async fn finish( self, @@ -363,11 +354,6 @@ impl SplitDeltaLayerWriter { self.finish_with_discard_fn(tline, ctx, |_| async { false }) .await } - - /// This function will be deprecated with #8841. - pub(crate) fn take(self) -> anyhow::Result<(Vec, Option)> { - Ok((self.generated_layers, self.inner.map(|x| x.1))) - } } #[cfg(test)] @@ -447,13 +433,7 @@ mod tests { assert_eq!(layers.len(), 1); delta_writer - .put_value( - get_key(0), - Lsn(0x18), - Value::Image(get_img(0)), - &tline, - &ctx, - ) + .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx) .await .unwrap(); let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); @@ -521,14 +501,7 @@ mod tests { .await .unwrap(); delta_writer - .put_value_with_discard_fn( - get_key(i), - Lsn(0x20), - Value::Image(get_large_img()), - &tline, - &ctx, - |_| async { discard }, - ) + .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx) .await .unwrap(); } @@ -634,23 +607,11 @@ mod tests { assert_eq!(layers.len(), 2); delta_writer - .put_value( - get_key(0), - Lsn(0x18), - Value::Image(get_img(0)), - &tline, - &ctx, - ) + .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx) .await .unwrap(); delta_writer - .put_value( - get_key(1), - Lsn(0x1A), - Value::Image(get_large_img()), - &tline, - &ctx, - ) + .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx) .await .unwrap(); let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); @@ -714,7 +675,6 @@ mod tests { get_key(0), Lsn(i as u64 * 16 + 0x10), Value::Image(get_large_img()), - &tline, &ctx, ) .await diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index cb6c5477271b..aea5ac205d4a 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -118,18 +118,12 @@ impl KeyHistoryRetention { async fn pipe_to( self, key: Key, - tline: &Arc, delta_writer: &mut SplitDeltaLayerWriter, mut image_writer: Option<&mut SplitImageLayerWriter>, stat: &mut CompactionStatistics, - dry_run: bool, ctx: &RequestContext, ) -> anyhow::Result<()> { let mut first_batch = true; - let discard = |key: &PersistentLayerKey| { - let key = key.clone(); - async move { Self::discard_key(&key, tline, dry_run).await } - }; for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon { if first_batch { if logs.len() == 1 && logs[0].1.is_image() { @@ -141,40 +135,27 @@ impl KeyHistoryRetention { image_writer.put_image(key, img.clone(), ctx).await?; } else { delta_writer - .put_value_with_discard_fn( - key, - cutoff_lsn, - Value::Image(img.clone()), - tline, - ctx, - discard, - ) + .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx) .await?; } } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer - .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) - .await?; + delta_writer.put_value(key, lsn, val, ctx).await?; } } first_batch = false; } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer - .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) - .await?; + delta_writer.put_value(key, lsn, val, ctx).await?; } } } let KeyLogAtLsn(above_horizon_logs) = self.above_horizon; for (lsn, val) in above_horizon_logs { stat.produce_key(&val); - delta_writer - .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) - .await?; + delta_writer.put_value(key, lsn, val, ctx).await?; } Ok(()) } @@ -1914,11 +1895,9 @@ impl Timeline { retention .pipe_to( *last_key, - self, &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, - dry_run, ctx, ) .await?; @@ -1945,11 +1924,9 @@ impl Timeline { retention .pipe_to( last_key, - self, &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, - dry_run, ctx, ) .await?; @@ -1977,8 +1954,7 @@ impl Timeline { .finish_with_discard_fn(self, ctx, discard) .await? } else { - let (layers, _) = delta_layer_writer.take()?; - assert!(layers.is_empty(), "delta layers produced in dry run mode?"); + delta_layer_writer.discard(); Vec::new() };