Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Sep 20, 2023
1 parent 2ae2d21 commit 00688d5
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 87 deletions.
6 changes: 3 additions & 3 deletions pageserver/src/page_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl PageCache {
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
crate::metrics::PAGE_CACHE
.for_task_kind(ctx)
.for_task_kind(ctx.task_kind())
.read_accesses_materialized_page
.inc();

Expand All @@ -370,12 +370,12 @@ impl PageCache {
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
.for_task_kind(ctx)
.for_task_kind(ctx.task_kind())
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
.for_task_kind(ctx)
.for_task_kind(ctx.task_kind())
.read_hits_materialized_page_older_lsn
.inc();
}
Expand Down
17 changes: 10 additions & 7 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
Expand All @@ -19,9 +20,9 @@ use std::io::{Error, ErrorKind};

impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
pub async fn read_blob(&self, offset: u64, ctx: &RequestContext) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf).await?;
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}
/// Read blob into the given buffer. Any previous contents in the buffer
Expand All @@ -30,11 +31,12 @@ impl<'a> BlockCursor<'a> {
&self,
offset: u64,
dstbuf: &mut Vec<u8>,
ctx: &RequestContext,
) -> Result<(), std::io::Error> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;

let mut buf = self.read_blk(blknum).await?;
let mut buf = self.read_blk(blknum, ctx).await?;

// peek at the first byte, to determine if it's a 1- or 4-byte length
let first_len_byte = buf[off];
Expand All @@ -50,7 +52,7 @@ impl<'a> BlockCursor<'a> {
// it is split across two pages
len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
blknum += 1;
buf = self.read_blk(blknum).await?;
buf = self.read_blk(blknum, ctx).await?;
len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
off = 4 - thislen;
} else {
Expand All @@ -71,7 +73,7 @@ impl<'a> BlockCursor<'a> {
if page_remain == 0 {
// continue on next page
blknum += 1;
buf = self.read_blk(blknum).await?;
buf = self.read_blk(blknum, ctx).await?;
off = 0;
page_remain = PAGE_SZ;
}
Expand Down Expand Up @@ -228,12 +230,13 @@ impl BlobWriter<false> {
#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::block_io::BlockReaderRef;
use crate::{tenant::block_io::BlockReaderRef, task_mgr::TaskKind, context::DownloadBehavior};
use rand::{Rng, SeedableRng};

async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
let temp_dir = tempfile::tempdir()?;
let path = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

// Write part (in block to drop the file)
let mut offsets = Vec::new();
Expand All @@ -255,7 +258,7 @@ mod tests {
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset).await?;
let blob_read = rdr.read_blob(*offset, &ctx).await?;
assert_eq!(
blob, &blob_read,
"mismatch for idx={idx} at offset={offset}"
Expand Down
12 changes: 6 additions & 6 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ pub(crate) enum BlockReaderRef<'a> {

impl<'a> BlockReaderRef<'a> {
#[inline(always)]
async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
async fn read_blk(&self, blknum: u32, ctx: &RequestContext) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReader(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await,
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
Expand Down Expand Up @@ -135,8 +135,8 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum).await
pub async fn read_blk(&self, blknum: u32, ctx: &RequestContext) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum, ctx).await
}
}

Expand Down
Loading

0 comments on commit 00688d5

Please sign in to comment.