Skip to content

Commit

Permalink
Some refactoring for PR #7671, for readability
Browse files Browse the repository at this point in the history
- Replace 'drain_window' closure with 'create_delta_job'. That can
  then also be used in the loop that creates the delta jobs for the
  single key

- Use a match-statement to handle the three cases: end of keyspace,
  "normal case", and single large key
  • Loading branch information
hlinnaka committed May 13, 2024
1 parent 764d964 commit 9cbdde7
Showing 1 changed file with 70 additions and 74 deletions.
144 changes: 70 additions & 74 deletions pageserver/compaction/src/compact_tiered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,93 +553,89 @@ where
let mut all_in_window: bool = false;
let mut window = Window::new();

let drain_window = |window: &mut Window<_>, new_jobs: &mut Vec<_>, has_more: bool| {
if let Some(key_range) = window.choose_next_delta(self.target_file_size, has_more) {
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
})
.cloned()
.collect();
assert!(!batch_layers.is_empty());
new_jobs.push(CompactionJob {
key_range,
lsn_range: job.lsn_range.clone(),
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
true
} else {
false
}
// Helper function to create a job for a new delta layer with given key-lsn
// rectangle.
let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
// The inputs for the job are all the input layers of the original job that
// overlap with the rectangle.
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
})
.cloned()
.collect();
assert!(!batch_layers.is_empty());
new_jobs.push(CompactionJob {
key_range,
lsn_range: lsn_range.clone(),
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
};
'outer: loop {

loop {
if all_in_window && window.is_empty() {
// All done!
break;
}
if drain_window(&mut window, &mut new_jobs, !all_in_window) {

// If we now have enough keyspace for next delta layer in the window, create a
// new delta layer
if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window) {
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
continue;
}
assert!(!all_in_window);
while let Some(next_key) = key_accum.next().await.transpose()? {
if next_key.partition_lsns.is_empty() {

// Process next key in the key space
match key_accum.next().await.transpose()? {
None => {
all_in_window = true;
}
Some(next_key) if next_key.partition_lsns.is_empty() => {
// Normal case: extend the window by the key
window.feed(next_key.key, next_key.size);
continue 'outer;
}
// We have a key with too large size impact for a single delta layer.
// Therefore, drain window with has_more = false to make a clean cut
// before the key, and then make dedicated delta layers for the
// single key.
// We cannot cluster the key with the others, because
// layer files are not allowed to overlap with each other in
// the lsn,key space (no overlaps allowed for the rectangles).
let key = next_key.key;
debug!("key {key} with size impact larger than the layer size");
if !window.is_empty() {
let has_more = false;
drain_window(&mut window, &mut new_jobs, has_more);
}
assert!(window.is_empty());

// Not really required: but here for future resilience:
// We make a "gap" here, so any structure the window holds should
// probably be reset.
window = Window::new();

let mut prior_lsn = job.lsn_range.start;
let mut lsn_ranges = Vec::new();
for (lsn, _size) in next_key.partition_lsns.iter() {
lsn_ranges.push(prior_lsn..*lsn);
prior_lsn = *lsn;
}
lsn_ranges.push(prior_lsn..job.lsn_range.end);
for lsn_range in lsn_ranges {
let key_range = key..key.next();
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
let layer = &self.layers[layer_id.0].layer;
layer.key_range().contains(&key)
&& overlaps_with(layer.lsn_range(), &lsn_range)
})
.cloned()
.collect();
new_jobs.push(CompactionJob {
key_range,
lsn_range,
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
Some(next_key) => {
// A key with too large size impact for a single delta layer. This
// case occurs if you make a huge number of updates for a single key.
//
// Drain the window with has_more = false to make a clean cut before
// the key, and then make dedicated delta layers for the single key.
//
// We cannot cluster the key with the others, because layer files are
// not allowed to overlap with each other in the lsn,key space (no
// overlaps allowed for the rectangles).
let key = next_key.key;
debug!("key {key} with size impact larger than the layer size");
while !window.is_empty() {
let has_more = false;
let key_range = window.choose_next_delta(self.target_file_size, has_more)
.expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
}

// Not really required: but here for future resilience:
// We make a "gap" here, so any structure the window holds should
// probably be reset.
window = Window::new();

let mut prior_lsn = job.lsn_range.start;
let mut lsn_ranges = Vec::new();
for (lsn, _size) in next_key.partition_lsns.iter() {
lsn_ranges.push(prior_lsn..*lsn);
prior_lsn = *lsn;
}
lsn_ranges.push(prior_lsn..job.lsn_range.end);
for lsn_range in lsn_ranges {
let key_range = key..key.next();
create_delta_job(key_range, &lsn_range, &mut new_jobs);
}
}
}
all_in_window = true;
}

// All the input files are rewritten. Set up the tracking for when they can
Expand Down

0 comments on commit 9cbdde7

Please sign in to comment.