Skip to content

Commit

Permalink
refactor(pageserver): better k-merge implementation for tiered compac…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed May 20, 2024
1 parent 6810d2a commit a40eee3
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 111 deletions.
212 changes: 108 additions & 104 deletions pageserver/compaction/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ use futures::future::BoxFuture;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pageserver_api::shard::ShardIdentity;
use pin_project_lite::pin_project;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::collections::{binary_heap, BinaryHeap};
use std::fmt::Display;
use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
use std::ops::Range;
use utils::lsn::Lsn;

pub const PAGE_SZ: u64 = 8192;
Expand Down Expand Up @@ -85,33 +81,6 @@ pub fn intersect_keyspace<K: Ord + Clone + Copy>(
ranges
}

/// Create a stream that iterates through all DeltaEntrys among all input
/// layers, in key-lsn order.
///
/// This is public because the create_delta() implementation likely wants to use this too
/// TODO: move to a more shared place
pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
) -> MergeDeltaKeys<'a, E> {
// Use a binary heap to merge the layers. Each input layer is initially
// represented by a LazyLoadLayer::Unloaded element, which uses the start of
// the layer's key range as the key. The first time a layer reaches the top
// of the heap, all the keys of the layer are loaded into a sorted vector.
//
// This helps to keep the memory usage reasonable: we only need to hold in
// memory the DeltaEntrys of the layers that overlap with the "current" key.
let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
for l in layers {
heap.push(LazyLoadLayer::Unloaded(l));
}
MergeDeltaKeys {
heap,
ctx,
load_future: None,
}
}

pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
Expand All @@ -129,104 +98,139 @@ pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
Ok(stream)
}

enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
Unloaded(&'a E::DeltaLayer),
/// Wrapper type to make `dl.load_keys`` compile.
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;

pub enum LayerIterator<'a, E: CompactionJobExecutor> {
Loaded(
VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>,
&'a E::RequestContext,
),
Unloaded(&'a E::DeltaLayer, &'a E::RequestContext),
}
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
fn min_key(&self) -> E::Key {

impl<'a, E: CompactionJobExecutor + 'a> LayerIterator<'a, E> {
pub fn new(delta_layer: &'a E::DeltaLayer, ctx: &'a E::RequestContext) -> Self {
Self::Unloaded(delta_layer, ctx)
}

pub fn key_lsn(&self) -> (E::Key, Lsn) {
match self {
Self::Unloaded(dl, _) => (dl.key_range().start, dl.lsn_range().start),
Self::Loaded(entries, _) => entries.front().map(|x| (x.key(), x.lsn())).unwrap(),
}
}

async fn load(&mut self) -> anyhow::Result<()> {
match self {
Self::Loaded(entries) => entries.front().unwrap().key(),
Self::Unloaded(dl) => dl.key_range().start,
Self::Unloaded(dl, ctx) => {
let unloaded_key_lsn = (dl.key_range().start, dl.lsn_range().start);
let fut: LoadFuture<
'a,
<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>,
> = Box::pin(dl.load_keys(ctx));
let keys = VecDeque::from(fut.await?);
assert_eq!(
keys.front().as_ref().map(|x| (x.key(), x.lsn())).unwrap(),
unloaded_key_lsn,
"unmatched start key_lsn"
);
*self = Self::Loaded(keys, ctx);
Ok(())
}
Self::Loaded(_, _) => Ok(()),
}
}
fn min_lsn(&self) -> Lsn {

pub async fn entry(
&mut self,
) -> anyhow::Result<&<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
self.load().await?;
let Self::Loaded(x, _) = self else {
unreachable!()
};
Ok(x.front().unwrap())
}

pub async fn next(
&mut self,
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
self.load().await?; // requires Box::pin to make it compile
let Self::Loaded(x, _) = self else {
unreachable!()
};
Ok(x.pop_front().expect("already reached the end"))
}

pub fn is_end(&self) -> bool {
match self {
Self::Loaded(entries) => entries.front().unwrap().lsn(),
Self::Unloaded(dl) => dl.lsn_range().start,
Self::Unloaded(_, _) => false,
Self::Loaded(x, _) => x.is_empty(),
}
}
}
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {

impl<'a, E: CompactionJobExecutor + 'a> PartialOrd for LayerIterator<'a, E> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {

impl<'a, E: CompactionJobExecutor + 'a> Ord for LayerIterator<'a, E> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// reverse order so that we get a min-heap
(other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
// reverse comparison to get a min-heap
other.key_lsn().cmp(&self.key_lsn())
}
}
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {

impl<'a, E: CompactionJobExecutor + 'a> PartialEq for LayerIterator<'a, E> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == std::cmp::Ordering::Equal
}
}
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}

type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
impl<'a, E: CompactionJobExecutor + 'a> Eq for LayerIterator<'a, E> {}

// Stream returned by `merge_delta_keys`
pin_project! {
#[allow(clippy::type_complexity)]
pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
heap: BinaryHeap<LazyLoadLayer<'a, E>>,
pub struct DeltaMergeIterator<'a, E: CompactionJobExecutor> {
heap: BinaryHeap<LayerIterator<'a, E>>,
}

#[pin]
load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
impl<'a, E: CompactionJobExecutor + 'a> DeltaMergeIterator<'a, E> {
pub fn new(delta_layers: &'a [E::DeltaLayer], ctx: &'a E::RequestContext) -> Self {
let mut heap = BinaryHeap::new();
for dl in delta_layers {
heap.push(LayerIterator::new(dl, ctx));
}
Self { heap }
}

ctx: &'a E::RequestContext,
}
}
pub fn is_end(&self) -> bool {
self.heap.is_empty()
}

impl<'a, E> Stream for MergeDeltaKeys<'a, E>
where
E: CompactionJobExecutor + 'a,
{
type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
/// The next key-lsn entry that will be returned by `next`.
pub fn key_lsn(&self) -> (E::Key, Lsn) {
self.heap.peek().expect("already reached the end").key_lsn()
}

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
let mut this = self.project();
loop {
if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
// We are waiting for loading the keys to finish
match ready!(load_future.as_mut().poll(cx)) {
Ok(entries) => {
this.load_future.set(None);
*this.heap.peek_mut().unwrap() =
LazyLoadLayer::Loaded(VecDeque::from(entries));
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
/// Move to the next entry and return the current entry.
pub async fn next(
&mut self,
) -> anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>> {
let Some(mut top) = self.heap.peek_mut() else {
panic!("already reached the end")
};
match top.next().await {
Ok(entry) => {
if top.is_end() {
binary_heap::PeekMut::pop(top);
}
Ok(entry)
}

// If the topmost layer in the heap hasn't been loaded yet, start
// loading it. Otherwise return the next entry from it and update
// the layer's position in the heap (this decreaseKey operation is
// performed implicitly when `top` is dropped).
if let Some(mut top) = this.heap.peek_mut() {
match top.deref_mut() {
LazyLoadLayer::Unloaded(ref mut l) => {
let fut = l.load_keys(this.ctx);
this.load_future.set(Some(Box::pin(fut)));
continue;
}
LazyLoadLayer::Loaded(ref mut entries) => {
let result = entries.pop_front().unwrap();
if entries.is_empty() {
std::collections::binary_heap::PeekMut::pop(top);
}
return Poll::Ready(Some(Ok(result)));
}
}
} else {
return Poll::Ready(None);
Err(e) => {
// pop the item if there is an error, otherwise it might cause further panic when binary heap compares it after `PeekMut` gets dropped.
binary_heap::PeekMut::pop(top);
Err(e)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion pageserver/compaction/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ pub trait CompactionJobExecutor {
) -> impl Future<Output = anyhow::Result<()>> + Send;
}

pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
pub trait CompactionKey:
std::cmp::Ord + Clone + Copy + std::fmt::Display + std::fmt::Debug
{
const MIN: Self;
const MAX: Self;

Expand Down
11 changes: 5 additions & 6 deletions pageserver/compaction/src/simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod draw;

use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};

use futures::StreamExt;
use pageserver_api::shard::ShardIdentity;
use rand::Rng;
use tracing::info;
Expand All @@ -15,7 +14,8 @@ use std::sync::Arc;
use std::sync::Mutex;

use crate::helpers::PAGE_SZ;
use crate::helpers::{merge_delta_keys, overlaps_with};
use crate::helpers::overlaps_with;
use crate::helpers::DeltaMergeIterator;

use crate::interface;
use crate::interface::CompactionLayer;
Expand Down Expand Up @@ -545,12 +545,11 @@ impl interface::CompactionJobExecutor for MockTimeline {
input_layers: &[Arc<MockDeltaLayer>],
ctx: &MockRequestContext,
) -> anyhow::Result<()> {
let mut key_value_stream =
std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
let mut key_value_stream = DeltaMergeIterator::<MockTimeline>::new(input_layers, ctx);
let mut records: Vec<MockRecord> = Vec::new();
let mut total_len = 2;
while let Some(delta_entry) = key_value_stream.next().await {
let delta_entry: MockRecord = delta_entry?;
while !key_value_stream.is_end() {
let delta_entry: MockRecord = key_value_stream.next().await?;
if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
total_len += delta_entry.len;
records.push(delta_entry);
Expand Down

0 comments on commit a40eee3

Please sign in to comment.