Skip to content

Commit

Permalink
ephemeral file: refactor write_blob impl to concentrate mutable state (
Browse files Browse the repository at this point in the history
…#5004)

Before this patch, we had the `off` and `blknum` as function-wide
mutable state. Now it's contained in the `Writer` struct.

The use of `push_bytes` instead of index-based filling of the buffer
also makes it easier to reason about what's going on.

This is prep for #4994
  • Loading branch information
problame authored Aug 17, 2023
1 parent 786c7b3 commit 957af04
Showing 1 changed file with 65 additions and 35 deletions.
100 changes: 65 additions & 35 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ impl EphemeralFile {
Ok(())
}

fn get_buf_for_write(&self, blkno: u32) -> Result<page_cache::PageWriteGuard, io::Error> {
fn get_buf_for_write(
&self,
blkno: u32,
) -> Result<page_cache::PageWriteGuard<'static>, io::Error> {
// Look up the right page
let cache = page_cache::get();
let mut write_guard = match cache
Expand Down Expand Up @@ -137,50 +140,77 @@ pub fn is_ephemeral_file(filename: &str) -> bool {

impl BlobWriter for EphemeralFile {
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
let pos = self.size;

let mut blknum = (self.size / PAGE_SZ as u64) as u32;
let mut off = (pos % PAGE_SZ as u64) as usize;
struct Writer<'a> {
ephemeral_file: &'a mut EphemeralFile,
/// The block to which the next [`push_bytes`] will write.
blknum: u32,
/// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
off: usize,
/// Used by [`push_bytes`] to memoize the page cache write guard across calls to it.
memo_page_guard: MemoizedPageWriteGuard,
}
struct MemoizedPageWriteGuard {
guard: page_cache::PageWriteGuard<'static>,
/// The block number of the page in `guard`.
blknum: u32,
}
impl<'a> Writer<'a> {
fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
let blknum = (ephemeral_file.size / PAGE_SZ as u64) as u32;
Ok(Writer {
blknum,
off: (ephemeral_file.size % PAGE_SZ as u64) as usize,
memo_page_guard: MemoizedPageWriteGuard {
guard: ephemeral_file.get_buf_for_write(blknum)?,
blknum,
},
ephemeral_file,
})
}
#[inline(always)]
fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
// `src_remaining` is the remaining bytes to be written
let mut src_remaining = src;
while !src_remaining.is_empty() {
let page = if self.memo_page_guard.blknum == self.blknum {
&mut self.memo_page_guard.guard
} else {
self.memo_page_guard.guard =
self.ephemeral_file.get_buf_for_write(self.blknum)?;
self.memo_page_guard.blknum = self.blknum;
&mut self.memo_page_guard.guard
};
let dst_remaining = &mut page[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
}
}
Ok(())
}
}

let mut buf = self.get_buf_for_write(blknum)?;
let pos = self.size;
let mut writer = Writer::new(self)?;

// Write the length field
if srcbuf.len() < 0x80 {
buf[off] = srcbuf.len() as u8;
off += 1;
// short one-byte length header
let len_buf = [srcbuf.len() as u8];
writer.push_bytes(&len_buf)?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
let thislen = PAGE_SZ - off;
if thislen < 4 {
// it needs to be split across pages
buf[off..(off + thislen)].copy_from_slice(&len_buf[..thislen]);
blknum += 1;
buf = self.get_buf_for_write(blknum)?;
buf[0..4 - thislen].copy_from_slice(&len_buf[thislen..]);
off = 4 - thislen;
} else {
buf[off..off + 4].copy_from_slice(&len_buf);
off += 4;
}
writer.push_bytes(&len_buf)?;
}

// Write the payload
let mut buf_remain = srcbuf;
while !buf_remain.is_empty() {
let mut page_remain = PAGE_SZ - off;
if page_remain == 0 {
blknum += 1;
buf = self.get_buf_for_write(blknum)?;
off = 0;
page_remain = PAGE_SZ;
}
let this_blk_len = min(page_remain, buf_remain.len());
buf[off..(off + this_blk_len)].copy_from_slice(&buf_remain[..this_blk_len]);
off += this_blk_len;
buf_remain = &buf_remain[this_blk_len..];
}
drop(buf);
writer.push_bytes(srcbuf)?;

if srcbuf.len() < 0x80 {
self.size += 1;
Expand Down

1 comment on commit 957af04

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1664 tests run: 1580 passed, 2 failed, 82 skipped (full report)


Failures on Posgres 15

  • test_delete_tenant_exercise_crash_safety_failpoints[Check.RETRY_WITHOUT_RESTART-real_s3-tenant-delete-before-shutdown-False]: debug

Failures on Posgres 14

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_layer_map or test_delete_tenant_exercise_crash_safety_failpoints[debug-pg15-Check.RETRY_WITHOUT_RESTART-real_s3-tenant-delete-before-shutdown-False]"
The comment gets automatically updated with the latest test results
957af04 at 2023-08-17T10:43:40.646Z :recycle:

Please sign in to comment.