diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 35855bcd5a50..1b39bb40fe44 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -553,93 +553,89 @@ where let mut all_in_window: bool = false; let mut window = Window::new(); - let drain_window = |window: &mut Window<_>, new_jobs: &mut Vec<_>, has_more: bool| { - if let Some(key_range) = window.choose_next_delta(self.target_file_size, has_more) { - let batch_layers: Vec = job - .input_layers - .iter() - .filter(|layer_id| { - overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range) - }) - .cloned() - .collect(); - assert!(!batch_layers.is_empty()); - new_jobs.push(CompactionJob { - key_range, - lsn_range: job.lsn_range.clone(), - strategy: CompactionStrategy::CreateDelta, - input_layers: batch_layers, - completed: false, - }); - true - } else { - false - } + // Helper function to create a job for a new delta layer with given key-lsn + // rectangle. + let create_delta_job = |key_range, lsn_range: &Range, new_jobs: &mut Vec<_>| { + // The inputs for the job are all the input layers of the original job that + // overlap with the rectangle. + let batch_layers: Vec = job + .input_layers + .iter() + .filter(|layer_id| { + overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range) + }) + .cloned() + .collect(); + assert!(!batch_layers.is_empty()); + new_jobs.push(CompactionJob { + key_range, + lsn_range: lsn_range.clone(), + strategy: CompactionStrategy::CreateDelta, + input_layers: batch_layers, + completed: false, + }); }; - 'outer: loop { + + loop { if all_in_window && window.is_empty() { // All done! break; } - if drain_window(&mut window, &mut new_jobs, !all_in_window) { + + // If we now have enough keyspace for next delta layer in the window, create a + // new delta layer + if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window) { + create_delta_job(key_range, &job.lsn_range, &mut new_jobs); continue; } assert!(!all_in_window); - while let Some(next_key) = key_accum.next().await.transpose()? { - if next_key.partition_lsns.is_empty() { + + // Process next key in the key space + match key_accum.next().await.transpose()? { + None => { + all_in_window = true; + } + Some(next_key) if next_key.partition_lsns.is_empty() => { // Normal case: extend the window by the key window.feed(next_key.key, next_key.size); - continue 'outer; - } - // We have a key with too large size impact for a single delta layer. - // Therefore, drain window with has_more = false to make a clean cut - // before the key, and then make dedicated delta layers for the - // single key. - // We cannot cluster the key with the others, because - // layer files are not allowed to overlap with each other in - // the lsn,key space (no overlaps allowed for the rectangles). - let key = next_key.key; - debug!("key {key} with size impact larger than the layer size"); - if !window.is_empty() { - let has_more = false; - drain_window(&mut window, &mut new_jobs, has_more); } - assert!(window.is_empty()); - - // Not really required: but here for future resilience: - // We make a "gap" here, so any structure the window holds should - // probably be reset. - window = Window::new(); - - let mut prior_lsn = job.lsn_range.start; - let mut lsn_ranges = Vec::new(); - for (lsn, _size) in next_key.partition_lsns.iter() { - lsn_ranges.push(prior_lsn..*lsn); - prior_lsn = *lsn; - } - lsn_ranges.push(prior_lsn..job.lsn_range.end); - for lsn_range in lsn_ranges { - let key_range = key..key.next(); - let batch_layers: Vec = job - .input_layers - .iter() - .filter(|layer_id| { - let layer = &self.layers[layer_id.0].layer; - layer.key_range().contains(&key) - && overlaps_with(layer.lsn_range(), &lsn_range) - }) - .cloned() - .collect(); - new_jobs.push(CompactionJob { - key_range, - lsn_range, - strategy: CompactionStrategy::CreateDelta, - input_layers: batch_layers, - completed: false, - }); + Some(next_key) => { + // A key with too large size impact for a single delta layer. This + // case occurs if you make a huge number of updates for a single key. + // + // Drain the window with has_more = false to make a clean cut before + // the key, and then make dedicated delta layers for the single key. + // + // We cannot cluster the key with the others, because layer files are + // not allowed to overlap with each other in the lsn,key space (no + // overlaps allowed for the rectangles). + let key = next_key.key; + debug!("key {key} with size impact larger than the layer size"); + while !window.is_empty() { + let has_more = false; + let key_range = window.choose_next_delta(self.target_file_size, has_more) + .expect("with has_more==false, choose_next_delta always returns something for a non-empty Window"); + create_delta_job(key_range, &job.lsn_range, &mut new_jobs); + } + + // Not really required: but here for future resilience: + // We make a "gap" here, so any structure the window holds should + // probably be reset. + window = Window::new(); + + let mut prior_lsn = job.lsn_range.start; + let mut lsn_ranges = Vec::new(); + for (lsn, _size) in next_key.partition_lsns.iter() { + lsn_ranges.push(prior_lsn..*lsn); + prior_lsn = *lsn; + } + lsn_ranges.push(prior_lsn..job.lsn_range.end); + for lsn_range in lsn_ranges { + let key_range = key..key.next(); + create_delta_job(key_range, &lsn_range, &mut new_jobs); + } } } - all_in_window = true; } // All the input files are rewritten. Set up the tracking for when they can