Skip to content

Commit

Permalink
ephemeral file: refactor write_blob impl to concentrate mutable state
Browse files Browse the repository at this point in the history
Before this patch, we had the `off` and `blknum` as function-wide
mutable state. Now it's contained in the `Writer` struct.

The use of `push_bytes` instead of index-based filling of the buffer
also makes it easier to reason about what's going on.

This is prep for #4994
  • Loading branch information
problame committed Aug 16, 2023
1 parent 0f47bc0 commit e91a03f
Showing 1 changed file with 38 additions and 34 deletions.
72 changes: 38 additions & 34 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,50 +198,54 @@ impl FileExt for EphemeralFile {

impl BlobWriter for EphemeralFile {
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
let pos = self.size;

let mut blknum = (self.size / PAGE_SZ as u64) as u32;
let mut off = (pos % PAGE_SZ as u64) as usize;
struct Writer<'a> {
ephemeral_file: &'a mut EphemeralFile,
blknum: u32,
off: usize,
}
impl<'a> Writer<'a> {
fn new(ephemeral_file: &'a mut EphemeralFile) -> Writer<'a> {
Writer {
blknum: (ephemeral_file.size / PAGE_SZ as u64) as u32,
off: (ephemeral_file.size % PAGE_SZ as u64) as usize,
ephemeral_file,
}
}
fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
let mut src_remaining = src;
while !src_remaining.is_empty() {
{
let mut head_page = self.ephemeral_file.get_buf_for_write(self.blknum)?;
let dst_remaining = &mut head_page[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
}
if self.off == PAGE_SZ {
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
}
}
Ok(())
}
}

let mut buf = self.get_buf_for_write(blknum)?;
let pos = self.size;
let mut writer = Writer::new(self);

// Write the length field
if srcbuf.len() < 0x80 {
buf[off] = srcbuf.len() as u8;
off += 1;
writer.push_bytes(&[srcbuf.len() as u8])?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
let thislen = PAGE_SZ - off;
if thislen < 4 {
// it needs to be split across pages
buf[off..(off + thislen)].copy_from_slice(&len_buf[..thislen]);
blknum += 1;
buf = self.get_buf_for_write(blknum)?;
buf[0..4 - thislen].copy_from_slice(&len_buf[thislen..]);
off = 4 - thislen;
} else {
buf[off..off + 4].copy_from_slice(&len_buf);
off += 4;
}
writer.push_bytes(&len_buf)?;
}

// Write the payload
let mut buf_remain = srcbuf;
while !buf_remain.is_empty() {
let mut page_remain = PAGE_SZ - off;
if page_remain == 0 {
blknum += 1;
buf = self.get_buf_for_write(blknum)?;
off = 0;
page_remain = PAGE_SZ;
}
let this_blk_len = min(page_remain, buf_remain.len());
buf[off..(off + this_blk_len)].copy_from_slice(&buf_remain[..this_blk_len]);
off += this_blk_len;
buf_remain = &buf_remain[this_blk_len..];
}
drop(buf);
writer.push_bytes(srcbuf)?;

if srcbuf.len() < 0x80 {
self.size += 1;
Expand Down

0 comments on commit e91a03f

Please sign in to comment.