From 29699529dfdd4642d71e018047071a01dacb0cf0 Mon Sep 17 00:00:00 2001
From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com>
Date: Mon, 23 Sep 2024 12:30:44 -0400
Subject: [PATCH] feat(pageserver): filter keys with gc-compaction (#9004)

Part of https://github.com/neondatabase/neon/issues/8002
Close https://github.com/neondatabase/neon/issues/8920

Legacy compaction (as well as gc-compaction) relies on the GC process to
remove unused layer files, but this depends on many factors (e.g., key
partitioning) to ensure that data in a dropped table can eventually be
removed. In gc-compaction, we consider the keyspace information when doing
the compaction process. If a key is not in the keyspace, we skip that key
and do not include it in the final output.

However, this is not easy to implement because gc-compaction considers
branch points (i.e., retain_lsns) and the retained keyspaces could change
across different LSNs. Therefore, for now, we only remove aux v1 keys in
the compaction process.

## Summary of changes

* Add `FilterIterator` to filter out keys.
* Integrate `FilterIterator` with gc-compaction.
* Add `collect_gc_compaction_keyspace` for a specification of the keyspaces
  that can be retained during the gc-compaction process.

---------

Signed-off-by: Alex Chi Z
---
 pageserver/src/pgdatadir_mapping.rs           |  30 +++
 pageserver/src/tenant/storage_layer.rs        |   2 +-
 .../tenant/storage_layer/filter_iterator.rs   | 205 ++++++++++++++++++
 pageserver/src/tenant/timeline/compaction.rs  |   9 +-
 4 files changed, 244 insertions(+), 2 deletions(-)
 create mode 100644 pageserver/src/tenant/storage_layer/filter_iterator.rs

diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 5f8766ca2c51..7aa313f03143 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -840,6 +840,36 @@ impl Timeline {
         Ok(total_size * BLCKSZ as u64)
     }
 
+    /// Get a KeySpace that covers all the Keys that are in use at AND below the given LSN. This is only used
+    /// for gc-compaction.
+    ///
+    /// gc-compaction cannot use the same `collect_keyspace` function as the legacy compaction because it
+    /// processes data at multiple LSNs and needs to be aware of the fact that some key ranges might need to
+    /// be kept only for a specific range of LSNs.
+    ///
+    /// Consider the case that the user created branches at LSN 10 and 20, where the user created a table A at
+    /// LSN 10 and dropped that table at LSN 20. `collect_keyspace` at LSN 10 will return the key range
+    /// corresponding to that table, while LSN 20 won't. The keyspace info at a single LSN is not enough to
+    /// determine which keys to retain/drop for gc-compaction.
+    ///
+    /// For now, it only drops AUX-v1 keys. But in the future, the function will be extended to return the keyspace
+    /// to be retained for each of the branch LSNs.
+    ///
+    /// The return value is (dense keyspace, sparse keyspace).
+    pub(crate) async fn collect_gc_compaction_keyspace(
+        &self,
+    ) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
+        let metadata_key_begin = Key::metadata_key_range().start;
+        let aux_v1_key = AUX_FILES_KEY;
+        let dense_keyspace = KeySpace {
+            ranges: vec![Key::MIN..aux_v1_key, aux_v1_key.next()..metadata_key_begin],
+        };
+        Ok((
+            dense_keyspace,
+            SparseKeySpace(KeySpace::single(Key::metadata_key_range())),
+        ))
+    }
+
     ///
     /// Get a KeySpace that covers all the Keys that are in use at the given LSN.
     /// Anything that's not listed maybe removed from the underlying storage (from
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index cd252aa37132..99bd0ece5763 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -1,13 +1,13 @@
 //! Common traits and structs for layers
 
 pub mod delta_layer;
+pub mod filter_iterator;
 pub mod image_layer;
 pub mod inmemory_layer;
 pub(crate) mod layer;
 mod layer_desc;
 mod layer_name;
 pub mod merge_iterator;
-
 pub mod split_writer;
 
 use crate::context::{AccessStatsBehavior, RequestContext};
diff --git a/pageserver/src/tenant/storage_layer/filter_iterator.rs b/pageserver/src/tenant/storage_layer/filter_iterator.rs
new file mode 100644
index 000000000000..f45dd4b80198
--- /dev/null
+++ b/pageserver/src/tenant/storage_layer/filter_iterator.rs
@@ -0,0 +1,205 @@
+use std::ops::Range;
+
+use anyhow::bail;
+use pageserver_api::{
+    key::Key,
+    keyspace::{KeySpace, SparseKeySpace},
+};
+use utils::lsn::Lsn;
+
+use crate::repository::Value;
+
+use super::merge_iterator::MergeIterator;
+
+/// A filter iterator over merge iterators (and can be easily extended to other types of iterators).
+///
+/// The iterator will skip any keys not included in the keyspace filter. In other words, the keyspace filter contains the keys
+/// to be retained.
+pub struct FilterIterator<'a> {
+    inner: MergeIterator<'a>,
+    retain_key_filters: Vec<Range<Key>>,
+    current_filter_idx: usize,
+}
+
+impl<'a> FilterIterator<'a> {
+    pub fn create(
+        inner: MergeIterator<'a>,
+        dense_keyspace: KeySpace,
+        sparse_keyspace: SparseKeySpace,
+    ) -> anyhow::Result<Self> {
+        let mut retain_key_filters = Vec::new();
+        retain_key_filters.extend(dense_keyspace.ranges);
+        retain_key_filters.extend(sparse_keyspace.0.ranges);
+        retain_key_filters.sort_by(|a, b| a.start.cmp(&b.start));
+        // Verify key filters are non-overlapping and sorted
+        for window in retain_key_filters.windows(2) {
+            if window[0].end > window[1].start {
+                bail!(
+                    "Key filters are overlapping: {:?} and {:?}",
+                    window[0],
+                    window[1]
+                );
+            }
+        }
+        Ok(Self {
+            inner,
+            retain_key_filters,
+            current_filter_idx: 0,
+        })
+    }
+
+    pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
+        while let Some(item) = self.inner.next().await? {
+            while self.current_filter_idx < self.retain_key_filters.len()
+                && item.0 >= self.retain_key_filters[self.current_filter_idx].end
+            {
+                // [filter region]    [filter region]    [filter region]
+                //                                    ^ item
+                //                    ^ current filter
+                self.current_filter_idx += 1;
+                // [filter region]    [filter region]    [filter region]
+                //                                    ^ item
+                //                                       ^ current filter
+            }
+            if self.current_filter_idx >= self.retain_key_filters.len() {
+                // We already exhausted all filters, so we should return now
+                // [filter region]    [filter region]    [filter region]
+                //                                                          ^ item
+                //                                                          ^ current filter (nothing)
+                return Ok(None);
+            }
+            if self.retain_key_filters[self.current_filter_idx].contains(&item.0) {
+                // [filter region]    [filter region]    [filter region]
+                //                                            ^ item
+                //                                       ^ current filter
+                return Ok(Some(item));
+            }
+            // If the key is not contained in the key retaining filters, continue to the next item.
+            // [filter region]    [filter region]    [filter region]
+            //                  ^ item
+            //                    ^ current filter
+        }
+        Ok(None)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use itertools::Itertools;
+    use pageserver_api::key::Key;
+    use utils::lsn::Lsn;
+
+    use crate::{
+        tenant::{
+            harness::{TenantHarness, TIMELINE_ID},
+            storage_layer::delta_layer::test::produce_delta_layer,
+        },
+        DEFAULT_PG_VERSION,
+    };
+
+    async fn assert_filter_iter_equal(
+        filter_iter: &mut FilterIterator<'_>,
+        expect: &[(Key, Lsn, Value)],
+    ) {
+        let mut expect_iter = expect.iter();
+        loop {
+            let o1 = filter_iter.next().await.unwrap();
+            let o2 = expect_iter.next();
+            assert_eq!(o1.is_some(), o2.is_some());
+            if o1.is_none() && o2.is_none() {
+                break;
+            }
+            let (k1, l1, v1) = o1.unwrap();
+            let (k2, l2, v2) = o2.unwrap();
+            assert_eq!(&k1, k2);
+            assert_eq!(l1, *l2);
+            assert_eq!(&v1, v2);
+        }
+    }
+
+    #[tokio::test]
+    async fn filter_keyspace_iterator() {
+        use crate::repository::Value;
+        use bytes::Bytes;
+
+        let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
+            .await
+            .unwrap();
+        let (tenant, ctx) = harness.load().await;
+
+        let tline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await
+            .unwrap();
+
+        fn get_key(id: u32) -> Key {
+            let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
+            key.field6 = id;
+            key
+        }
+        const N: usize = 100;
+        let test_deltas1 = (0..N)
+            .map(|idx| {
+                (
+                    get_key(idx as u32),
+                    Lsn(0x20 * ((idx as u64) % 10 + 1)),
+                    Value::Image(Bytes::from(format!("img{idx:05}"))),
+                )
+            })
+            .collect_vec();
+        let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
+            .await
+            .unwrap();
+
+        let merge_iter = MergeIterator::create(
+            &[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
+            &[],
+            &ctx,
+        );
+
+        let mut filter_iter = FilterIterator::create(
+            merge_iter,
+            KeySpace {
+                ranges: vec![
+                    get_key(5)..get_key(10),
+                    get_key(20)..get_key(30),
+                    get_key(90)..get_key(110),
+                    get_key(1000)..get_key(2000),
+                ],
+            },
+            SparseKeySpace(KeySpace::default()),
+        )
+        .unwrap();
+        let mut result = Vec::new();
+        result.extend(test_deltas1[5..10].iter().cloned());
+        result.extend(test_deltas1[20..30].iter().cloned());
+        result.extend(test_deltas1[90..100].iter().cloned());
+        assert_filter_iter_equal(&mut filter_iter, &result).await;
+
+        let merge_iter = MergeIterator::create(
+            &[resident_layer_1.get_as_delta(&ctx).await.unwrap()],
+            &[],
+            &ctx,
+        );
+
+        let mut filter_iter = FilterIterator::create(
+            merge_iter,
+            KeySpace {
+                ranges: vec![
+                    get_key(0)..get_key(10),
+                    get_key(20)..get_key(30),
+                    get_key(90)..get_key(95),
+                ],
+            },
+            SparseKeySpace(KeySpace::default()),
+        )
+        .unwrap();
+        let mut result = Vec::new();
+        result.extend(test_deltas1[0..10].iter().cloned());
+        result.extend(test_deltas1[20..30].iter().cloned());
+        result.extend(test_deltas1[90..95].iter().cloned());
+        assert_filter_iter_equal(&mut filter_iter, &result).await;
+    }
+}
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index d1567b6b39f3..6edc28a11b2e 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -31,6 +31,7 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}
 use crate::page_cache;
 use crate::tenant::checks::check_valid_layermap;
 use crate::tenant::remote_timeline_client::WaitCompletionError;
+use crate::tenant::storage_layer::filter_iterator::FilterIterator;
 use crate::tenant::storage_layer::merge_iterator::MergeIterator;
 use crate::tenant::storage_layer::split_writer::{
     SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
@@ -1772,6 +1773,7 @@ impl Timeline {
             gc_cutoff,
             lowest_retain_lsn
         );
+
         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
         // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
         let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
@@ -1820,7 +1822,12 @@ impl Timeline {
                 image_layers.push(layer);
             }
         }
-        let mut merge_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);
+        let (dense_ks, sparse_ks) = self.collect_gc_compaction_keyspace().await?;
+        let mut merge_iter = FilterIterator::create(
+            MergeIterator::create(&delta_layers, &image_layers, ctx),
+            dense_ks,
+            sparse_ks,
+        )?;
         // Step 2: Produce images+deltas. TODO: ensure newly-produced delta does not overlap with other deltas.
         // Data of the same key.
        let mut accumulated_values = Vec::new();
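
Note (not part of the patch): the core of `FilterIterator` is a single forward-moving index over sorted, non-overlapping retain ranges, advanced in lockstep with the sorted key stream coming out of the merge iterator. Below is a minimal, self-contained Rust sketch of that technique. `SketchFilter`, `keep`, and the `u32` keys are illustrative stand-ins for `FilterIterator`, the pageserver `Key` type, and the merge iterator output; they are not code from this patch.

```rust
use std::ops::Range;

/// Illustrative stand-in for `FilterIterator`: keeps only keys that fall
/// inside a set of sorted, non-overlapping retain ranges.
struct SketchFilter {
    retain: Vec<Range<u32>>, // stand-in for the dense + sparse keyspace ranges
    idx: usize,              // index of the range the next key could fall into
}

impl SketchFilter {
    fn new(mut retain: Vec<Range<u32>>) -> Self {
        // The real code also verifies the ranges are non-overlapping after sorting.
        retain.sort_by_key(|r| r.start);
        Self { retain, idx: 0 }
    }

    /// Decide whether `key` should be retained. Keys must arrive in
    /// non-decreasing order, mirroring the merge iterator's output.
    fn keep(&mut self, key: u32) -> bool {
        // Skip filters that end at or before the current key; they can never
        // match again because the key stream is sorted.
        while self.idx < self.retain.len() && key >= self.retain[self.idx].end {
            self.idx += 1;
        }
        // Either all filters are exhausted, or the key is kept only if it
        // falls inside the current filter range.
        self.idx < self.retain.len() && self.retain[self.idx].contains(&key)
    }
}

fn main() {
    let mut filter = SketchFilter::new(vec![5..10, 20..30]);
    let kept: Vec<u32> = (0u32..40).filter(|&k| filter.keep(k)).collect();
    let expected: Vec<u32> = (5u32..10).chain(20..30).collect();
    assert_eq!(kept, expected);
    println!("kept {} of 40 keys", kept.len());
}
```

Because both the key stream and the retain ranges are sorted, the filter index only moves forward, so filtering adds O(1) amortized work per key regardless of how many ranges the keyspace contains.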