diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 76dc52fa16b9b..62f066862a160 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4007,6 +4007,7 @@ mod tests { use storage_layer::PersistentLayerKey; use tests::storage_layer::ValuesReconstructState; use tests::timeline::{GetVectoredError, ShutdownMode}; + use timeline::GcInfo; use utils::bin_ser::BeSer; use utils::id::TenantId; @@ -6684,49 +6685,48 @@ mod tests { // img layer at 0x10 let img_layer = (0..10) - .map(|id| (get_key(id), test_img(&format!("value {id}@0x10")))) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) .collect_vec(); let delta1 = vec![ - // TODO: we should test a real delta record here, which requires us to add a variant of NeonWalRecord for testing purpose. ( get_key(1), Lsn(0x20), - Value::Image(test_img("value 1@0x20")), + Value::Image(Bytes::from("value 1@0x20")), ), ( get_key(2), Lsn(0x30), - Value::Image(test_img("value 2@0x30")), + Value::Image(Bytes::from("value 2@0x30")), ), ( get_key(3), Lsn(0x40), - Value::Image(test_img("value 3@0x40")), + Value::Image(Bytes::from("value 3@0x40")), ), ]; let delta2 = vec![ ( get_key(5), Lsn(0x20), - Value::Image(test_img("value 5@0x20")), + Value::Image(Bytes::from("value 5@0x20")), ), ( get_key(6), Lsn(0x20), - Value::Image(test_img("value 6@0x20")), + Value::Image(Bytes::from("value 6@0x20")), ), ]; let delta3 = vec![ ( get_key(8), Lsn(0x40), - Value::Image(test_img("value 8@0x40")), + Value::Image(Bytes::from("value 8@0x40")), ), ( get_key(9), Lsn(0x40), - Value::Image(test_img("value 9@0x40")), + Value::Image(Bytes::from("value 9@0x40")), ), ]; @@ -6748,9 +6748,42 @@ mod tests { guard.cutoffs.horizon = Lsn(0x30); } + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x20"), + Bytes::from_static(b"value 2@0x30"), + Bytes::from_static(b"value 3@0x40"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x20"), + Bytes::from_static(b"value 6@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x40"), + Bytes::from_static(b"value 9@0x40"), + ]; + + for (idx, expected) in expected_result.iter().enumerate() { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + expected + ); + } + let cancel = CancellationToken::new(); tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + for (idx, expected) in expected_result.iter().enumerate() { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + expected + ); + } + // Check if the image layer at the GC horizon contains exactly what we want let image_at_gc_horizon = tline .inspect_image_layers(Lsn(0x30), &ctx) @@ -6761,14 +6794,22 @@ mod tests { .collect::>(); assert_eq!(image_at_gc_horizon.len(), 10); - let expected_lsn = [0x10, 0x20, 0x30, 0x10, 0x10, 0x20, 0x20, 0x10, 0x10, 0x10]; + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x20"), + Bytes::from_static(b"value 2@0x30"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x20"), + Bytes::from_static(b"value 6@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; for idx in 0..10 { assert_eq!( image_at_gc_horizon[idx], - ( - get_key(idx as u32), - test_img(&format!("value {idx}@{:#x}", expected_lsn[idx])) - ) + (get_key(idx as u32), expected_result[idx].clone()) ); } @@ -6801,7 +6842,7 @@ mod tests { }, // The delta layer that is cut in the middle PersistentLayerKey { - key_range: Key::MIN..get_key(9), + key_range: get_key(3)..get_key(4), lsn_range: Lsn(0x30)..Lsn(0x41), is_delta: true }, @@ -6886,6 +6927,9 @@ mod tests { tline.get(get_key(2), Lsn(0x50), &ctx).await?, Bytes::from_static(b"0x10,0x20,0x30") ); + + // Need to remove the limit of "Neon WAL redo requires base image". + // assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new()); // assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new()); @@ -6980,4 +7024,164 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_simple_bottom_most_compaction_deltas() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas")?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. + let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + // We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon. + // + // | D1 | | D3 | + // -| |-- gc horizon ----------------- + // | | | D2 | + // --------- img layer ------------------ + // + // What we should expact from this compaction is: + // | Part of D1 | | D3 | + // --------- img layer with D1+D2 at GC horizon------------------ + + // img layer at 0x10 + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![ + ( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(2), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(3), + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append("@0x40")), + ), + ]; + let delta2 = vec![ + ( + get_key(5), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(6), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append("@0x40")), + ), + ( + get_key(9), + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append("@0x40")), + ), + ]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![delta1, delta2, delta3], // delta layers + vec![(Lsn(0x10), img_layer)], // image layers + Lsn(0x50), + ) + .await?; + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![], + cutoffs: GcCutoffs { + pitr: Lsn(0x30), + horizon: Lsn(0x30), + }, + leases: Default::default(), + }; + } + + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10@0x30"), + Bytes::from_static(b"value 3@0x10@0x40"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10@0x20"), + Bytes::from_static(b"value 6@0x10@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10@0x40"), + Bytes::from_static(b"value 9@0x10@0x40"), + ]; + + let expected_result_at_gc_horizon = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10@0x30"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10@0x20"), + Bytes::from_static(b"value 6@0x10@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + for idx in 0..10 { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x30), &ctx) + .await + .unwrap(), + &expected_result_at_gc_horizon[idx] + ); + } + + let cancel = CancellationToken::new(); + tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + + for idx in 0..10 { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x30), &ctx) + .await + .unwrap(), + &expected_result_at_gc_horizon[idx] + ); + } + + Ok(()) + } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 8a95029f33a6a..de1263fadf963 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -965,6 +965,8 @@ impl Timeline { _cancel: &CancellationToken, ctx: &RequestContext, ) -> Result<(), CompactionError> { + use std::collections::BTreeSet; + use crate::tenant::storage_layer::ValueReconstructState; // Step 0: pick all delta layers + image layers below/intersect with the GC horizon. // The layer selection has the following properties: @@ -986,20 +988,30 @@ impl Timeline { (selected_layers, gc_cutoff) }; // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs. + // Also, collect the layer information to decide when to split the new delta layers. let mut all_key_values = Vec::new(); + let mut delta_split_points = BTreeSet::new(); for layer in &layer_selection { all_key_values.extend(layer.load_key_values(ctx).await?); + let desc = layer.layer_desc(); + if desc.is_delta() { + // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon) + // so that we can avoid having too many small delta layers. + let key_range = desc.get_key_range(); + delta_split_points.insert(key_range.start); + delta_split_points.insert(key_range.end); + } } // Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and - // image layers, make image appear later than delta. + // image layers, make image appear before than delta. struct ValueWrapper<'a>(&'a crate::repository::Value); impl Ord for ValueWrapper<'_> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { use crate::repository::Value; use std::cmp::Ordering; match (self.0, other.0) { - (Value::Image(_), Value::WalRecord(_)) => Ordering::Greater, - (Value::WalRecord(_), Value::Image(_)) => Ordering::Less, + (Value::Image(_), Value::WalRecord(_)) => Ordering::Less, + (Value::WalRecord(_), Value::Image(_)) => Ordering::Greater, _ => Ordering::Equal, } } @@ -1018,13 +1030,6 @@ impl Timeline { all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| { (k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2))) }); - let max_lsn = all_key_values - .iter() - .map(|(_, lsn, _)| lsn) - .max() - .copied() - .unwrap() - + 1; // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas. // Data of the same key. let mut accumulated_values = Vec::new(); @@ -1043,7 +1048,19 @@ impl Timeline { // We have a list of deltas/images. We want to create image layers while collect garbages. for (key, lsn, val) in accumulated_values.iter().rev() { if *lsn > horizon { - keys_above_horizon.push((*key, *lsn, val.clone())); // TODO: ensure one LSN corresponds to either delta or image instead of both + if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() { + if *prev_lsn == *lsn { + // The case that we have an LSN with both data from the delta layer and the image layer. As + // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply + // drop this delta and keep the image. + // + // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will + // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply + // dropped. + continue; + } + } + keys_above_horizon.push((*key, *lsn, val.clone())); } else if *lsn <= horizon { match val { crate::repository::Value::Image(image) => { @@ -1068,15 +1085,59 @@ impl Timeline { Ok((keys_above_horizon, img)) } - let mut delta_layer_writer = DeltaLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_shard_id, - all_key_values.first().unwrap().0, - gc_cutoff..max_lsn, // TODO: off by one? - ctx, - ) - .await?; + async fn flush_deltas( + deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>, + last_key: Key, + delta_split_points: &[Key], + current_delta_split_point: &mut usize, + tline: &Arc, + gc_cutoff: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result> { + // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid + // overlapping layers. + // + // If we have a structure like this: + // + // | Delta 1 | | Delta 4 | + // |---------| Delta 2 |---------| + // | Delta 3 | | Delta 5 | + // + // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4. + // A simple solution here is to split the delta layers using the original boundary, while this + // might produce a lot of small layers. This should be improved and fixed in the future. + let mut need_split = false; + while *current_delta_split_point < delta_split_points.len() + && last_key >= delta_split_points[*current_delta_split_point] + { + *current_delta_split_point += 1; + need_split = true; + } + if !need_split { + return Ok(None); + } + let deltas = std::mem::take(deltas); + if deltas.is_empty() { + return Ok(None); + } + let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1; + let mut delta_layer_writer = DeltaLayerWriter::new( + tline.conf, + tline.timeline_id, + tline.tenant_shard_id, + deltas.first().unwrap().0, + gc_cutoff..end_lsn, + ctx, + ) + .await?; + let key_end = deltas.last().unwrap().0.next(); + for (key, lsn, val) in deltas { + delta_layer_writer.put_value(key, lsn, val, ctx).await?; + } + let delta_layer = delta_layer_writer.finish(key_end, tline, ctx).await?; + Ok(Some(delta_layer)) + } + let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -1087,6 +1148,10 @@ impl Timeline { ) .await?; + let mut delta_values = Vec::new(); + let delta_split_points = delta_split_points.into_iter().collect_vec(); + let mut current_delta_split_point = 0; + let mut delta_layers = Vec::new(); for item @ (key, _, _) in &all_key_values { if &last_key == key { accumulated_values.push(item); @@ -1094,33 +1159,54 @@ impl Timeline { let (deltas, image) = flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff) .await?; + // Put the image into the image layer. Currently we have a single big layer for the compaction. image_layer_writer.put_image(last_key, image, ctx).await?; - for (key, lsn, val) in deltas { - delta_layer_writer.put_value(key, lsn, val, ctx).await?; - } + delta_values.extend(deltas); + delta_layers.extend( + flush_deltas( + &mut delta_values, + last_key, + &delta_split_points, + &mut current_delta_split_point, + self, + gc_cutoff, + ctx, + ) + .await?, + ); accumulated_values.clear(); accumulated_values.push(item); last_key = *key; } } + + // TODO: move this part to the loop body let (deltas, image) = flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?; + // Put the image into the image layer. Currently we have a single big layer for the compaction. image_layer_writer.put_image(last_key, image, ctx).await?; - for (key, lsn, val) in deltas { - delta_layer_writer.put_value(key, lsn, val, ctx).await?; - } - accumulated_values.clear(); - // TODO: split layers - let delta_layer = delta_layer_writer.finish(last_key, self, ctx).await?; + delta_values.extend(deltas); + delta_layers.extend( + flush_deltas( + &mut delta_values, + last_key, + &delta_split_points, + &mut current_delta_split_point, + self, + gc_cutoff, + ctx, + ) + .await?, + ); + let image_layer = image_layer_writer.finish(self, ctx).await?; + let mut compact_to = Vec::new(); + compact_to.extend(delta_layers); + compact_to.push(image_layer); // Step 3: Place back to the layer map. { let mut guard = self.layers.write().await; - guard.finish_gc_compaction( - &layer_selection, - &[delta_layer.clone(), image_layer.clone()], - &self.metrics, - ) + guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics) }; Ok(()) }