diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 7a96c86ded20..2d00f311fbaa 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -129,6 +129,7 @@ fn main() -> anyhow::Result<()> {
     info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
     info!(?conf.get_impl, "starting with get page implementation");
     info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
+    info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
 
     let tenants_path = conf.tenants_path();
     if !tenants_path.exists() {
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index f71881683d12..41c2fe0af35a 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -29,6 +29,7 @@ use utils::{
     logging::LogFormat,
 };
 
+use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
 use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
 use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
 use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -295,6 +296,10 @@ pub struct PageServerConf {
     pub ephemeral_bytes_per_memory_kb: usize,
 
     pub l0_flush: L0FlushConfig,
+
+    /// This flag is temporary and will be removed after gradual rollout.
+    /// See <https://github.com/neondatabase/neon/issues/8184>.
+    pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
 }
 
 /// We do not want to store this in a PageServerConf because the latter may be logged
@@ -401,6 +406,8 @@ struct PageServerConfigBuilder {
     ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
 
     l0_flush: BuilderValue<L0FlushConfig>,
+
+    compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
 }
 
 impl PageServerConfigBuilder {
@@ -490,6 +497,7 @@ impl PageServerConfigBuilder {
             validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
             ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
             l0_flush: Set(L0FlushConfig::default()),
+            compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
         }
     }
 }
@@ -673,6 +681,10 @@ impl PageServerConfigBuilder {
         self.l0_flush = BuilderValue::Set(value);
     }
 
+    pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
+        self.compact_level0_phase1_value_access = BuilderValue::Set(value);
+    }
+
     pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
         let default = Self::default_values();
 
@@ -730,6 +742,7 @@ impl PageServerConfigBuilder {
                 image_compression,
                 ephemeral_bytes_per_memory_kb,
                 l0_flush,
+                compact_level0_phase1_value_access,
             }
             CUSTOM LOGIC
             {
@@ -1002,6 +1015,9 @@ impl PageServerConf {
                 "l0_flush" => {
                     builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
                 }
+                "compact_level0_phase1_value_access" => {
+                    builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
+                }
                 _ => bail!("unrecognized pageserver option '{key}'"),
             }
         }
@@ -1086,6 +1102,7 @@ impl PageServerConf {
             validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
             ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
             l0_flush: L0FlushConfig::default(),
+            compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
         }
     }
 }
@@ -1327,6 +1344,7 @@ background_task_maximum_delay = '334 s'
                 image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
                 ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
                 l0_flush: L0FlushConfig::default(),
+                compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
             },
             "Correct defaults should be used when no config values are provided"
         );
@@ -1401,6 +1419,7 @@ background_task_maximum_delay = '334 s'
                 image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
                 ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
                 l0_flush: L0FlushConfig::default(),
+                compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
             },
             "Should be able to parse all basic config values correctly"
         );
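To illustrate the on-disk config shape: `CompactL0Phase1ValueAccess` (defined in the compaction.rs hunk further down) is internally tagged with `mode` and uses kebab-case variant names, so the new `pageserver.toml` key is an inline table. The round-trip sketch below is illustrative only; it is not part of the PR, it assumes the enums are importable at the path shown, and it uses the standalone `toml` crate rather than the `utils::toml_edit_ext::deserialize_item` path the parser above actually takes.

```rust
// Assumed import path; the enums live in pageserver/src/tenant/timeline/compaction.rs.
use pageserver::tenant::timeline::compaction::{
    CompactL0BypassPageCacheValidation, CompactL0Phase1ValueAccess,
};

#[derive(serde::Deserialize)]
struct ConfigFragment {
    compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
}

#[test]
fn compact_level0_phase1_value_access_toml_shapes() {
    // Old behavior: read compaction inputs through the PS PageCache.
    let old: ConfigFragment = toml::from_str(
        r#"compact_level0_phase1_value_access = { mode = "page-cached-blob-io" }"#,
    )
    .unwrap();
    assert_eq!(
        old.compact_level0_phase1_value_access,
        CompactL0Phase1ValueAccess::PageCachedBlobIo
    );

    // New behavior, cross-validated against the old path (the current default).
    let new: ConfigFragment = toml::from_str(
        r#"compact_level0_phase1_value_access = { mode = "streaming-kmerge", validate = "key-lsn-value" }"#,
    )
    .unwrap();
    assert_eq!(
        new.compact_level0_phase1_value_access,
        CompactL0Phase1ValueAccess::StreamingKmerge {
            validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
        }
    );
}
```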
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 5a334d029035..e4ebafd9275d 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -8,8 +8,7 @@ use std::time::Duration;
 pub use pageserver_api::key::{Key, KEY_SIZE};
 
 /// A 'value' stored for a one Key.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[cfg_attr(test, derive(PartialEq))]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum Value {
     /// An Image value contains a full copy of the value
     Image(Bytes),
diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs
index 1583a3826af5..0107b0ac7e9c 100644
--- a/pageserver/src/tenant/disk_btree.rs
+++ b/pageserver/src/tenant/disk_btree.rs
@@ -296,13 +296,19 @@ where
         let mut stack = Vec::new();
         stack.push((self.root_blk, None));
         let block_cursor = self.reader.block_cursor();
 
+        let mut node_buf = [0_u8; PAGE_SZ];
         while let Some((node_blknum, opt_iter)) = stack.pop() {
-            // Locate the node.
-            let node_buf = block_cursor
+            // Read the node, through the PS PageCache, into local variable `node_buf`.
+            // We could keep the page cache read guard alive, but, at the time of writing,
+            // we run quite small PS PageCaches => can't risk running out of
+            // PageCache space because this stream isn't consumed fast enough.
+            let page_read_guard = block_cursor
                 .read_blk(self.start_blk + node_blknum, ctx)
                 .await?;
+            node_buf.copy_from_slice(page_read_guard.as_ref());
+            drop(page_read_guard); // drop page cache read guard early
 
-            let node = OnDiskNode::deparse(node_buf.as_ref())?;
+            let node = OnDiskNode::deparse(&node_buf)?;
             let prefix_len = node.prefix_len as usize;
             let suffix_len = node.suffix_len as usize;
@@ -345,6 +351,7 @@ where
                 Either::Left(idx..node.num_children.into())
             };
 
+            // idx points to the first match now. Keep going from there.
             while let Some(idx) = iter.next() {
                 let key_off = idx * suffix_len;
 
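The disk_btree.rs change above is one instance of a general pattern: copy the fixed-size page out of the PageCache into a stack buffer, then release the read guard immediately, so the cache slot is not pinned for as long as the (potentially slowly consumed) stream lives. A minimal sketch of that pattern, assuming the pageserver's 8KiB page size and a guard that derefs to exactly one page:

```rust
const PAGE_SZ: usize = 8192; // assumption: matches the pageserver's page size

/// Copy one page out of a cache read guard and drop the guard eagerly,
/// so the PageCache slot can be evicted/reused while the caller is slow.
fn copy_page_and_release(guard: impl AsRef<[u8]>) -> [u8; PAGE_SZ] {
    let mut buf = [0_u8; PAGE_SZ];
    // copy_from_slice panics if the lengths differ, so this also asserts
    // that the guard really covers exactly one page.
    buf.copy_from_slice(guard.as_ref());
    buf
    // `guard` is dropped here, before the caller does anything else.
}
```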
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 3292b4a12130..7bfa8e9d3525 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -698,7 +698,140 @@ impl Timeline {
 
         // This iterator walks through all key-value pairs from all the layers
         // we're compacting, in key, LSN order.
-        let all_values_iter = all_keys.iter();
+        // If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
+        // then the Value::Image is ordered before Value::WalRecord.
+        //
+        // TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
+        // option and validation code once we've reached confidence.
+        enum AllValuesIter<'a> {
+            PageCachedBlobIo {
+                all_keys_iter: VecIter<'a>,
+            },
+            StreamingKmergeBypassingPageCache {
+                merge_iter: MergeIterator<'a>,
+            },
+            ValidatingStreamingKmergeBypassingPageCache {
+                mode: CompactL0BypassPageCacheValidation,
+                merge_iter: MergeIterator<'a>,
+                all_keys_iter: VecIter<'a>,
+            },
+        }
+        type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
+        impl AllValuesIter<'_> {
+            async fn next_all_keys_iter(
+                iter: &mut VecIter<'_>,
+                ctx: &RequestContext,
+            ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
+                let Some(DeltaEntry {
+                    key,
+                    lsn,
+                    val: value_ref,
+                    ..
+                }) = iter.next()
+                else {
+                    return Ok(None);
+                };
+                let value = value_ref.load(ctx).await?;
+                Ok(Some((*key, *lsn, value)))
+            }
+            async fn next(
+                &mut self,
+                ctx: &RequestContext,
+            ) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
+                match self {
+                    AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
+                        Self::next_all_keys_iter(iter, ctx).await
+                    }
+                    AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
+                    AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
+                        // advance both iterators
+                        let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
+                        let merge_iter_item = merge_iter.next().await;
+                        // compare results & log warnings as needed
+                        macro_rules! rate_limited_warn {
+                            ($($arg:tt)*) => {{
+                                if cfg!(debug_assertions) || cfg!(feature = "testing") {
+                                    warn!($($arg)*);
+                                    panic!("CompactL0BypassPageCacheValidation failure, check logs");
+                                }
+                                use once_cell::sync::Lazy;
+                                use utils::rate_limit::RateLimit;
+                                use std::sync::Mutex;
+                                use std::time::Duration;
+                                static LOGGED: Lazy<Mutex<RateLimit>> =
+                                    Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
+                                let mut rate_limit = LOGGED.lock().unwrap();
+                                rate_limit.call(|| {
+                                    warn!($($arg)*);
+                                });
+                            }}
+                        }
+                        match (&all_keys_iter_item, &merge_iter_item) {
+                            (Err(_), Err(_)) => {
+                                // don't bother asserting equality of the errors
+                            }
+                            (Err(all_keys), Ok(merge)) => {
+                                rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
+                            },
+                            (Ok(all_keys), Err(merge)) => {
+                                rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
+                            },
+                            (Ok(None), Ok(None)) => { }
+                            (Ok(Some(all_keys)), Ok(None)) => {
+                                rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
+                            }
+                            (Ok(None), Ok(Some(merge))) => {
+                                rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
+                            }
+                            (Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
+                                match mode {
+                                    // TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
+                                    CompactL0BypassPageCacheValidation::KeyLsn => {
+                                        let all_keys = (all_keys_key, all_keys_lsn);
+                                        let merge = (merge_key, merge_lsn);
+                                        if all_keys != merge {
+                                            rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
+                                        }
+                                    }
+                                    CompactL0BypassPageCacheValidation::KeyLsnValue => {
+                                        let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
+                                        let merge = (merge_key, merge_lsn, merge_value);
+                                        if all_keys != merge {
+                                            rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        // in case of mismatch, trust the legacy all_keys_iter_item
+                        all_keys_iter_item
+                    }.instrument(info_span!("next")).await
+                }
+            }
+        }
+        let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
+            CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
+                all_keys_iter: all_keys.iter(),
+            },
+            CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
+                let merge_iter = {
+                    let mut deltas = Vec::with_capacity(deltas_to_compact.len());
+                    for l in deltas_to_compact.iter() {
+                        let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
+                        deltas.push(l);
+                    }
+                    MergeIterator::create(&deltas, &[], ctx)
+                };
+                match validate {
+                    None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
+                    Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
+                        mode: validate.clone(),
+                        merge_iter,
+                        all_keys_iter: all_keys.iter(),
+                    },
+                }
+            }
+        };
 
         // This iterator walks through all keys and is needed to calculate size used by each key
         let mut all_keys_iter = all_keys
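The comment at the top of the hunk above pins down the merge order for collisions: for an equal `(key, lsn)`, an `Image` is yielded before a `WalRecord`. A hypothetical comparator that encodes this as a sort key (illustrative only, not the actual `MergeIterator` internals; import paths assumed):

```rust
use pageserver::repository::Value; // assumed import path
use pageserver_api::key::Key;
use utils::lsn::Lsn;

// 0 sorts before 1, so at an equal (key, lsn) an Image wins the tie-break.
fn value_kind_rank(value: &Value) -> u8 {
    match value {
        Value::Image(_) => 0,     // full page image first ...
        Value::WalRecord(_) => 1, // ... then the delta record
    }
}

// Hypothetical sort key matching the yield order documented above.
fn entry_sort_key(key: Key, lsn: Lsn, value: &Value) -> (Key, Lsn, u8) {
    (key, lsn, value_kind_rank(value))
}
```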
@@ -771,11 +904,11 @@ impl Timeline {
         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
         let mut next_hole = 0; // index of next hole in holes vector
 
-        for &DeltaEntry {
-            key, lsn, ref val, ..
-        } in all_values_iter
+        while let Some((key, lsn, value)) = all_values_iter
+            .next(ctx)
+            .await
+            .map_err(CompactionError::Other)?
         {
-            let value = val.load(ctx).await.map_err(CompactionError::Other)?;
             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
             // We need to check key boundaries once we reach next key or end of layer with the same key
             if !same_key || lsn == dup_end_lsn {
@@ -960,6 +1093,10 @@ impl Timeline {
             }
         }
 
+        // Without this, rustc complains about deltas_to_compact still
+        // being borrowed when we `.into_iter()` below.
+        drop(all_values_iter);
+
         Ok(CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact: deltas_to_compact
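The `drop(all_values_iter)` above is a borrow-checker workaround, not compaction logic: the `MergeIterator` inside `AllValuesIter` borrows data derived from `deltas_to_compact` and has drop glue, so without an explicit drop it stays live (keeping the borrow alive) until the end of scope, past the `.into_iter()`. A hypothetical standalone reproduction:

```rust
/// `Holder` has drop glue (it owns a Vec), so it stays "live" until its
/// drop point, which keeps `v` borrowed past the last explicit use.
struct Holder<'a> {
    _items: std::slice::Iter<'a, String>,
    _scratch: Vec<u8>, // any owned allocation gives the type drop glue
}

fn main() {
    let v = vec!["a".to_string(), "b".to_string()];
    let holder = Holder {
        _items: v.iter(),
        _scratch: Vec::new(),
    };
    // Without this explicit drop, `v.into_iter()` below is rejected with
    // E0505 ("cannot move out of `v` because it is borrowed"), since
    // `holder` would otherwise be dropped only at the end of scope.
    drop(holder);
    let _owned: Vec<String> = v.into_iter().collect();
}
```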
@@ -1067,6 +1204,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
     }
 }
 
+#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
+#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
+pub enum CompactL0Phase1ValueAccess {
+    /// The old way.
+    PageCachedBlobIo,
+    /// The new way.
+    StreamingKmerge {
+        /// If set, we run both the old way and the new way, validate that
+        /// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
+        /// and if the validation fails,
+        /// - in tests: fail them with a panic, or
+        /// - in prod: log a rate-limited warning and use the old way's results.
+        ///
+        /// If not set, we only run the new way and trust its results.
+        validate: Option<CompactL0BypassPageCacheValidation>,
+    },
+}
+
+/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
+#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum CompactL0BypassPageCacheValidation {
+    /// Validate that the series of (key, lsn) pairs are the same.
+    KeyLsn,
+    /// Validate that the entire output of the old and new way is identical.
+    KeyLsnValue,
+}
+
+impl Default for CompactL0Phase1ValueAccess {
+    fn default() -> Self {
+        CompactL0Phase1ValueAccess::StreamingKmerge {
+            // TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
+            validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
+        }
+    }
+}
+
 impl Timeline {
     /// Entry point for new tiered compaction algorithm.
     ///
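For reference, a sketch of what the `Default` impl above resolves to during the rollout, and the intended end state per the TODO (import path assumed):

```rust
use pageserver::tenant::timeline::compaction::{
    CompactL0BypassPageCacheValidation, CompactL0Phase1ValueAccess,
}; // assumed import path

fn main() {
    // Current default: the new streaming k-merge read path, cross-validated
    // against the old page-cached path on every item.
    assert_eq!(
        CompactL0Phase1ValueAccess::default(),
        CompactL0Phase1ValueAccess::StreamingKmerge {
            validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
        }
    );

    // Intended end state once confidence is reached (per the TODO above):
    // run the streaming k-merge alone, after which the knob is to be removed.
    let _end_state = CompactL0Phase1ValueAccess::StreamingKmerge { validate: None };
}
```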