diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 52219a014cc7..60fc7ac9252a 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -63,7 +63,7 @@ pub async fn compact_tiered( ); // Identify the range of LSNs that belong to this level. We assume that - // each file in this level span an LSN range up to 1.75x target file + // each file in this level spans an LSN range up to 1.75x target file // size. That should give us enough slop that if we created a slightly // oversized L0 layer, e.g. because flushing the in-memory layer was // delayed for some reason, we don't consider the oversized layer to @@ -248,7 +248,6 @@ enum CompactionStrategy { CreateImage, } -#[allow(dead_code)] // Todo struct CompactionJob { key_range: Range, lsn_range: Range, @@ -345,7 +344,7 @@ where /// /// TODO: Currently, this is called exactly once for the level, and we /// decide whether to create new image layers to cover the whole level, or - /// write a new set of delta. In the future, this should try to partition + /// write a new set of deltas. In the future, this should try to partition /// the key space, and make the decision separately for each partition. async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> { let job = &self.jobs[job_id.0]; @@ -709,18 +708,6 @@ where } } -// Sliding window through keyspace and values -// -// This is used to decide what layer to write next, from the beginning of the window. -// -// Candidates: -// -// 1. Create an image layer, snapping to previous images -// 2. Create a delta layer, snapping to previous images -// 3. Create an image layer, snapping to -// -// - // Take previous partitioning, based on the image layers below. 
// // Candidate is at the front: @@ -739,6 +726,10 @@ struct WindowElement { last_key: K, // inclusive accum_size: u64, } + +// Sliding window through keyspace and values +// +// This is used to decide what layer to write next, from the beginning of the window. struct Window { elems: VecDeque>, diff --git a/pageserver/compaction/src/identify_levels.rs b/pageserver/compaction/src/identify_levels.rs index ef388fd92be0..98dd46925ca9 100644 --- a/pageserver/compaction/src/identify_levels.rs +++ b/pageserver/compaction/src/identify_levels.rs @@ -1,5 +1,5 @@ -//! An LSM tree consists of multiple levels, each exponential larger than the -//! previous level. And each level consists of be multiple "tiers". With tiered +//! An LSM tree consists of multiple levels, each exponentially larger than the +//! previous level. And each level consists of multiple "tiers". With tiered //! compaction, a level is compacted when it has accumulated more than N tiers, //! forming one tier on the next level. //! @@ -170,13 +170,6 @@ where }) } -// helper struct used in depth() -struct Event { - key: K, - layer_idx: usize, - start: bool, -} - impl Level { /// Count the number of deltas stacked on each other. pub fn depth(&self) -> u64 @@ -184,6 +177,11 @@ impl Level { K: CompactionKey, L: CompactionLayer, { + struct Event { + key: K, + layer_idx: usize, + start: bool, + } let mut events: Vec> = Vec::new(); for (idx, l) in self.layers.iter().enumerate() { events.push(Event { @@ -202,7 +200,7 @@ impl Level { // Sweep the key space left to right. Stop at each distinct key, and // count the number of deltas on top of the highest image at that key. // - // This is a little enefficient, as we walk through the active_set on + // This is a little inefficient, as we walk through the active_set on // every key. We could increment/decrement a counter on each step // instead, but that'd require a bit more complex bookkeeping. 
let mut active_set: BTreeSet<(Lsn, bool, usize)> = BTreeSet::new(); @@ -236,6 +234,7 @@ impl Level { } } } + debug_assert_eq!(active_set, BTreeSet::new()); max_depth } } diff --git a/pageserver/compaction/src/interface.rs b/pageserver/compaction/src/interface.rs index 979ceebf0e2f..2bb2e749c071 100644 --- a/pageserver/compaction/src/interface.rs +++ b/pageserver/compaction/src/interface.rs @@ -4,12 +4,12 @@ //! All the heavy lifting is done by the create_image and create_delta //! functions that the implementor provides. use async_trait::async_trait; +use futures::Future; use pageserver_api::{key::Key, keyspace::key_range_size}; use std::ops::Range; use utils::lsn::Lsn; /// Public interface. This is the main thing that the implementor needs to provide -#[async_trait] pub trait CompactionJobExecutor { // Type system. // @@ -17,8 +17,7 @@ pub trait CompactionJobExecutor { // compaction doesn't distinguish whether they are stored locally or // remotely. // - // The keyspace is defined by CompactionKey trait. - // + // The keyspace is defined by the CompactionKey trait. type Key: CompactionKey; type Layer: CompactionLayer + Clone; @@ -35,27 +34,27 @@ pub trait CompactionJobExecutor { // ---- /// Return all layers that overlap the given bounding box. - async fn get_layers( + fn get_layers( &mut self, key_range: &Range, lsn_range: &Range, ctx: &Self::RequestContext, - ) -> anyhow::Result>; + ) -> impl Future>> + Send; - async fn get_keyspace( + fn get_keyspace( &mut self, key_range: &Range, lsn: Lsn, ctx: &Self::RequestContext, - ) -> anyhow::Result>; + ) -> impl Future>> + Send; /// NB: This is a pretty expensive operation. In the real pageserver /// implementation, it downloads the layer, and keeps it resident /// until the DeltaLayer is dropped. 
- async fn downcast_delta_layer( + fn downcast_delta_layer( &self, layer: &Self::Layer, - ) -> anyhow::Result>; + ) -> impl Future>> + Send; // ---- // Functions to execute the plan @@ -63,33 +62,33 @@ /// Create a new image layer, materializing all the values in the key range, /// at given 'lsn'. - async fn create_image( + fn create_image( &mut self, lsn: Lsn, key_range: &Range, ctx: &Self::RequestContext, - ) -> anyhow::Result<()>; + ) -> impl Future> + Send; /// Create a new delta layer, containing all the values from 'input_layers' /// in the given key and LSN range. - async fn create_delta( + fn create_delta( &mut self, lsn_range: &Range, key_range: &Range, input_layers: &[Self::DeltaLayer], ctx: &Self::RequestContext, - ) -> anyhow::Result<()>; + ) -> impl Future> + Send; /// Delete a layer. The compaction implementation will call this only after /// all the create_image() or create_delta() calls that deletion of this /// layer depends on have finished. But if the implementor has extra lazy - /// background tasks, like uploading the index json file to remote storage, + /// background tasks, like uploading the index json file to remote storage, /// it is the implementation's responsibility to track those. 
- async fn delete_layer( + fn delete_layer( &mut self, layer: &Self::Layer, ctx: &Self::RequestContext, - ) -> anyhow::Result<()>; + ) -> impl Future> + Send; } pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display { diff --git a/pageserver/compaction/src/simulator.rs b/pageserver/compaction/src/simulator.rs index 6d07038dcd8f..def7983e7573 100644 --- a/pageserver/compaction/src/simulator.rs +++ b/pageserver/compaction/src/simulator.rs @@ -429,7 +429,6 @@ impl From<&Arc> for MockLayer { } } -#[async_trait] impl interface::CompactionJobExecutor for MockTimeline { type Key = Key; type Layer = MockLayer; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 914e3948efa2..8b544b1c3a02 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -134,7 +134,6 @@ struct ResidentDeltaLayer(ResidentLayer); #[derive(Clone)] struct ResidentImageLayer(ResidentLayer); -#[async_trait] impl CompactionJobExecutor for TimelineAdaptor { type Key = crate::repository::Key;