Skip to content

Commit

Permalink
feat(pageserver): add delta layer iterator (#8064)
Browse files Browse the repository at this point in the history
part of #8002

## Summary of changes

Add delta layer iterator and tests.

---------

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh authored Jun 27, 2024
1 parent 66b0bf4 commit 23827c6
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 4 deletions.
197 changes: 197 additions & 0 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,24 @@ impl DeltaLayerInner {
);
offset
}

#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
DeltaLayerIterator {
delta_layer: self,
ctx,
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
key_values_batch: std::collections::VecDeque::new(),
is_end: false,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new(
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1024, // The default value. Unit tests might use a different value
),
}
}
}

/// A set of data associated with a delta layer key and its value
Expand Down Expand Up @@ -1551,6 +1569,70 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
}
}

#[cfg(test)]
pub struct DeltaLayerIterator<'a> {
delta_layer: &'a DeltaLayerInner,
ctx: &'a RequestContext,
planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner,
index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>,
is_end: bool,
}

#[cfg(test)]
impl<'a> DeltaLayerIterator<'a> {
/// Retrieve a batch of key-value pairs into the iterator buffer.
async fn next_batch(&mut self) -> anyhow::Result<()> {
assert!(self.key_values_batch.is_empty());
assert!(!self.is_end);

let plan = loop {
if let Some(res) = self.index_iter.next().await {
let (raw_key, value) = res?;
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let offset = blob_ref.pos();
if let Some(batch_plan) = self.planner.handle(key, lsn, offset, BlobFlag::None) {
break batch_plan;
}
} else {
self.is_end = true;
let data_end_offset = self.delta_layer.index_start_offset();
break self.planner.handle_range_end(data_end_offset);
}
};
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
for meta in blobs_buf.blobs.iter() {
let value = Value::des(&frozen_buf[meta.start..meta.end])?;
next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
}
self.key_values_batch = next_batch;
Ok(())
}

pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
if self.key_values_batch.is_empty() {
if self.is_end {
return Ok(None);
}
self.next_batch().await?;
}
Ok(Some(
self.key_values_batch
.pop_front()
.expect("should not be empty"),
))
}
}

#[cfg(test)]
mod test {
use std::collections::BTreeMap;
Expand All @@ -1560,6 +1642,9 @@ mod test {
use rand::RngCore;

use super::*;
use crate::tenant::harness::TIMELINE_ID;
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::Tenant;
use crate::{
context::DownloadBehavior,
task_mgr::TaskKind,
Expand Down Expand Up @@ -2126,4 +2211,116 @@ mod test {
assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
}
}

async fn produce_delta_layer(
tenant: &Tenant,
tline: &Arc<Timeline>,
mut deltas: Vec<(Key, Lsn, Value)>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
deltas.sort_by(|(k1, l1, _), (k2, l2, _)| (k1, l1).cmp(&(k2, l2)));
let (key_start, _, _) = deltas.first().unwrap();
let (key_max, _, _) = deltas.first().unwrap();
let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
let lsn_end = Lsn(lsn_max.0 + 1);
let mut writer = DeltaLayerWriter::new(
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
*key_start,
(*lsn_min)..lsn_end,
ctx,
)
.await?;
let key_end = key_max.next();

for (key, lsn, value) in deltas {
writer.put_value(key, lsn, value, ctx).await?;
}
let delta_layer = writer.finish(key_end, tline, ctx).await?;

Ok::<_, anyhow::Error>(delta_layer)
}

async fn assert_delta_iter_equal(
delta_iter: &mut DeltaLayerIterator<'_>,
expect: &[(Key, Lsn, Value)],
) {
let mut expect_iter = expect.iter();
loop {
let o1 = delta_iter.next().await.unwrap();
let o2 = expect_iter.next();
assert_eq!(o1.is_some(), o2.is_some());
if o1.is_none() && o2.is_none() {
break;
}
let (k1, l1, v1) = o1.unwrap();
let (k2, l2, v2) = o2.unwrap();
assert_eq!(&k1, k2);
assert_eq!(l1, *l2);
assert_eq!(&v1, v2);
}
}

#[tokio::test]
async fn delta_layer_iterator() {
use crate::repository::Value;
use bytes::Bytes;

let harness = TenantHarness::create("delta_layer_iterator").unwrap();
let (tenant, ctx) = harness.load().await;

let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
const N: usize = 1000;
let test_deltas = (0..N)
.map(|idx| {
(
get_key(idx as u32 / 10),
Lsn(0x10 * ((idx as u64) % 10 + 1)),
Value::Image(Bytes::from(format!("img{idx:05}"))),
)
})
.collect_vec();
let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
.await
.unwrap();
let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
for max_read_size in [1, 1024] {
for batch_size in [1, 2, 4, 8, 3, 7, 13] {
println!("running with batch_size={batch_size} max_read_size={max_read_size}");
// Test if the batch size is correctly determined
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
let mut num_items = 0;
for _ in 0..3 {
iter.next_batch().await.unwrap();
num_items += iter.key_values_batch.len();
if max_read_size == 1 {
// every key should be a batch b/c the value is larger than max_read_size
assert_eq!(iter.key_values_batch.len(), 1);
} else {
assert_eq!(iter.key_values_batch.len(), batch_size);
}
if num_items >= N {
break;
}
iter.key_values_batch.clear();
}
// Test if the result is correct
let mut iter = delta_layer.iter(&ctx);
iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
assert_delta_iter_equal(&mut iter, &test_deltas).await;
}
}
}
}
8 changes: 4 additions & 4 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5481,12 +5481,12 @@ impl Timeline {
}
images.sort_unstable_by(|(ka, _), (kb, _)| ka.cmp(kb));
let min_key = *images.first().map(|(k, _)| k).unwrap();
let max_key = images.last().map(|(k, _)| k).unwrap().next();
let end_key = images.last().map(|(k, _)| k).unwrap().next();
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(min_key..max_key),
&(min_key..end_key),
lsn,
ctx,
)
Expand Down Expand Up @@ -5518,7 +5518,7 @@ impl Timeline {
let last_record_lsn = self.get_last_record_lsn();
deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
let min_key = *deltas.first().map(|(k, _, _)| k).unwrap();
let max_key = deltas.last().map(|(k, _, _)| k).unwrap().next();
let end_key = deltas.last().map(|(k, _, _)| k).unwrap().next();
let min_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
let max_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
assert!(
Expand All @@ -5541,7 +5541,7 @@ impl Timeline {
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let delta_layer = delta_layer_writer.finish(max_key, self, ctx).await?;
let delta_layer = delta_layer_writer.finish(end_key, self, ctx).await?;

{
let mut guard = self.layers.write().await;
Expand Down

1 comment on commit 23827c6

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3028 tests run: 2900 passed, 2 failed, 126 skipped (full report)


Failures on Postgres 14

  • test_pgbench_intensive_init_workload[neon_on-github-actions-selfhosted-1000]: release
  • test_heavy_write_workload[neon_on-github-actions-selfhosted-10-5-5]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pgbench_intensive_init_workload[neon_on-release-pg14-github-actions-selfhosted-1000] or test_heavy_write_workload[neon_on-release-pg14-github-actions-selfhosted-10-5-5]"
Flaky tests (1)

Postgres 15

  • test_secondary_background_downloads: debug

Code coverage* (full report)

  • functions: 32.7% (6918 of 21143 functions)
  • lines: 50.1% (54111 of 108061 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
23827c6 at 2024-06-27T17:29:02.608Z :recycle:

Please sign in to comment.