Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tiered compaction: cut deltas along lsn as well if needed #7671

Merged
merged 11 commits into from
May 13, 2024
107 changes: 82 additions & 25 deletions pageserver/compaction/src/compact_tiered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,6 @@ where
// If we have accumulated only a narrow band of keyspace, create an
// image layer. Otherwise write a delta layer.

// FIXME: deal with the case of lots of values for same key

// FIXME: we are ignoring images here. Did we already divide the work
// so that we won't encounter them here?

Expand All @@ -550,39 +548,94 @@ where
let mut new_jobs = Vec::new();

// Slide a window through the keyspace
let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
let mut key_accum =
std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
let mut all_in_window: bool = false;
let mut window = Window::new();

// Helper function to create a job for a new delta layer with given key-lsn
// rectangle.
let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
// The inputs for the job are all the input layers of the original job that
// overlap with the rectangle.
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
})
.cloned()
.collect();
assert!(!batch_layers.is_empty());
new_jobs.push(CompactionJob {
key_range,
lsn_range: lsn_range.clone(),
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
};

loop {
if all_in_window && window.elems.is_empty() {
if all_in_window && window.is_empty() {
// All done!
break;
}

// If we now have enough keyspace for next delta layer in the window, create a
// new delta layer
if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
{
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
})
.cloned()
.collect();
assert!(!batch_layers.is_empty());
new_jobs.push(CompactionJob {
key_range,
lsn_range: job.lsn_range.clone(),
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
} else {
assert!(!all_in_window);
if let Some(next_key) = key_accum.next().await.transpose()? {
window.feed(next_key.key, next_key.size);
} else {
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
continue;
}
assert!(!all_in_window);

// Process next key in the key space
match key_accum.next().await.transpose()? {
None => {
all_in_window = true;
}
Some(next_key) if next_key.partition_lsns.is_empty() => {
// Normal case: extend the window by the key
window.feed(next_key.key, next_key.size);
}
Some(next_key) => {
// A key with too large size impact for a single delta layer. This
// case occurs if you make a huge number of updates for a single key.
//
// Drain the window with has_more = false to make a clean cut before
// the key, and then make dedicated delta layers for the single key.
//
// We cannot cluster the key with the others, because we don't want
// layer files to overlap with each other in the lsn,key space (no
// overlaps for the rectangles).
let key = next_key.key;
debug!("key {key} with size impact larger than the layer size");
while !window.is_empty() {
let has_more = false;
let key_range = window.choose_next_delta(self.target_file_size, has_more)
.expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
}

// Not really required: but here for future resilience:
// We make a "gap" here, so any structure the window holds should
// probably be reset.
window = Window::new();

let mut prior_lsn = job.lsn_range.start;
let mut lsn_ranges = Vec::new();
for (lsn, _size) in next_key.partition_lsns.iter() {
lsn_ranges.push(prior_lsn..*lsn);
prior_lsn = *lsn;
}
lsn_ranges.push(prior_lsn..job.lsn_range.end);
for lsn_range in lsn_ranges {
let key_range = key..key.next();
create_delta_job(key_range, &lsn_range, &mut new_jobs);
}
}
}
}

Expand Down Expand Up @@ -803,6 +856,10 @@ where
self.elems.front().unwrap().accum_size - self.splitoff_size
}

fn is_empty(&self) -> bool {
self.elems.is_empty()
}

fn commit_upto(&mut self, mut upto: usize) {
while upto > 1 {
let popped = self.elems.pop_front().unwrap();
Expand Down
23 changes: 19 additions & 4 deletions pageserver/compaction/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,14 @@ pub struct KeySize<K> {
pub key: K,
pub num_values: u64,
pub size: u64,
/// The lsns to partition at (if empty then no per-lsn partitioning)
pub partition_lsns: Vec<(Lsn, u64)>,
}

pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
pub fn accum_key_values<'a, I, K, D, E>(
input: I,
target_size: u64,
) -> impl Stream<Item = Result<KeySize<K>, E>>
where
K: Eq + PartialOrd + Display + Copy,
I: Stream<Item = Result<D, E>>,
Expand All @@ -249,25 +254,35 @@ where

if let Some(first) = input.next().await {
let first = first?;
let mut part_size = first.size();
let mut accum: KeySize<K> = KeySize {
key: first.key(),
num_values: 1,
size: first.size(),
size: part_size,
partition_lsns: Vec::new(),
};
let mut last_key = accum.key;
while let Some(this) = input.next().await {
let this = this?;
if this.key() == accum.key {
accum.size += this.size();
let add_size = this.size();
if part_size + add_size > target_size {
accum.partition_lsns.push((this.lsn(), part_size));
part_size = 0;
}
part_size += add_size;
accum.size += add_size;
accum.num_values += 1;
} else {
assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
last_key = accum.key;
yield accum;
part_size = this.size();
accum = KeySize {
key: this.key(),
num_values: 1,
size: this.size(),
size: part_size,
partition_lsns: Vec::new(),
};
}
}
Expand Down
6 changes: 6 additions & 0 deletions pageserver/compaction/src/identify_levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ impl<L> Level<L> {
}
let mut events: Vec<Event<K>> = Vec::new();
for (idx, l) in self.layers.iter().enumerate() {
let key_range = l.key_range();
if key_range.end == key_range.start.next() && l.is_delta() {
// Ignore single-key delta layers as they can be stacked on top of each other
// as that is the only way to cut further.
continue;
}
events.push(Event {
key: l.key_range().start,
layer_idx: idx,
Expand Down
8 changes: 2 additions & 6 deletions pageserver/compaction/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ pub(crate) fn setup_logging() {
/// even if we produce an extremely narrow delta layer, spanning just that one
/// key, we still too many records to fit in the target file size. We need to
/// split in the LSN dimension too in that case.
///
/// TODO: The code to avoid this problem has not been implemented yet! So the
/// assertion currently fails, but we need to make it not fail.
#[ignore]
#[tokio::test]
async fn test_many_updates_for_single_key() {
setup_logging();
Expand All @@ -43,9 +39,9 @@ async fn test_many_updates_for_single_key() {
}
for l in executor.live_layers.iter() {
assert!(l.file_size() < executor.target_file_size * 2);
// sanity check that none of the delta layers are stupidly small either
// Sanity check that none of the delta layers are empty either.
if l.is_delta() {
assert!(l.file_size() > executor.target_file_size / 2);
assert!(l.file_size() > 0);
}
}
}
Expand Down
Loading