From a4434cf1c0b42133de1196c8adc5468637bbb8eb Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." Date: Thu, 18 Jul 2024 12:16:44 -0400 Subject: [PATCH] pageserver: integrate k-merge with bottom-most compaction (#8415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use the k-merge iterator in the compaction process to reduce memory footprint. part of https://github.com/neondatabase/neon/issues/8002 ## Summary of changes * refactor the bottom-most compaction code to use k-merge iterator * add Send bound on some structs as it is used across the await points --------- Signed-off-by: Alex Chi Z Co-authored-by: Arpad Müller --- pageserver/src/tenant.rs | 2 +- pageserver/src/tenant/disk_btree.rs | 4 +- pageserver/src/tenant/storage_layer.rs | 2 - .../src/tenant/storage_layer/delta_layer.rs | 21 +++--- .../src/tenant/storage_layer/image_layer.rs | 23 +++--- pageserver/src/tenant/storage_layer/layer.rs | 5 +- .../tenant/storage_layer/merge_iterator.rs | 4 ++ pageserver/src/tenant/timeline/compaction.rs | 70 ++++++++----------- pageserver/src/tenant/vectored_blob_io.rs | 2 - 9 files changed, 62 insertions(+), 71 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dc6f42eaebaf..637051413f16 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -6810,7 +6810,7 @@ mod tests { vec![ // Image layer at GC horizon PersistentLayerKey { - key_range: Key::MIN..get_key(10), + key_range: Key::MIN..Key::MAX, lsn_range: Lsn(0x30)..Lsn(0x31), is_delta: false }, diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 251d2ab4aded..1583a3826af5 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -262,7 +262,7 @@ where pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a> where - R: 'a, + R: 'a + Send, { DiskBtreeIterator { stream: Box::pin(self.into_stream(start_key, ctx)), @@ -521,7 +521,7 @@ where pub struct DiskBtreeIterator<'a> { #[allow(clippy::type_complexity)] stream: std::pin::Pin< - Box, u64), DiskBtreeError>> + 'a>, + Box, u64), DiskBtreeError>> + 'a + Send>, >, } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2f0c45317d9a..a389358f0d27 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -6,8 +6,6 @@ pub(crate) mod inmemory_layer; pub(crate) mod layer; mod layer_desc; mod layer_name; - -#[cfg(test)] pub mod merge_iterator; use crate::context::{AccessStatsBehavior, RequestContext}; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 43941b6e1739..c34923320aee 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -33,11 +33,14 @@ use crate::page_cache::{self, FileId, PAGE_SZ}; use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; -use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; +use crate::tenant::disk_btree::{ + DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection, +}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner, + BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + VectoredReadPlanner, }; use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; @@ -53,6 +56,7 @@ use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind}; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; @@ -747,12 +751,10 @@ impl DeltaLayer { } impl DeltaLayerInner { - #[cfg(test)] pub(crate) fn key_range(&self) -> &Range { &self.layer_key_range } - #[cfg(test)] pub(crate) fn lsn_range(&self) -> &Range { &self.layer_lsn_range } @@ -1512,7 +1514,6 @@ impl DeltaLayerInner { offset } - #[cfg(test)] pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> { let block_reader = FileBlockReader::new(&self.file, self.file_id); let tree_reader = @@ -1523,7 +1524,7 @@ impl DeltaLayerInner { index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx), key_values_batch: std::collections::VecDeque::new(), is_end: false, - planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new( + planner: StreamingVectoredReadPlanner::new( 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer. 1024, // The default value. Unit tests might use a different value ), @@ -1595,17 +1596,15 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del } } -#[cfg(test)] pub struct DeltaLayerIterator<'a> { delta_layer: &'a DeltaLayerInner, ctx: &'a RequestContext, - planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner, - index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>, - key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>, + planner: StreamingVectoredReadPlanner, + index_iter: DiskBtreeIterator<'a>, + key_values_batch: VecDeque<(Key, Lsn, Value)>, is_end: bool, } -#[cfg(test)] impl<'a> DeltaLayerIterator<'a> { /// Retrieve a batch of key-value pairs into the iterator buffer. async fn next_batch(&mut self) -> anyhow::Result<()> { diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index a88a1e642958..c7f41b66befc 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -29,13 +29,16 @@ use crate::page_cache::{self, FileId, PAGE_SZ}; use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; -use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; +use crate::tenant::disk_btree::{ + DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection, +}; use crate::tenant::storage_layer::{ LayerAccessStats, ValueReconstructResult, ValueReconstructState, }; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner, + BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + VectoredReadPlanner, }; use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; @@ -50,6 +53,7 @@ use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; @@ -369,12 +373,10 @@ impl ImageLayer { } impl ImageLayerInner { - #[cfg(test)] pub(crate) fn key_range(&self) -> &Range { &self.key_range } - #[cfg(test)] pub(crate) fn lsn(&self) -> Lsn { self.lsn } @@ -699,7 +701,6 @@ impl ImageLayerInner { } } - #[cfg(test)] pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> { let block_reader = FileBlockReader::new(&self.file, self.file_id); let tree_reader = @@ -708,9 +709,9 @@ impl ImageLayerInner { image_layer: self, ctx, index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx), - key_values_batch: std::collections::VecDeque::new(), + key_values_batch: VecDeque::new(), is_end: false, - planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new( + planner: StreamingVectoredReadPlanner::new( 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer. 1024, // The default value. Unit tests might use a different value ), @@ -974,17 +975,15 @@ impl Drop for ImageLayerWriter { } } -#[cfg(test)] pub struct ImageLayerIterator<'a> { image_layer: &'a ImageLayerInner, ctx: &'a RequestContext, - planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner, - index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>, - key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>, + planner: StreamingVectoredReadPlanner, + index_iter: DiskBtreeIterator<'a>, + key_values_batch: VecDeque<(Key, Lsn, Value)>, is_end: bool, } -#[cfg(test)] impl<'a> ImageLayerIterator<'a> { /// Retrieve a batch of key-value pairs into the iterator buffer. async fn next_batch(&mut self) -> anyhow::Result<()> { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index dbf6c60aaee0..d9cbaba529d5 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -385,6 +385,7 @@ impl Layer { } /// Get all key/values in the layer. Should be replaced with an iterator-based API in the future. + #[allow(dead_code)] pub(crate) async fn load_key_values( &self, ctx: &RequestContext, @@ -1918,7 +1919,7 @@ impl ResidentLayer { self.owner.metadata() } - #[cfg(test)] + /// Cast the layer to a delta, return an error if it is an image layer. pub(crate) async fn get_as_delta( &self, ctx: &RequestContext, @@ -1930,7 +1931,7 @@ impl ResidentLayer { } } - #[cfg(test)] + /// Cast the layer to an image, return an error if it is a delta layer. pub(crate) async fn get_as_image( &self, ctx: &RequestContext, diff --git a/pageserver/src/tenant/storage_layer/merge_iterator.rs b/pageserver/src/tenant/storage_layer/merge_iterator.rs index 0edfd4bd4075..6f59b2fd7765 100644 --- a/pageserver/src/tenant/storage_layer/merge_iterator.rs +++ b/pageserver/src/tenant/storage_layer/merge_iterator.rs @@ -547,5 +547,9 @@ mod tests { &ctx, ); assert_merge_iter_equal(&mut merge_iter, &expect).await; + + is_send(merge_iter); } + + fn is_send(_: impl Send) {} } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index f251b667c2fb..a648432b4d08 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -27,6 +27,7 @@ use utils::id::TimelineId; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD}; +use crate::tenant::storage_layer::merge_iterator::MergeIterator; use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc}; use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter}; use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome}; @@ -1039,10 +1040,12 @@ impl Timeline { ); // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs. // Also, collect the layer information to decide when to split the new delta layers. - let mut all_key_values = Vec::new(); + let mut downloaded_layers = Vec::new(); let mut delta_split_points = BTreeSet::new(); for layer in &layer_selection { - all_key_values.extend(layer.load_key_values(ctx).await?); + let resident_layer = layer.download_and_keep_resident().await?; + downloaded_layers.push(resident_layer); + let desc = layer.layer_desc(); if desc.is_delta() { // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon) @@ -1052,44 +1055,28 @@ impl Timeline { delta_split_points.insert(key_range.end); } } - // Key small to large, LSN low to high, if the same LSN has both image and delta due to the merge of delta layers and - // image layers, make image appear before than delta. - struct ValueWrapper<'a>(&'a crate::repository::Value); - impl Ord for ValueWrapper<'_> { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - use crate::repository::Value; - use std::cmp::Ordering; - match (self.0, other.0) { - (Value::Image(_), Value::WalRecord(_)) => Ordering::Less, - (Value::WalRecord(_), Value::Image(_)) => Ordering::Greater, - _ => Ordering::Equal, - } - } - } - impl PartialOrd for ValueWrapper<'_> { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } - } - impl PartialEq for ValueWrapper<'_> { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == std::cmp::Ordering::Equal + let mut delta_layers = Vec::new(); + let mut image_layers = Vec::new(); + for resident_layer in &downloaded_layers { + if resident_layer.layer_desc().is_delta() { + let layer = resident_layer.get_as_delta(ctx).await?; + delta_layers.push(layer); + } else { + let layer = resident_layer.get_as_image(ctx).await?; + image_layers.push(layer); } } - impl Eq for ValueWrapper<'_> {} - all_key_values.sort_by(|(k1, l1, v1), (k2, l2, v2)| { - (k1, l1, ValueWrapper(v1)).cmp(&(k2, l2, ValueWrapper(v2))) - }); + let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx); // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas. // Data of the same key. let mut accumulated_values = Vec::new(); - let mut last_key = all_key_values.first().unwrap().0; // TODO: assert all_key_values not empty + let mut last_key: Option = None; /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon. async fn flush_accumulated_states( tline: &Arc, key: Key, - accumulated_values: &[&(Key, Lsn, crate::repository::Value)], + accumulated_values: &[(Key, Lsn, crate::repository::Value)], horizon: Lsn, ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> { let mut base_image = None; @@ -1190,7 +1177,7 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, - &(all_key_values.first().unwrap().0..all_key_values.last().unwrap().0.next()), + &(Key::MIN..Key::MAX), // covers the full key range gc_cutoff, ctx, ) @@ -1200,20 +1187,24 @@ impl Timeline { let delta_split_points = delta_split_points.into_iter().collect_vec(); let mut current_delta_split_point = 0; let mut delta_layers = Vec::new(); - for item @ (key, _, _) in &all_key_values { - if &last_key == key { - accumulated_values.push(item); + while let Some((key, lsn, val)) = merge_iter.next().await? { + if last_key.is_none() || last_key.as_ref() == Some(&key) { + if last_key.is_none() { + last_key = Some(key); + } + accumulated_values.push((key, lsn, val)); } else { + let last_key = last_key.as_mut().unwrap(); let (deltas, image) = - flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff) + flush_accumulated_states(self, *last_key, &accumulated_values, gc_cutoff) .await?; // Put the image into the image layer. Currently we have a single big layer for the compaction. - image_layer_writer.put_image(last_key, image, ctx).await?; + image_layer_writer.put_image(*last_key, image, ctx).await?; delta_values.extend(deltas); delta_layers.extend( flush_deltas( &mut delta_values, - last_key, + *last_key, &delta_split_points, &mut current_delta_split_point, self, @@ -1223,11 +1214,12 @@ impl Timeline { .await?, ); accumulated_values.clear(); - accumulated_values.push(item); - last_key = *key; + *last_key = key; + accumulated_values.push((key, lsn, val)); } } + let last_key = last_key.expect("no keys produced during compaction"); // TODO: move this part to the loop body let (deltas, image) = flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?; diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 5a0986ea12ec..54a3ad789b9f 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -396,7 +396,6 @@ impl<'a> VectoredBlobReader<'a> { /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for /// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and /// max_cnt constraints. -#[cfg(test)] pub struct StreamingVectoredReadPlanner { read_builder: Option, // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`] @@ -410,7 +409,6 @@ pub struct StreamingVectoredReadPlanner { cnt: usize, } -#[cfg(test)] impl StreamingVectoredReadPlanner { pub fn new(max_read_size: u64, max_cnt: usize) -> Self { assert!(max_cnt > 0);