Skip to content

Commit

Permalink
Minor improvements to tiered compaction (#7020)
Browse files Browse the repository at this point in the history
Minor non-functional improvements to tiered compaction, mostly
consisting of comment fixes.

Followup of  #6830, part of #6768
  • Loading branch information
arpad-m authored Mar 5, 2024
1 parent b036c32 commit e69a255
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 43 deletions.
21 changes: 6 additions & 15 deletions pageserver/compaction/src/compact_tiered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub async fn compact_tiered<E: CompactionJobExecutor>(
);

// Identify the range of LSNs that belong to this level. We assume that
// each file in this level span an LSN range up to 1.75x target file
// each file in this level spans an LSN range up to 1.75x target file
// size. That should give us enough slop that if we created a slightly
// oversized L0 layer, e.g. because flushing the in-memory layer was
// delayed for some reason, we don't consider the oversized layer to
Expand Down Expand Up @@ -248,7 +248,6 @@ enum CompactionStrategy {
CreateImage,
}

#[allow(dead_code)] // Todo
struct CompactionJob<E: CompactionJobExecutor> {
key_range: Range<E::Key>,
lsn_range: Range<Lsn>,
Expand Down Expand Up @@ -345,7 +344,7 @@ where
///
/// TODO: Currently, this is called exactly once for the level, and we
/// decide whether to create new image layers to cover the whole level, or
/// write a new set of delta. In the future, this should try to partition
/// write a new set of deltas. In the future, this should try to partition
/// the key space, and make the decision separately for each partition.
async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
let job = &self.jobs[job_id.0];
Expand Down Expand Up @@ -709,18 +708,6 @@ where
}
}

// Sliding window through keyspace and values
//
// This is used to decide what layer to write next, from the beginning of the window.
//
// Candidates:
//
// 1. Create an image layer, snapping to previous images
// 2. Create a delta layer, snapping to previous images
// 3. Create an image layer, snapping to
//
//

// Take previous partitioning, based on the image layers below.
//
// Candidate is at the front:
Expand All @@ -739,6 +726,10 @@ struct WindowElement<K> {
last_key: K, // inclusive
accum_size: u64,
}

// Sliding window through keyspace and values
//
// This is used to decide what layer to write next, from the beginning of the window.
struct Window<K> {
elems: VecDeque<WindowElement<K>>,

Expand Down
19 changes: 9 additions & 10 deletions pageserver/compaction/src/identify_levels.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! An LSM tree consists of multiple levels, each exponential larger than the
//! previous level. And each level consists of be multiple "tiers". With tiered
//! An LSM tree consists of multiple levels, each exponentially larger than the
//! previous level. And each level consists of multiple "tiers". With tiered
//! compaction, a level is compacted when it has accumulated more than N tiers,
//! forming one tier on the next level.
//!
Expand Down Expand Up @@ -170,20 +170,18 @@ where
})
}

// helper struct used in depth()
struct Event<K> {
key: K,
layer_idx: usize,
start: bool,
}

impl<L> Level<L> {
/// Count the number of deltas stacked on each other.
pub fn depth<K>(&self) -> u64
where
K: CompactionKey,
L: CompactionLayer<K>,
{
struct Event<K> {
key: K,
layer_idx: usize,
start: bool,
}
let mut events: Vec<Event<K>> = Vec::new();
for (idx, l) in self.layers.iter().enumerate() {
events.push(Event {
Expand All @@ -202,7 +200,7 @@ impl<L> Level<L> {
// Sweep the key space left to right. Stop at each distinct key, and
// count the number of deltas on top of the highest image at that key.
//
// This is a little enefficient, as we walk through the active_set on
// This is a little inefficient, as we walk through the active_set on
// every key. We could increment/decrement a counter on each step
// instead, but that'd require a bit more complex bookkeeping.
let mut active_set: BTreeSet<(Lsn, bool, usize)> = BTreeSet::new();
Expand Down Expand Up @@ -236,6 +234,7 @@ impl<L> Level<L> {
}
}
}
debug_assert_eq!(active_set, BTreeSet::new());
max_depth
}
}
Expand Down
31 changes: 15 additions & 16 deletions pageserver/compaction/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@
//! All the heavy lifting is done by the create_image and create_delta
//! functions that the implementor provides.
use async_trait::async_trait;
use futures::Future;
use pageserver_api::{key::Key, keyspace::key_range_size};
use std::ops::Range;
use utils::lsn::Lsn;

/// Public interface. This is the main thing that the implementor needs to provide
#[async_trait]
pub trait CompactionJobExecutor {
// Type system.
//
// We assume that there are two kinds of layers, deltas and images. The
// compaction doesn't distinguish whether they are stored locally or
// remotely.
//
// The keyspace is defined by CompactionKey trait.
//
// The keyspace is defined by the CompactionKey trait.
type Key: CompactionKey;

type Layer: CompactionLayer<Self::Key> + Clone;
Expand All @@ -35,61 +34,61 @@ pub trait CompactionJobExecutor {
// ----

/// Return all layers that overlap the given bounding box.
async fn get_layers(
fn get_layers(
&mut self,
key_range: &Range<Self::Key>,
lsn_range: &Range<Lsn>,
ctx: &Self::RequestContext,
) -> anyhow::Result<Vec<Self::Layer>>;
) -> impl Future<Output = anyhow::Result<Vec<Self::Layer>>> + Send;

async fn get_keyspace(
fn get_keyspace(
&mut self,
key_range: &Range<Self::Key>,
lsn: Lsn,
ctx: &Self::RequestContext,
) -> anyhow::Result<CompactionKeySpace<Self::Key>>;
) -> impl Future<Output = anyhow::Result<CompactionKeySpace<Self::Key>>> + Send;

/// NB: This is a pretty expensive operation. In the real pageserver
/// implementation, it downloads the layer, and keeps it resident
/// until the DeltaLayer is dropped.
async fn downcast_delta_layer(
fn downcast_delta_layer(
&self,
layer: &Self::Layer,
) -> anyhow::Result<Option<Self::DeltaLayer>>;
) -> impl Future<Output = anyhow::Result<Option<Self::DeltaLayer>>> + Send;

// ----
// Functions to execute the plan
// ----

/// Create a new image layer, materializing all the values in the key range,
/// at given 'lsn'.
async fn create_image(
fn create_image(
&mut self,
lsn: Lsn,
key_range: &Range<Self::Key>,
ctx: &Self::RequestContext,
) -> anyhow::Result<()>;
) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Create a new delta layer, containing all the values from 'input_layers'
/// in the given key and LSN range.
async fn create_delta(
fn create_delta(
&mut self,
lsn_range: &Range<Lsn>,
key_range: &Range<Self::Key>,
input_layers: &[Self::DeltaLayer],
ctx: &Self::RequestContext,
) -> anyhow::Result<()>;
) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Delete a layer. The compaction implementation will call this only after
/// all the create_image() or create_delta() calls that deletion of this
/// layer depends on have finished. But if the implementor has extra lazy
/// background tasks, like uploading the index json file to remote storage,
/// background tasks, like uploading the index json file to remote storage.
/// it is the implementation's responsibility to track those.
async fn delete_layer(
fn delete_layer(
&mut self,
layer: &Self::Layer,
ctx: &Self::RequestContext,
) -> anyhow::Result<()>;
) -> impl Future<Output = anyhow::Result<()>> + Send;
}

pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
Expand Down
1 change: 0 additions & 1 deletion pageserver/compaction/src/simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ impl From<&Arc<MockImageLayer>> for MockLayer {
}
}

#[async_trait]
impl interface::CompactionJobExecutor for MockTimeline {
type Key = Key;
type Layer = MockLayer;
Expand Down
1 change: 0 additions & 1 deletion pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ struct ResidentDeltaLayer(ResidentLayer);
#[derive(Clone)]
struct ResidentImageLayer(ResidentLayer);

#[async_trait]
impl CompactionJobExecutor for TimelineAdaptor {
type Key = crate::repository::Key;

Expand Down

1 comment on commit e69a255

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2567 tests run: 2432 passed, 1 failed, 134 skipped (full report)


Failures on Postgres 14

  • test_basebackup_with_high_slru_count[github-actions-selfhosted-vectored-10-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_basebackup_with_high_slru_count[release-pg14-github-actions-selfhosted-vectored-10-13-30]"

Code coverage* (full report)

  • functions: 28.8% (6981 of 24250 functions)
  • lines: 47.4% (42875 of 90534 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
e69a255 at 2024-03-05T16:51:59.248Z :recycle:

Please sign in to comment.