Tiered compaction: cut deltas along lsn as well if needed #7671

Merged (11 commits, May 13, 2024)
pageserver/compaction/src/compact_tiered.rs (74 additions & 14 deletions)
@@ -530,8 +530,6 @@ where
// If we have accumulated only a narrow band of keyspace, create an
// image layer. Otherwise write a delta layer.

// FIXME: deal with the case of lots of values for same key

// FIXME: we are ignoring images here. Did we already divide the work
// so that we won't encounter them here?

@@ -550,16 +548,13 @@ where
let mut new_jobs = Vec::new();

// Slide a window through the keyspace
let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
let mut key_accum =
std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
let mut all_in_window: bool = false;
let mut window = Window::new();
loop {
if all_in_window && window.elems.is_empty() {
// All done!
break;
}
if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
{

let drain_window = |window: &mut Window<_>, new_jobs: &mut Vec<_>, has_more: bool| {
if let Some(key_range) = window.choose_next_delta(self.target_file_size, has_more) {
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
@@ -576,14 +571,75 @@ where
input_layers: batch_layers,
completed: false,
});
true
} else {
assert!(!all_in_window);
if let Some(next_key) = key_accum.next().await.transpose()? {
false
}
};
'outer: loop {
if all_in_window && window.is_empty() {
// All done!
break;
}
if drain_window(&mut window, &mut new_jobs, !all_in_window) {
continue;
}
assert!(!all_in_window);
while let Some(next_key) = key_accum.next().await.transpose()? {
if next_key.partition_lsns.is_empty() {
// Normal case: extend the window by the key
window.feed(next_key.key, next_key.size);
} else {
all_in_window = true;
continue 'outer;
}
// We have a key whose size impact is too large for a single delta
// layer. Therefore, drain the window with has_more = false to make a
// clean cut before the key, and then create dedicated delta layers
// for the single key.
// We cannot cluster the key with the others, because layer files are
// not allowed to overlap with each other in the (key, lsn) space (no
// overlaps allowed for the rectangles).
let key = next_key.key;
debug!("key {key} with size impact larger than the layer size");
if !window.is_empty() {
let has_more = false;
drain_window(&mut window, &mut new_jobs, has_more);
}
assert!(window.is_empty());

// Not strictly required, but here for future resilience:
// we make a "gap" here, so any structure the window holds should
// probably be reset.
window = Window::new();

let mut prior_lsn = job.lsn_range.start;
let mut lsn_ranges = Vec::new();
for (lsn, _size) in next_key.partition_lsns.iter() {
lsn_ranges.push(prior_lsn..*lsn);
prior_lsn = *lsn;
}
lsn_ranges.push(prior_lsn..job.lsn_range.end);
for lsn_range in lsn_ranges {
let key_range = key..key.next();
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
let layer = &self.layers[layer_id.0].layer;
layer.key_range().contains(&key)
&& overlaps_with(layer.lsn_range(), &lsn_range)
})
.cloned()
.collect();
new_jobs.push(CompactionJob {
key_range,
lsn_range,
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
}
}
all_in_window = true;
}

// All the input files are rewritten. Set up the tracking for when they can
@@ -803,6 +859,10 @@ where
self.elems.front().unwrap().accum_size - self.splitoff_size
}

fn is_empty(&self) -> bool {
self.elems.is_empty()
}

fn commit_upto(&mut self, mut upto: usize) {
while upto > 1 {
let popped = self.elems.pop_front().unwrap();
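The heart of the change is the per-key LSN slicing near the end of the hunk above: when a single key carries more data than fits in one delta layer, the job's LSN range is cut at the recorded partition points. Below is a minimal standalone sketch of that slicing step, with Lsn simplified to u64 and the helper name slice_lsn_ranges invented for illustration (the real code inlines this loop):

use std::ops::Range;

/// Split a job's LSN range at the given partition points, yielding one
/// sub-range per future single-key delta layer: each partition LSN closes
/// the current range, and the final range runs to the end of the job's
/// LSN range.
fn slice_lsn_ranges(job_lsn_range: &Range<u64>, partition_lsns: &[u64]) -> Vec<Range<u64>> {
    let mut prior_lsn = job_lsn_range.start;
    let mut lsn_ranges = Vec::new();
    for &lsn in partition_lsns {
        lsn_ranges.push(prior_lsn..lsn);
        prior_lsn = lsn;
    }
    lsn_ranges.push(prior_lsn..job_lsn_range.end);
    lsn_ranges
}

fn main() {
    // A job covering LSNs 0x10..0x40, with cuts at 0x20 and 0x30.
    let ranges = slice_lsn_ranges(&(0x10..0x40), &[0x20, 0x30]);
    assert_eq!(ranges, vec![0x10..0x20, 0x20..0x30, 0x30..0x40]);
}

Each resulting range becomes one CreateDelta job whose key range is just key..key.next(), so the produced layers stack on top of each other in the LSN dimension over that single key.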
pageserver/compaction/src/helpers.rs (19 additions & 4 deletions)
@@ -235,9 +235,14 @@ pub struct KeySize<K> {
pub key: K,
pub num_values: u64,
pub size: u64,
/// The LSNs to partition at (if empty, no per-LSN partitioning)
pub partition_lsns: Vec<(Lsn, u64)>,
}

pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
pub fn accum_key_values<'a, I, K, D, E>(
input: I,
target_size: u64,
) -> impl Stream<Item = Result<KeySize<K>, E>>
where
K: Eq + PartialOrd + Display + Copy,
I: Stream<Item = Result<D, E>>,
@@ -249,25 +254,35 @@ where

if let Some(first) = input.next().await {
let first = first?;
let mut part_size = first.size();
let mut accum: KeySize<K> = KeySize {
key: first.key(),
num_values: 1,
size: first.size(),
size: part_size,
partition_lsns: Vec::new(),
};
let mut last_key = accum.key;
while let Some(this) = input.next().await {
let this = this?;
if this.key() == accum.key {
accum.size += this.size();
let add_size = this.size();
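// If adding this value would push the current partition past the
// target size, record a cut at this value's LSN (the recorded u64
// is the size of the partition being closed) and start a new
// partition with this value.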
if part_size + add_size > target_size {
accum.partition_lsns.push((this.lsn(), part_size));
part_size = 0;
}
part_size += add_size;
accum.size += add_size;
accum.num_values += 1;
} else {
assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
last_key = accum.key;
yield accum;
part_size = this.size();
accum = KeySize {
key: this.key(),
num_values: 1,
size: this.size(),
size: part_size,
partition_lsns: Vec::new(),
};
}
}
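The change to accum_key_values above is what decides where those cuts go: while accumulating the total size per key, it also tracks the size of the current partition and records a cut LSN whenever the next value would overflow the target. Here is a self-contained sketch of that bookkeeping, assuming one key's values arrive in LSN order as (lsn, size) pairs and with the function name partition_points invented for illustration (the real code seeds part_size with the first value's size, which the part_size > 0 guard emulates here):

/// Given one key's values in LSN order, return the LSNs at which to cut so
/// that each resulting partition's accumulated size stays within
/// target_size (a single oversized value still forms its own partition).
/// Mirrors the part_size bookkeeping in accum_key_values.
fn partition_points(values: &[(u64, u64)], target_size: u64) -> Vec<(u64, u64)> {
    let mut cuts = Vec::new();
    let mut part_size = 0u64;
    for &(lsn, size) in values {
        if part_size > 0 && part_size + size > target_size {
            // Cut before this value: record the LSN and the size of the
            // partition that the cut closes.
            cuts.push((lsn, part_size));
            part_size = 0;
        }
        part_size += size;
    }
    cuts
}

fn main() {
    // Ten 100-byte updates to one key with a 300-byte target: cut every
    // third value.
    let values: Vec<(u64, u64)> = (1..=10).map(|lsn| (lsn, 100)).collect();
    let cuts = partition_points(&values, 300);
    assert_eq!(cuts, vec![(4, 300), (7, 300), (10, 300)]);
}

With this in place, KeySize::partition_lsns is non-empty exactly for keys whose updates cannot fit into a single delta layer.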
pageserver/compaction/src/identify_levels.rs (6 additions & 0 deletions)
@@ -184,6 +184,12 @@ impl<L> Level<L> {
}
let mut events: Vec<Event<K>> = Vec::new();
for (idx, l) in self.layers.iter().enumerate() {
let key_range = l.key_range();
if key_range.end == key_range.start.next() && l.is_delta() {
// Ignore single-key delta layers: they can be stacked on top of each
// other, as that is the only way to cut them further.
continue;
}
events.push(Event {
key: l.key_range().start,
layer_idx: idx,
pageserver/compaction/tests/tests.rs (2 additions & 6 deletions)
@@ -20,10 +20,6 @@ pub(crate) fn setup_logging() {
/// even if we produce an extremely narrow delta layer, spanning just that one
/// key, we still have too many records to fit in the target file size. We need
/// to split in the LSN dimension too in that case.
///
/// TODO: The code to avoid this problem has not been implemented yet! So the
/// assertion currently fails, but we need to make it not fail.
#[ignore]
#[tokio::test]
async fn test_many_updates_for_single_key() {
setup_logging();
@@ -43,9 +39,9 @@
}
for l in executor.live_layers.iter() {
assert!(l.file_size() < executor.target_file_size * 2);
// sanity check that none of the delta layers are stupidly small either
// Sanity check that none of the delta layers are empty either.
if l.is_delta() {
assert!(l.file_size() > executor.target_file_size / 2);
assert!(l.file_size() > 0);
}
}
}