Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blob_io): use owned buffers #6660

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 86 additions & 35 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use tokio_epoll_uring::{BoundedBuf, Slice};

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
Expand Down Expand Up @@ -100,6 +103,8 @@ pub struct BlobWriter<const BUFFERED: bool> {
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
/// We do tiny writes for the length headers; they need to be in an owned buffer;
io_buf: Option<BytesMut>,
}

impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Expand All @@ -108,6 +113,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
inner,
offset: start_offset,
buf: Vec::with_capacity(Self::CAPACITY),
io_buf: Some(BytesMut::new()),
}
}

Expand All @@ -117,14 +123,28 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {

const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };

#[inline(always)]
/// Writes the given buffer directly to the underlying `VirtualFile`.
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> {
self.inner.write_all(src_buf).await?;
self.offset += src_buf.len() as u64;
Ok(())
#[inline(always)]
async fn write_all_unbuffered<B: BoundedBuf>(
koivunej marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
let src_buf_len = src_buf.bytes_init();
let (src_buf, res) = if src_buf_len > 0 {
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
let src_buf = src_buf.slice(0..src_buf_len);
let res = self.inner.write_all(&src_buf).await;
let src_buf = Slice::into_inner(src_buf);
(src_buf, res)
} else {
let res = self.inner.write_all(&[]).await;
(Slice::into_inner(src_buf.slice_full()), res)
koivunej marked this conversation as resolved.
Show resolved Hide resolved
};
if let Ok(()) = &res {
self.offset += src_buf_len as u64;
}
(src_buf, res)
}

#[inline(always)]
Expand All @@ -146,62 +166,91 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}

/// Internal, possibly buffered, write function
async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> {
async fn write_all<B: BoundedBuf>(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) {
if !BUFFERED {
assert!(self.buf.is_empty());
self.write_all_unbuffered(src_buf).await?;
return Ok(());
return self.write_all_unbuffered(src_buf).await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
if src_buf_len == 0 {
return (Slice::into_inner(src_buf.slice_full()), Ok(()));
}
let mut src_buf = src_buf.slice(0..src_buf_len);
// First try to copy as much as we can into the buffer
if remaining > 0 {
let copied = self.write_into_buffer(src_buf);
src_buf = &src_buf[copied..];
let copied = self.write_into_buffer(&src_buf);
src_buf = src_buf.slice(copied..);
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
self.flush_buffer().await?;
if let Err(e) = self.flush_buffer().await {
return (Slice::into_inner(src_buf), Err(e));
}
}
// Finally, write the tail of src_buf:
// If it wholly fits into the buffer without
// completely filling it, then put it there.
// If not, write it out directly.
if !src_buf.is_empty() {
let src_buf = if !src_buf.is_empty() {
assert_eq!(self.buf.len(), 0);
if src_buf.len() < Self::CAPACITY {
let copied = self.write_into_buffer(src_buf);
let copied = self.write_into_buffer(&src_buf);
// We just verified above that src_buf fits into our internal buffer.
assert_eq!(copied, src_buf.len());
Slice::into_inner(src_buf)
} else {
self.write_all_unbuffered(src_buf).await?;
let (src_buf, res) = self.write_all_unbuffered(src_buf).await;
if let Err(e) = res {
return (src_buf, Err(e));
}
src_buf
}
}
Ok(())
} else {
Slice::into_inner(src_buf)
};
(src_buf, Ok(()))
}

/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
pub async fn write_blob<B: BoundedBuf>(&mut self, srcbuf: B) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;

if srcbuf.len() < 128 {
// Short blob. Write a 1-byte length header
let len_buf = srcbuf.len() as u8;
self.write_all(&[len_buf]).await?;
} else {
// Write a 4-byte length header
if srcbuf.len() > 0x7fff_ffff {
return Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", srcbuf.len()),
));
let len = srcbuf.bytes_init();

let mut io_buf = self.io_buf.take().expect("we always put it back below");
io_buf.clear();
let (io_buf, hdr_res) = async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not 100% clear to me why there is an async block here. It would be useful to have a comment that explains it also for other future readers. I suppose some ownership/borrow checker issue?

if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
return (
io_buf,
Err(Error::new(
ErrorKind::Other,
format!("blob too large ({} bytes)", len),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
format!("blob too large ({} bytes)", len),
format!("blob too large ({len} bytes)"),

)),
);
}
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf).await
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the write call can be moved out of the if as both branches have it.

}
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.write_all(&len_buf).await?;
}
self.write_all(srcbuf).await?;
Ok(offset)
.await;
self.io_buf = Some(io_buf);
match hdr_res {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = self.write_all(srcbuf).await;
(srcbuf, res.map(|_| offset))
}
}

Expand Down Expand Up @@ -248,12 +297,14 @@ mod tests {
let file = VirtualFile::create(pathbuf.as_path()).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let offs = wtr.write_blob(blob).await?;
let (_, res) = wtr.write_blob(blob.clone()).await;
let offs = res?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?;
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ]).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer().await?;
}
Expand Down
26 changes: 15 additions & 11 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,27 +416,31 @@ impl DeltaLayerWriterInner {
/// The values must be appended in key, lsn order.
///
async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
.await
let (_, res) = self
.put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
.await;
res
}

async fn put_value_bytes(
&mut self,
key: Key,
lsn: Lsn,
val: &[u8],
val: Vec<u8>,
will_init: bool,
) -> anyhow::Result<()> {
) -> (Vec<u8>, anyhow::Result<()>) {
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
assert!(self.lsn_range.start <= lsn);

let off = self.blob_writer.write_blob(val).await?;
let (val, res) = self.blob_writer.write_blob(val).await;
let off = match res {
Ok(off) => off,
Err(e) => return (val, Err(anyhow::anyhow!(e))),
};

let blob_ref = BlobRef::new(off, will_init);

let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;

Ok(())
let res = self.tree.append(&delta_key.0, blob_ref.0);
(val, res.map_err(|e| anyhow::anyhow!(e)))
}

fn size(&self) -> u64 {
Expand Down Expand Up @@ -587,9 +591,9 @@ impl DeltaLayerWriter {
&mut self,
key: Key,
lsn: Lsn,
val: &[u8],
val: Vec<u8>,
will_init: bool,
) -> anyhow::Result<()> {
) -> (Vec<u8>, anyhow::Result<()>) {
self.inner
.as_mut()
.unwrap()
Expand Down
8 changes: 5 additions & 3 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,11 @@ impl ImageLayerWriterInner {
///
/// The page versions must be appended in blknum order.
///
async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img).await?;
let (_img, res) = self.blob_writer.write_blob(img).await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;

let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
Expand Down Expand Up @@ -659,7 +661,7 @@ impl ImageLayerWriter {
///
/// The page versions must be appended in blknum order.
///
pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
pub async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img).await
}

Expand Down
8 changes: 5 additions & 3 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,11 @@ impl InMemoryLayer {
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer
.put_value_bytes(key, *lsn, &buf, will_init)
.await?;
let res;
(buf, res) = delta_layer_writer
.put_value_bytes(key, *lsn, buf, will_init)
.await;
res?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3328,7 +3328,7 @@ impl Timeline {
}
};

image_layer_writer.put_image(img_key, &img).await?;
image_layer_writer.put_image(img_key, img).await?;
}
}

Expand Down
Loading