Skip to content

Commit

Permalink
feat(pageserver): add image layer iterator (#8006)
Browse files Browse the repository at this point in the history
part of #8002

## Summary of changes

This pull request adds the image layer iterator. It buffers a fixed
amount of key-value pairs in memory, and give developer an iterator
abstraction over the image layer. Once the buffer is exhausted, it will
issue 1 I/O to fetch the next batch.

Due to the Rust lifetime mysteries, the `get_stream_from` API has been
refactored to `into_stream` and consumes `self`.

Delta layer iterator implementation will be similar, therefore I'll add
it after this pull request gets merged.

---------

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh authored Jun 25, 2024
1 parent 6c5d3b5 commit 76864e6
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 38 deletions.
7 changes: 4 additions & 3 deletions libs/pageserver_api/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ impl Key {
key
}

/// Convert a 18B slice to a key. This function should not be used for metadata keys because field2 is handled differently.
/// Use [`Key::from_i128`] instead if you want to handle 16B keys (i.e., metadata keys).
/// Convert a 18B slice to a key. This function should not be used for 16B metadata keys because `field2` is handled differently.
/// Use [`Key::from_i128`] instead if you want to handle 16B keys (i.e., metadata keys). There are some restrictions on `field2`,
/// and therefore not all 18B slices are valid page server keys.
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
Expand All @@ -173,7 +174,7 @@ impl Key {
}
}

/// Convert a key to a 18B slice. This function should not be used for metadata keys because field2 is handled differently.
/// Convert a key to a 18B slice. This function should not be used for getting a 16B metadata key because `field2` is handled differently.
/// Use [`Key::to_i128`] instead if you want to get a 16B key (i.e., metadata keys).
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl<'a> BlockCursor<'a> {
///
/// The file is assumed to be immutable. This doesn't provide any functions
/// for modifying the file, nor for invalidating the cache if it is modified.
#[derive(Clone)]
pub struct FileBlockReader<'a> {
pub file: &'a VirtualFile,

Expand Down
32 changes: 22 additions & 10 deletions pageserver/src/tenant/disk_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl<'a, const L: usize> OnDiskNode<'a, L> {
///
/// Public reader object, to search the tree.
///
#[derive(Clone)]
pub struct DiskBtreeReader<R, const L: usize>
where
R: BlockReader,
Expand Down Expand Up @@ -259,27 +260,38 @@ where
Ok(result)
}

pub fn iter<'a>(
&'a self,
start_key: &'a [u8; L],
ctx: &'a RequestContext,
) -> DiskBtreeIterator<'a> {
pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
where
R: 'a,
{
DiskBtreeIterator {
stream: Box::pin(self.get_stream_from(start_key, ctx)),
stream: Box::pin(self.into_stream(start_key, ctx)),
}
}

/// Return a stream which yields all key, value pairs from the index
/// starting from the first key greater or equal to `start_key`.
///
/// Note that this is a copy of [`Self::visit`].
/// Note 1: that this is a copy of [`Self::visit`].
/// TODO: Once the sequential read path is removed this will become
/// the only index traversal method.
pub fn get_stream_from<'a>(
&'a self,
///
/// Note 2: this function used to take `&self` but it now consumes `self`. This is due to
/// the lifetime constraints of the reader and the stream / iterator it creates. Using `&self`
/// requires the reader to be present when the stream is used, and this creates a lifetime
/// dependency between the reader and the stream. Now if we want to create an iterator that
/// holds the stream, someone will need to keep a reference to the reader, which is inconvenient
/// to use from the image/delta layer APIs.
///
/// Feel free to add the `&self` variant back if it's necessary.
pub fn into_stream<'a>(
self,
start_key: &'a [u8; L],
ctx: &'a RequestContext,
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
where
R: 'a,
{
try_stream! {
let mut stack = Vec::new();
stack.push((self.root_blk, None));
Expand Down
28 changes: 14 additions & 14 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ impl DeltaLayerInner {
);
let mut result = Vec::new();
let mut stream =
Box::pin(self.stream_index_forwards(&index_reader, &[0; DELTA_KEY_SIZE], ctx));
Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let cursor = block_reader.block_cursor();
let mut buf = Vec::new();
Expand Down Expand Up @@ -976,7 +976,7 @@ impl DeltaLayerInner {
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>>
where
Reader: BlockReader,
Reader: BlockReader + Clone,
{
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
Expand All @@ -986,7 +986,7 @@ impl DeltaLayerInner {
let mut range_end_handled = false;

let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
let mut index_stream = std::pin::pin!(index_stream);

while let Some(index_entry) = index_stream.next().await {
Expand Down Expand Up @@ -1241,7 +1241,7 @@ impl DeltaLayerInner {
block_reader,
);

let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
// put in a sentinel value for getting the end offset for last item, and not having to
// repeat the whole read part
Expand Down Expand Up @@ -1300,7 +1300,7 @@ impl DeltaLayerInner {
offsets.start.pos(),
offsets.end.pos(),
meta,
max_read_size,
Some(max_read_size),
))
}
} else {
Expand Down Expand Up @@ -1459,17 +1459,17 @@ impl DeltaLayerInner {

fn stream_index_forwards<'a, R>(
&'a self,
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
start: &'a [u8; DELTA_KEY_SIZE],
ctx: &'a RequestContext,
) -> impl futures::stream::Stream<
Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
> + 'a
where
R: BlockReader,
R: BlockReader + 'a,
{
use futures::stream::TryStreamExt;
let stream = reader.get_stream_from(start, ctx);
let stream = reader.into_stream(start, ctx);
stream.map_ok(|(key, value)| {
let key = DeltaKey::from_slice(&key);
let (key, lsn) = (key.key(), key.lsn());
Expand Down Expand Up @@ -1857,7 +1857,7 @@ mod test {
.finish(entries_meta.key_range.end, &timeline, &ctx)
.await?;

let inner = resident.as_delta(&ctx).await?;
let inner = resident.get_as_delta(&ctx).await?;

let file_size = inner.file.metadata().await?.len();
tracing::info!(
Expand Down Expand Up @@ -2044,11 +2044,11 @@ mod test {

let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();

copied_layer.as_delta(ctx).await.unwrap();
copied_layer.get_as_delta(ctx).await.unwrap();

assert_keys_and_values_eq(
new_layer.as_delta(ctx).await.unwrap(),
copied_layer.as_delta(ctx).await.unwrap(),
new_layer.get_as_delta(ctx).await.unwrap(),
copied_layer.get_as_delta(ctx).await.unwrap(),
truncate_at,
ctx,
)
Expand All @@ -2073,7 +2073,7 @@ mod test {
source.index_root_blk,
&source_reader,
);
let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
let source_stream = source_stream.filter(|res| match res {
Ok((_, lsn, _)) => ready(lsn < &truncated_at),
_ => ready(true),
Expand All @@ -2086,7 +2086,7 @@ mod test {
truncated.index_root_blk,
&truncated_reader,
);
let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
let mut truncated_stream = std::pin::pin!(truncated_stream);

let mut scratch_left = Vec::new();
Expand Down
Loading

1 comment on commit 76864e6

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2998 tests run: 2872 passed, 0 failed, 126 skipped (full report)


Flaky tests (2)

Postgres 16

  • test_secondary_background_downloads: release

Postgres 14

  • test_location_conf_churn[3]: debug

Code coverage* (full report)

  • functions: 32.8% (6895 of 21016 functions)
  • lines: 50.3% (53883 of 107146 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
76864e6 at 2024-06-25T22:22:13.952Z :recycle:

Please sign in to comment.