Skip to content

Commit

Permalink
pageserver: integrate k-merge with bottom-most compaction (#8415)
Browse files Browse the repository at this point in the history
Use the k-merge iterator in the compaction process to reduce memory
footprint.

part of #8002

## Summary of changes

* refactor the bottom-most compaction code to use k-merge iterator
* add a `Send` bound on some structs, as they are used across await points

---------

Signed-off-by: Alex Chi Z <[email protected]>
Co-authored-by: Arpad Müller <[email protected]>
  • Loading branch information
skyzh and arpad-m authored Jul 18, 2024
1 parent d263b18 commit a4434cf
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6810,7 +6810,7 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..get_key(10),
key_range: Key::MIN..Key::MAX,
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/disk_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ where

pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
where
R: 'a,
R: 'a + Send,
{
DiskBtreeIterator {
stream: Box::pin(self.into_stream(start_key, ctx)),
Expand Down Expand Up @@ -521,7 +521,7 @@ where
pub struct DiskBtreeIterator<'a> {
#[allow(clippy::type_complexity)]
stream: std::pin::Pin<
Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a>,
Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
>,
}

Expand Down
2 changes: 0 additions & 2 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ pub(crate) mod inmemory_layer;
pub(crate) mod layer;
mod layer_desc;
mod layer_name;

#[cfg(test)]
pub mod merge_iterator;

use crate::context::{AccessStatsBehavior, RequestContext};
Expand Down
21 changes: 10 additions & 11 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ use crate::page_cache::{self, FileId, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
Expand All @@ -53,6 +56,7 @@ use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
Expand Down Expand Up @@ -747,12 +751,10 @@ impl DeltaLayer {
}

impl DeltaLayerInner {
#[cfg(test)]
/// Returns a reference to the key range stored in this layer's `layer_key_range` field.
pub(crate) fn key_range(&self) -> &Range<Key> {
&self.layer_key_range
}

#[cfg(test)]
/// Returns a reference to the LSN range stored in this layer's `layer_lsn_range` field.
pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
&self.layer_lsn_range
}
Expand Down Expand Up @@ -1512,7 +1514,6 @@ impl DeltaLayerInner {
offset
}

#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
Expand All @@ -1523,7 +1524,7 @@ impl DeltaLayerInner {
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
is_end: false,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new(
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
Expand Down Expand Up @@ -1595,17 +1596,15 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
}
}

#[cfg(test)]
pub struct DeltaLayerIterator<'a> {
delta_layer: &'a DeltaLayerInner,
ctx: &'a RequestContext,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner,
index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>,
planner: StreamingVectoredReadPlanner,
index_iter: DiskBtreeIterator<'a>,
key_values_batch: VecDeque<(Key, Lsn, Value)>,
is_end: bool,
}

#[cfg(test)]
impl<'a> DeltaLayerIterator<'a> {
/// Retrieve a batch of key-value pairs into the iterator buffer.
async fn next_batch(&mut self) -> anyhow::Result<()> {
Expand Down
23 changes: 11 additions & 12 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ use crate::page_cache::{self, FileId, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
Expand All @@ -50,6 +53,7 @@ use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
Expand Down Expand Up @@ -369,12 +373,10 @@ impl ImageLayer {
}

impl ImageLayerInner {
#[cfg(test)]
/// Returns a reference to the key range stored in this layer's `key_range` field.
pub(crate) fn key_range(&self) -> &Range<Key> {
&self.key_range
}

#[cfg(test)]
/// Returns the LSN stored in this layer's `lsn` field (by value).
pub(crate) fn lsn(&self) -> Lsn {
self.lsn
}
Expand Down Expand Up @@ -699,7 +701,6 @@ impl ImageLayerInner {
}
}

#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
Expand All @@ -708,9 +709,9 @@ impl ImageLayerInner {
image_layer: self,
ctx,
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
key_values_batch: VecDeque::new(),
is_end: false,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new(
planner: StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
Expand Down Expand Up @@ -974,17 +975,15 @@ impl Drop for ImageLayerWriter {
}
}

#[cfg(test)]
pub struct ImageLayerIterator<'a> {
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner,
index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>,
planner: StreamingVectoredReadPlanner,
index_iter: DiskBtreeIterator<'a>,
key_values_batch: VecDeque<(Key, Lsn, Value)>,
is_end: bool,
}

#[cfg(test)]
impl<'a> ImageLayerIterator<'a> {
/// Retrieve a batch of key-value pairs into the iterator buffer.
async fn next_batch(&mut self) -> anyhow::Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions pageserver/src/tenant/storage_layer/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ impl Layer {
}

/// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
#[allow(dead_code)]
pub(crate) async fn load_key_values(
&self,
ctx: &RequestContext,
Expand Down Expand Up @@ -1918,7 +1919,7 @@ impl ResidentLayer {
self.owner.metadata()
}

#[cfg(test)]
/// Cast the layer to a delta, return an error if it is an image layer.
pub(crate) async fn get_as_delta(
&self,
ctx: &RequestContext,
Expand All @@ -1930,7 +1931,7 @@ impl ResidentLayer {
}
}

#[cfg(test)]
/// Cast the layer to an image, return an error if it is a delta layer.
pub(crate) async fn get_as_image(
&self,
ctx: &RequestContext,
Expand Down
4 changes: 4 additions & 0 deletions pageserver/src/tenant/storage_layer/merge_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,5 +547,9 @@ mod tests {
&ctx,
);
assert_merge_iter_equal(&mut merge_iter, &expect).await;

is_send(merge_iter);
}

/// Compile-time assertion helper: accepts any value whose type implements `Send`
/// and does nothing with it. A call fails to compile if the argument is not `Send`.
fn is_send(_: impl Send) {}
}
70 changes: 31 additions & 39 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
Expand Down Expand Up @@ -1039,10 +1040,12 @@ impl Timeline {
);
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, collect the layer information to decide when to split the new delta layers.
let mut all_key_values = Vec::new();
let mut downloaded_layers = Vec::new();
let mut delta_split_points = BTreeSet::new();
for layer in &layer_selection {
all_key_values.extend(layer.load_key_values(ctx).await?);
let resident_layer = layer.download_and_keep_resident().await?;
downloaded_layers.push(resident_layer);

let desc = layer.layer_desc();
if desc.is_delta() {
// TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon)
Expand All @@ -1052,44 +1055,28 @@ impl Timeline {
delta_split_points.insert(key_range.end);
}
}
// Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and
// image layers, make image appear before than delta.
struct ValueWrapper<'a>(&'a crate::repository::Value);
impl Ord for ValueWrapper<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use crate::repository::Value;
use std::cmp::Ordering;
match (self.0, other.0) {
(Value::Image(_), Value::WalRecord(_)) => Ordering::Less,
(Value::WalRecord(_), Value::Image(_)) => Ordering::Greater,
_ => Ordering::Equal,
}
}
}
impl PartialOrd for ValueWrapper<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ValueWrapper<'_> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == std::cmp::Ordering::Equal
let mut delta_layers = Vec::new();
let mut image_layers = Vec::new();
for resident_layer in &downloaded_layers {
if resident_layer.layer_desc().is_delta() {
let layer = resident_layer.get_as_delta(ctx).await?;
delta_layers.push(layer);
} else {
let layer = resident_layer.get_as_image(ctx).await?;
image_layers.push(layer);
}
}
impl Eq for ValueWrapper<'_> {}
all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| {
(k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2)))
});
let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
// Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
// Data of the same key.
let mut accumulated_values = Vec::new();
let mut last_key = all_key_values.first().unwrap().0; // TODO: assert all_key_values not empty
let mut last_key: Option<Key> = None;

/// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
async fn flush_accumulated_states(
tline: &Arc<Timeline>,
key: Key,
accumulated_values: &[&(Key, Lsn, crate::repository::Value)],
accumulated_values: &[(Key, Lsn, crate::repository::Value)],
horizon: Lsn,
) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
let mut base_image = None;
Expand Down Expand Up @@ -1190,7 +1177,7 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(all_key_values.first().unwrap().0..all_key_values.last().unwrap().0.next()),
&(Key::MIN..Key::MAX), // covers the full key range
gc_cutoff,
ctx,
)
Expand All @@ -1200,20 +1187,24 @@ impl Timeline {
let delta_split_points = delta_split_points.into_iter().collect_vec();
let mut current_delta_split_point = 0;
let mut delta_layers = Vec::new();
for item @ (key, _, _) in &all_key_values {
if &last_key == key {
accumulated_values.push(item);
while let Some((key, lsn, val)) = merge_iter.next().await? {
if last_key.is_none() || last_key.as_ref() == Some(&key) {
if last_key.is_none() {
last_key = Some(key);
}
accumulated_values.push((key, lsn, val));
} else {
let last_key = last_key.as_mut().unwrap();
let (deltas, image) =
flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff)
flush_accumulated_states(self, *last_key, &accumulated_values, gc_cutoff)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
image_layer_writer.put_image(last_key, image, ctx).await?;
image_layer_writer.put_image(*last_key, image, ctx).await?;
delta_values.extend(deltas);
delta_layers.extend(
flush_deltas(
&mut delta_values,
last_key,
*last_key,
&delta_split_points,
&mut current_delta_split_point,
self,
Expand All @@ -1223,11 +1214,12 @@ impl Timeline {
.await?,
);
accumulated_values.clear();
accumulated_values.push(item);
last_key = *key;
*last_key = key;
accumulated_values.push((key, lsn, val));
}
}

let last_key = last_key.expect("no keys produced during compaction");
// TODO: move this part to the loop body
let (deltas, image) =
flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
Expand Down
2 changes: 0 additions & 2 deletions pageserver/src/tenant/vectored_blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ impl<'a> VectoredBlobReader<'a> {
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
/// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
/// max_cnt constraints.
#[cfg(test)]
pub struct StreamingVectoredReadPlanner {
read_builder: Option<VectoredReadBuilder>,
// Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
Expand All @@ -410,7 +409,6 @@ pub struct StreamingVectoredReadPlanner {
cnt: usize,
}

#[cfg(test)]
impl StreamingVectoredReadPlanner {
pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
assert!(max_cnt > 0);
Expand Down

1 comment on commit a4434cf

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3202 tests run: 3068 passed, 1 failed, 133 skipped (full report)


Failures on Postgres 14

  • test_basebackup_with_high_slru_count[github-actions-selfhosted-vectored-10-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_basebackup_with_high_slru_count[release-pg14-github-actions-selfhosted-vectored-10-13-30]"
Flaky tests (1)

Postgres 14

Code coverage* (full report)

  • functions: 32.7% (6992 of 21384 functions)
  • lines: 50.1% (55217 of 110123 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
a4434cf at 2024-07-18T17:46:49.478Z :recycle:

Please sign in to comment.