Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compaction_level0_phase1: bypass PS PageCache for data blocks #8543

Merged
merged 14 commits into from
Jul 31, 2024
1 change: 1 addition & 0 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");

let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
Expand Down
19 changes: 19 additions & 0 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use utils::{
logging::LogFormat,
};

use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
Expand Down Expand Up @@ -295,6 +296,10 @@ pub struct PageServerConf {
pub ephemeral_bytes_per_memory_kb: usize,

pub l0_flush: L0FlushConfig,

/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
}

/// We do not want to store this in a PageServerConf because the latter may be logged
Expand Down Expand Up @@ -401,6 +406,8 @@ struct PageServerConfigBuilder {
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,

l0_flush: BuilderValue<L0FlushConfig>,

compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
}

impl PageServerConfigBuilder {
Expand Down Expand Up @@ -490,6 +497,7 @@ impl PageServerConfigBuilder {
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
}
}
}
Expand Down Expand Up @@ -673,6 +681,10 @@ impl PageServerConfigBuilder {
self.l0_flush = BuilderValue::Set(value);
}

pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
}

pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();

Expand Down Expand Up @@ -730,6 +742,7 @@ impl PageServerConfigBuilder {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
compact_level0_phase1_value_access,
}
CUSTOM LOGIC
{
Expand Down Expand Up @@ -1002,6 +1015,9 @@ impl PageServerConf {
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
"compact_level0_phase1_value_access" => {
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
Expand Down Expand Up @@ -1086,6 +1102,7 @@ impl PageServerConf {
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
}
}
}
Expand Down Expand Up @@ -1327,6 +1344,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Correct defaults should be used when no config values are provided"
);
Expand Down Expand Up @@ -1401,6 +1419,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Should be able to parse all basic config values correctly"
);
Expand Down
3 changes: 1 addition & 2 deletions pageserver/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use std::time::Duration;
pub use pageserver_api::key::{Key, KEY_SIZE};

/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),
Expand Down
13 changes: 10 additions & 3 deletions pageserver/src/tenant/disk_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,19 @@ where
let mut stack = Vec::new();
stack.push((self.root_blk, None));
let block_cursor = self.reader.block_cursor();
let mut node_buf = [0_u8; PAGE_SZ];
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor
// Read the node, through the PS PageCache, into local variable `node_buf`.
// We could keep the page cache read guard alive, but, at the time of writing,
// we run quite small PS PageCache s => can't risk running out of
// PageCache space because this stream isn't consumed fast enough.
let page_read_guard = block_cursor
.read_blk(self.start_blk + node_blknum, ctx)
.await?;
node_buf.copy_from_slice(page_read_guard.as_ref());
drop(page_read_guard); // drop page cache read guard early

let node = OnDiskNode::deparse(node_buf.as_ref())?;
let node = OnDiskNode::deparse(&node_buf)?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;

Expand Down Expand Up @@ -345,6 +351,7 @@ where
Either::Left(idx..node.num_children.into())
};


// idx points to the first match now. Keep going from there
while let Some(idx) = iter.next() {
let key_off = idx * suffix_len;
Expand Down
184 changes: 179 additions & 5 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,140 @@ impl Timeline {

// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = all_keys.iter();
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
problame marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
}
};

// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = all_keys
Expand Down Expand Up @@ -771,11 +904,11 @@ impl Timeline {
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
let mut next_hole = 0; // index of next hole in holes vector

for &DeltaEntry {
key, lsn, ref val, ..
} in all_values_iter
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.await
.map_err(CompactionError::Other)?
{
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
Expand Down Expand Up @@ -960,6 +1093,10 @@ impl Timeline {
}
}

// Without this, rustc complains about deltas_to_compact still
// being borrowed when we `.into_iter()` below.
drop(all_values_iter);
problame marked this conversation as resolved.
Show resolved Hide resolved

Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: deltas_to_compact
Expand Down Expand Up @@ -1067,6 +1204,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
}

#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}

/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}

impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}

impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
Expand Down
Loading