From 4107b8351a2377da1f95896b602458a10160e19d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 6 May 2024 23:06:54 +0200 Subject: [PATCH 1/9] track partitions in accum_key_values --- pageserver/compaction/src/compact_tiered.rs | 3 ++- pageserver/compaction/src/helpers.rs | 23 +++++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 12882c9d59cf..e877e4ee093f 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -544,7 +544,8 @@ where let mut new_jobs = Vec::new(); // Slide a window through the keyspace - let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream)); + let mut key_accum = + std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size)); let mut all_in_window: bool = false; let mut window = Window::new(); loop { diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index 06454ee1d050..2c922b0a4990 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -235,9 +235,14 @@ pub struct KeySize { pub key: K, pub num_values: u64, pub size: u64, + /// The lsns to partition at (if empty then no per-lsn partitioning) + pub partition_lsns: Vec<(Lsn, u64)>, } -pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream, E>> +pub fn accum_key_values<'a, I, K, D, E>( + input: I, + target_size: u64, +) -> impl Stream, E>> where K: Eq + PartialOrd + Display + Copy, I: Stream>, @@ -249,25 +254,35 @@ where if let Some(first) = input.next().await { let first = first?; + let mut part_size = first.size(); let mut accum: KeySize = KeySize { key: first.key(), num_values: 1, - size: first.size(), + size: part_size, + partition_lsns: Vec::new(), }; let mut last_key = accum.key; while let Some(this) = input.next().await { let this = this?; if this.key() == accum.key { - accum.size += this.size(); + let add_size = this.size(); + if part_size + add_size > target_size { + accum.partition_lsns.push((this.lsn(), part_size)); + part_size = 0; + } + part_size += add_size; + accum.size += add_size; accum.num_values += 1; } else { assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key); last_key = accum.key; yield accum; + part_size = this.size(); accum = KeySize { key: this.key(), num_values: 1, - size: this.size(), + size: part_size, + partition_lsns: Vec::new(), }; } } From fed85e6f0e317e8d1e62d6c4a4e1d8f8d4187229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 7 May 2024 01:11:10 +0200 Subject: [PATCH 2/9] Cut deltas along lsn as well if needed --- pageserver/compaction/src/compact_tiered.rs | 78 ++++++++++++++++++--- 1 file changed, 67 insertions(+), 11 deletions(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index e877e4ee093f..8f13a95a184b 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -548,13 +548,9 @@ where std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size)); let mut all_in_window: bool = false; let mut window = Window::new(); - loop { - if all_in_window && window.elems.is_empty() { - // All done! - break; - } - if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window) - { + + let drain_window = |window: &mut Window<_>, new_jobs: &mut Vec<_>, has_more: bool| { + while let Some(key_range) = window.choose_next_delta(self.target_file_size, has_more) { let batch_layers: Vec = job .input_layers .iter() @@ -571,13 +567,69 @@ where input_layers: batch_layers, completed: false, }); - } else { - assert!(!all_in_window); - if let Some(next_key) = key_accum.next().await.transpose()? { + } + }; + loop { + if all_in_window && window.elems.is_empty() { + // All done! + break; + } + drain_window(&mut window, &mut new_jobs, !all_in_window); + assert!(!all_in_window); + if let Some(next_key) = key_accum.next().await.transpose()? { + if next_key.partition_lsns.is_empty() { + // Normal case: extend the window by the key window.feed(next_key.key, next_key.size); } else { - all_in_window = true; + // We have a key with too large size impact for a single delta layer. + // Therefore, drain window with has_more = false to make a clean cut + // before the key, and then make dedicated delta layers for the + // single key. + // We cannot cluster the key with the others, because + // layer files are not allowed to overlap with each other in + // the lsn,key space (no overlaps allowed for the rectangles). + let key = next_key.key; + if !window.is_empty() { + let has_more = false; + drain_window(&mut window, &mut new_jobs, has_more); + } + assert!(window.is_empty()); + + // Not really required: but here for future resilience: + // We make a "gap" here, so any structure the window holds should + // probably be reset. + window = Window::new(); + + let mut prior_lsn = job.lsn_range.start; + let mut lsn_ranges = Vec::new(); + for (lsn, _size) in next_key.partition_lsns { + lsn_ranges.push(prior_lsn..lsn); + prior_lsn = lsn; + } + lsn_ranges.push(prior_lsn..job.lsn_range.end); + for lsn_range in lsn_ranges { + let key_range = key..key.next(); + let batch_layers: Vec = job + .input_layers + .iter() + .filter(|layer_id| { + let layer = &self.layers[layer_id.0].layer; + layer.key_range().contains(&key) + && overlaps_with(layer.lsn_range(), &lsn_range) + }) + .cloned() + .collect(); + new_jobs.push(CompactionJob { + key_range, + lsn_range: job.lsn_range.clone(), + strategy: CompactionStrategy::CreateDelta, + input_layers: batch_layers, + completed: false, + }); + } } + } else { + all_in_window = true; } } @@ -798,6 +850,10 @@ where self.elems.front().unwrap().accum_size - self.splitoff_size } + fn is_empty(&self) -> bool { + self.elems.is_empty() + } + fn commit_upto(&mut self, mut upto: usize) { while upto > 1 { let popped = self.elems.pop_front().unwrap(); From 2cf64a6094030ab954e096665a23d92fb65aa259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 9 May 2024 18:00:01 +0200 Subject: [PATCH 3/9] Schedule jobs to cut in lsn dimension --- pageserver/compaction/src/compact_tiered.rs | 115 ++++++++++---------- 1 file changed, 59 insertions(+), 56 deletions(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 8f13a95a184b..3377ca68e89c 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -524,8 +524,6 @@ where // If we have accumulated only a narrow band of keyspace, create an // image layer. Otherwise write a delta layer. - // FIXME: deal with the case of lots of values for same key - // FIXME: we are ignoring images here. Did we already divide the work // so that we won't encounter them here? @@ -550,7 +548,7 @@ where let mut window = Window::new(); let drain_window = |window: &mut Window<_>, new_jobs: &mut Vec<_>, has_more: bool| { - while let Some(key_range) = window.choose_next_delta(self.target_file_size, has_more) { + if let Some(key_range) = window.choose_next_delta(self.target_file_size, has_more) { let batch_layers: Vec = job .input_layers .iter() @@ -567,70 +565,75 @@ where input_layers: batch_layers, completed: false, }); + true + } else { + false } }; - loop { - if all_in_window && window.elems.is_empty() { + 'outer: loop { + if all_in_window && window.is_empty() { // All done! break; } - drain_window(&mut window, &mut new_jobs, !all_in_window); + if drain_window(&mut window, &mut new_jobs, !all_in_window) { + continue; + } assert!(!all_in_window); - if let Some(next_key) = key_accum.next().await.transpose()? { + while let Some(next_key) = key_accum.next().await.transpose()? { if next_key.partition_lsns.is_empty() { // Normal case: extend the window by the key window.feed(next_key.key, next_key.size); - } else { - // We have a key with too large size impact for a single delta layer. - // Therefore, drain window with has_more = false to make a clean cut - // before the key, and then make dedicated delta layers for the - // single key. - // We cannot cluster the key with the others, because - // layer files are not allowed to overlap with each other in - // the lsn,key space (no overlaps allowed for the rectangles). - let key = next_key.key; - if !window.is_empty() { - let has_more = false; - drain_window(&mut window, &mut new_jobs, has_more); - } - assert!(window.is_empty()); - - // Not really required: but here for future resilience: - // We make a "gap" here, so any structure the window holds should - // probably be reset. - window = Window::new(); - - let mut prior_lsn = job.lsn_range.start; - let mut lsn_ranges = Vec::new(); - for (lsn, _size) in next_key.partition_lsns { - lsn_ranges.push(prior_lsn..lsn); - prior_lsn = lsn; - } - lsn_ranges.push(prior_lsn..job.lsn_range.end); - for lsn_range in lsn_ranges { - let key_range = key..key.next(); - let batch_layers: Vec = job - .input_layers - .iter() - .filter(|layer_id| { - let layer = &self.layers[layer_id.0].layer; - layer.key_range().contains(&key) - && overlaps_with(layer.lsn_range(), &lsn_range) - }) - .cloned() - .collect(); - new_jobs.push(CompactionJob { - key_range, - lsn_range: job.lsn_range.clone(), - strategy: CompactionStrategy::CreateDelta, - input_layers: batch_layers, - completed: false, - }); - } + continue 'outer; + } + // We have a key with too large size impact for a single delta layer. + // Therefore, drain window with has_more = false to make a clean cut + // before the key, and then make dedicated delta layers for the + // single key. + // We cannot cluster the key with the others, because + // layer files are not allowed to overlap with each other in + // the lsn,key space (no overlaps allowed for the rectangles). + let key = next_key.key; + debug!("key {key} with size impact larger than the layer size"); + if !window.is_empty() { + let has_more = false; + drain_window(&mut window, &mut new_jobs, has_more); + } + assert!(window.is_empty()); + + // Not really required: but here for future resilience: + // We make a "gap" here, so any structure the window holds should + // probably be reset. + window = Window::new(); + + let mut prior_lsn = job.lsn_range.start; + let mut lsn_ranges = Vec::new(); + for (lsn, _size) in next_key.partition_lsns.iter() { + lsn_ranges.push(prior_lsn..*lsn); + prior_lsn = *lsn; + } + lsn_ranges.push(prior_lsn..job.lsn_range.end); + for lsn_range in lsn_ranges { + let key_range = key..key.next(); + let batch_layers: Vec = job + .input_layers + .iter() + .filter(|layer_id| { + let layer = &self.layers[layer_id.0].layer; + layer.key_range().contains(&key) + && overlaps_with(layer.lsn_range(), &lsn_range) + }) + .cloned() + .collect(); + new_jobs.push(CompactionJob { + key_range, + lsn_range: lsn_range, + strategy: CompactionStrategy::CreateDelta, + input_layers: batch_layers, + completed: false, + }); } - } else { - all_in_window = true; } + all_in_window = true; } // All the input files are rewritten. Set up the tracking for when they can From cb18f113eaa09c5fa4c5fc4d475ae3469993fc66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 10 May 2024 02:21:09 +0200 Subject: [PATCH 4/9] Sanity check for emptiness, not half the target size --- pageserver/compaction/tests/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/compaction/tests/tests.rs b/pageserver/compaction/tests/tests.rs index 7aa20e6863b4..e2603f22aca8 100644 --- a/pageserver/compaction/tests/tests.rs +++ b/pageserver/compaction/tests/tests.rs @@ -43,9 +43,9 @@ async fn test_many_updates_for_single_key() { } for l in executor.live_layers.iter() { assert!(l.file_size() < executor.target_file_size * 2); - // sanity check that none of the delta layers are stupidly small either + // Sanity check that none of the delta layers are empty either. if l.is_delta() { - assert!(l.file_size() > executor.target_file_size / 2); + assert!(l.file_size() > 0); } } } From ed90183611b15dd7b26ef97f684c0ba647432c70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 10 May 2024 01:51:09 +0200 Subject: [PATCH 5/9] Ignore single-key delta layers for depth calculation --- pageserver/compaction/src/identify_levels.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pageserver/compaction/src/identify_levels.rs b/pageserver/compaction/src/identify_levels.rs index 98dd46925ca9..1853afffdd9d 100644 --- a/pageserver/compaction/src/identify_levels.rs +++ b/pageserver/compaction/src/identify_levels.rs @@ -184,6 +184,12 @@ impl Level { } let mut events: Vec> = Vec::new(); for (idx, l) in self.layers.iter().enumerate() { + let key_range = l.key_range(); + if key_range.end == key_range.start.next() && l.is_delta() { + // Ignore single-key delta layers as they can be stacked on top of each other + // as that is the only way to cut further. + continue; + } events.push(Event { key: l.key_range().start, layer_idx: idx, From 00942abbd3cf9fe4008cadfdeb0ca7d493150fcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 7 May 2024 00:36:15 +0200 Subject: [PATCH 6/9] Unignore the test --- pageserver/compaction/tests/tests.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pageserver/compaction/tests/tests.rs b/pageserver/compaction/tests/tests.rs index e2603f22aca8..bd8b54a286d8 100644 --- a/pageserver/compaction/tests/tests.rs +++ b/pageserver/compaction/tests/tests.rs @@ -20,10 +20,6 @@ pub(crate) fn setup_logging() { /// even if we produce an extremely narrow delta layer, spanning just that one /// key, we still too many records to fit in the target file size. We need to /// split in the LSN dimension too in that case. -/// -/// TODO: The code to avoid this problem has not been implemented yet! So the -/// assertion currently fails, but we need to make it not fail. -#[ignore] #[tokio::test] async fn test_many_updates_for_single_key() { setup_logging(); From d529aa5327b5d40b15c8ec85a78712c28a45e685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 10 May 2024 16:41:19 +0200 Subject: [PATCH 7/9] clippy --- pageserver/compaction/src/compact_tiered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 3377ca68e89c..359339afca29 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -626,7 +626,7 @@ where .collect(); new_jobs.push(CompactionJob { key_range, - lsn_range: lsn_range, + lsn_range, strategy: CompactionStrategy::CreateDelta, input_layers: batch_layers, completed: false, From 0b0fbf3ca4315c18e1ef5b0ae05e1a445b217090 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 13 May 2024 18:41:54 +0300 Subject: [PATCH 8/9] Some refactoring for PR #7671, for readability (#7724) - Replace 'drain_window' closure with 'create_delta_job'. That can then also be used in the loop that creates the delta jobs for the single key - Use a match-statement to handle the three cases: end of keyspace, "normal case", and single large key --- pageserver/compaction/src/compact_tiered.rs | 144 ++++++++++---------- 1 file changed, 70 insertions(+), 74 deletions(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 35855bcd5a50..1b39bb40fe44 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -553,93 +553,89 @@ where let mut all_in_window: bool = false; let mut window = Window::new(); - let drain_window = |window: &mut Window<_>, new_jobs: &mut Vec<_>, has_more: bool| { - if let Some(key_range) = window.choose_next_delta(self.target_file_size, has_more) { - let batch_layers: Vec = job - .input_layers - .iter() - .filter(|layer_id| { - overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range) - }) - .cloned() - .collect(); - assert!(!batch_layers.is_empty()); - new_jobs.push(CompactionJob { - key_range, - lsn_range: job.lsn_range.clone(), - strategy: CompactionStrategy::CreateDelta, - input_layers: batch_layers, - completed: false, - }); - true - } else { - false - } + // Helper function to create a job for a new delta layer with given key-lsn + // rectangle. + let create_delta_job = |key_range, lsn_range: &Range, new_jobs: &mut Vec<_>| { + // The inputs for the job are all the input layers of the original job that + // overlap with the rectangle. + let batch_layers: Vec = job + .input_layers + .iter() + .filter(|layer_id| { + overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range) + }) + .cloned() + .collect(); + assert!(!batch_layers.is_empty()); + new_jobs.push(CompactionJob { + key_range, + lsn_range: lsn_range.clone(), + strategy: CompactionStrategy::CreateDelta, + input_layers: batch_layers, + completed: false, + }); }; - 'outer: loop { + + loop { if all_in_window && window.is_empty() { // All done! break; } - if drain_window(&mut window, &mut new_jobs, !all_in_window) { + + // If we now have enough keyspace for next delta layer in the window, create a + // new delta layer + if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window) { + create_delta_job(key_range, &job.lsn_range, &mut new_jobs); continue; } assert!(!all_in_window); - while let Some(next_key) = key_accum.next().await.transpose()? { - if next_key.partition_lsns.is_empty() { + + // Process next key in the key space + match key_accum.next().await.transpose()? { + None => { + all_in_window = true; + } + Some(next_key) if next_key.partition_lsns.is_empty() => { // Normal case: extend the window by the key window.feed(next_key.key, next_key.size); - continue 'outer; - } - // We have a key with too large size impact for a single delta layer. - // Therefore, drain window with has_more = false to make a clean cut - // before the key, and then make dedicated delta layers for the - // single key. - // We cannot cluster the key with the others, because - // layer files are not allowed to overlap with each other in - // the lsn,key space (no overlaps allowed for the rectangles). - let key = next_key.key; - debug!("key {key} with size impact larger than the layer size"); - if !window.is_empty() { - let has_more = false; - drain_window(&mut window, &mut new_jobs, has_more); } - assert!(window.is_empty()); - - // Not really required: but here for future resilience: - // We make a "gap" here, so any structure the window holds should - // probably be reset. - window = Window::new(); - - let mut prior_lsn = job.lsn_range.start; - let mut lsn_ranges = Vec::new(); - for (lsn, _size) in next_key.partition_lsns.iter() { - lsn_ranges.push(prior_lsn..*lsn); - prior_lsn = *lsn; - } - lsn_ranges.push(prior_lsn..job.lsn_range.end); - for lsn_range in lsn_ranges { - let key_range = key..key.next(); - let batch_layers: Vec = job - .input_layers - .iter() - .filter(|layer_id| { - let layer = &self.layers[layer_id.0].layer; - layer.key_range().contains(&key) - && overlaps_with(layer.lsn_range(), &lsn_range) - }) - .cloned() - .collect(); - new_jobs.push(CompactionJob { - key_range, - lsn_range, - strategy: CompactionStrategy::CreateDelta, - input_layers: batch_layers, - completed: false, - }); + Some(next_key) => { + // A key with too large size impact for a single delta layer. This + // case occurs if you make a huge number of updates for a single key. + // + // Drain the window with has_more = false to make a clean cut before + // the key, and then make dedicated delta layers for the single key. + // + // We cannot cluster the key with the others, because layer files are + // not allowed to overlap with each other in the lsn,key space (no + // overlaps allowed for the rectangles). + let key = next_key.key; + debug!("key {key} with size impact larger than the layer size"); + while !window.is_empty() { + let has_more = false; + let key_range = window.choose_next_delta(self.target_file_size, has_more) + .expect("with has_more==false, choose_next_delta always returns something for a non-empty Window"); + create_delta_job(key_range, &job.lsn_range, &mut new_jobs); + } + + // Not really required: but here for future resilience: + // We make a "gap" here, so any structure the window holds should + // probably be reset. + window = Window::new(); + + let mut prior_lsn = job.lsn_range.start; + let mut lsn_ranges = Vec::new(); + for (lsn, _size) in next_key.partition_lsns.iter() { + lsn_ranges.push(prior_lsn..*lsn); + prior_lsn = *lsn; + } + lsn_ranges.push(prior_lsn..job.lsn_range.end); + for lsn_range in lsn_ranges { + let key_range = key..key.next(); + create_delta_job(key_range, &lsn_range, &mut new_jobs); + } } } - all_in_window = true; } // All the input files are rewritten. Set up the tracking for when they can From ac5d72a4183376930c4582cf8a2e49f241aecf17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 13 May 2024 18:08:45 +0200 Subject: [PATCH 9/9] fmt and adjust comment --- pageserver/compaction/src/compact_tiered.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 1b39bb40fe44..a8f184af245a 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -584,7 +584,8 @@ where // If we now have enough keyspace for next delta layer in the window, create a // new delta layer - if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window) { + if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window) + { create_delta_job(key_range, &job.lsn_range, &mut new_jobs); continue; } @@ -606,9 +607,9 @@ where // Drain the window with has_more = false to make a clean cut before // the key, and then make dedicated delta layers for the single key. // - // We cannot cluster the key with the others, because layer files are - // not allowed to overlap with each other in the lsn,key space (no - // overlaps allowed for the rectangles). + // We cannot cluster the key with the others, because we don't want + // layer files to overlap with each other in the lsn,key space (no + // overlaps for the rectangles). let key = next_key.key; debug!("key {key} with size impact larger than the layer size"); while !window.is_empty() {