Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compaction_level0_phase1: bypass PS PageCache for data blocks #8543

Merged
merged 14 commits into from
Jul 31, 2024
13 changes: 10 additions & 3 deletions pageserver/src/tenant/disk_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,19 @@ where
let mut stack = Vec::new();
stack.push((self.root_blk, None));
let block_cursor = self.reader.block_cursor();
let mut node_buf = [0_u8; PAGE_SZ];
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor
// Read the node, through the PS PageCache, into local variable `node_buf`.
// We could keep the page cache read guard alive, but, at the time of writing,
// we run quite small PS PageCaches => can't risk running out of
// PageCache space because this stream isn't consumed fast enough.
let page_read_guard = block_cursor
.read_blk(self.start_blk + node_blknum, ctx)
.await?;
node_buf.copy_from_slice(page_read_guard.as_ref());
drop(page_read_guard); // drop page cache read guard early

let node = OnDiskNode::deparse(node_buf.as_ref())?;
let node = OnDiskNode::deparse(&node_buf)?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;

Expand Down Expand Up @@ -345,6 +351,7 @@ where
Either::Left(idx..node.num_children.into())
};


// idx points to the first match now. Keep going from there
while let Some(idx) = iter.next() {
let key_off = idx * suffix_len;
Expand Down
21 changes: 16 additions & 5 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,16 @@ impl Timeline {

// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = all_keys.iter();
// If there are both a Value::Image and a Value::WalRecord for the same (key, lsn),
// then the Value::Image is ordered before Value::WalRecord.
let mut all_values_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};

// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = all_keys
Expand Down Expand Up @@ -739,11 +748,11 @@ impl Timeline {
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key

for &DeltaEntry {
key, lsn, ref val, ..
} in all_values_iter
while let Some((key, lsn, value)) = all_values_iter
.next()
.await
.map_err(CompactionError::Other)?
{
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
Expand Down Expand Up @@ -928,6 +937,8 @@ impl Timeline {
}
}

drop(all_values_iter);
problame marked this conversation as resolved.
Show resolved Hide resolved

Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: deltas_to_compact
Expand Down
Loading