diff --git a/Cargo.lock b/Cargo.lock index 83afdaf66ff0..520163e41bc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5740,7 +5740,7 @@ dependencies = [ [[package]] name = "tokio-epoll-uring" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#868d2c42b5d54ca82fead6e8f2f233b69a540d3e" dependencies = [ "futures", "nix 0.26.4", @@ -6265,8 +6265,9 @@ dependencies = [ [[package]] name = "uring-common" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#868d2c42b5d54ca82fead6e8f2f233b69a540d3e" dependencies = [ + "bytes", "io-uring", "libc", ] diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 6de2e950558f..e2ff12665a3b 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -11,6 +11,9 @@ //! len < 128: 0XXXXXXX //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! +use bytes::{BufMut, BytesMut}; +use tokio_epoll_uring::{BoundedBuf, Slice}; + use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; @@ -100,6 +103,8 @@ pub struct BlobWriter { offset: u64, /// A buffer to save on write calls, only used if BUFFERED=true buf: Vec, + /// We do tiny writes for the length headers; they need to be in an owned buffer; + io_buf: Option, } impl BlobWriter { @@ -108,6 +113,7 @@ impl BlobWriter { inner, offset: start_offset, buf: Vec::with_capacity(Self::CAPACITY), + io_buf: Some(BytesMut::new()), } } @@ -117,14 +123,28 @@ impl BlobWriter { const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 }; - #[inline(always)] /// Writes the given buffer directly to the underlying `VirtualFile`. /// You need to make sure that the internal buffer is empty, otherwise /// data will be written in wrong order. - async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> { - self.inner.write_all(src_buf).await?; - self.offset += src_buf.len() as u64; - Ok(()) + #[inline(always)] + async fn write_all_unbuffered( + &mut self, + src_buf: B, + ) -> (B::Buf, Result<(), Error>) { + let src_buf_len = src_buf.bytes_init(); + let (src_buf, res) = if src_buf_len > 0 { + let src_buf = src_buf.slice(0..src_buf_len); + let res = self.inner.write_all(&src_buf).await; + let src_buf = Slice::into_inner(src_buf); + (src_buf, res) + } else { + let res = self.inner.write_all(&[]).await; + (Slice::into_inner(src_buf.slice_full()), res) + }; + if let Ok(()) = &res { + self.offset += src_buf_len as u64; + } + (src_buf, res) } #[inline(always)] @@ -146,62 +166,91 @@ impl BlobWriter { } /// Internal, possibly buffered, write function - async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> { + async fn write_all(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) { if !BUFFERED { assert!(self.buf.is_empty()); - self.write_all_unbuffered(src_buf).await?; - return Ok(()); + return self.write_all_unbuffered(src_buf).await; } let remaining = Self::CAPACITY - self.buf.len(); + let src_buf_len = src_buf.bytes_init(); + if src_buf_len == 0 { + return (Slice::into_inner(src_buf.slice_full()), Ok(())); + } + let mut src_buf = src_buf.slice(0..src_buf_len); // First try to copy as much as we can into the buffer if remaining > 0 { - let copied = self.write_into_buffer(src_buf); - src_buf = &src_buf[copied..]; + let copied = self.write_into_buffer(&src_buf); + src_buf = src_buf.slice(copied..); } // Then, if the buffer is full, flush it out if self.buf.len() == Self::CAPACITY { - self.flush_buffer().await?; + if let Err(e) = self.flush_buffer().await { + return (Slice::into_inner(src_buf), Err(e)); + } } // Finally, write the tail of src_buf: // If it wholly fits into the buffer without // completely filling it, then put it there. // If not, write it out directly. - if !src_buf.is_empty() { + let src_buf = if !src_buf.is_empty() { assert_eq!(self.buf.len(), 0); if src_buf.len() < Self::CAPACITY { - let copied = self.write_into_buffer(src_buf); + let copied = self.write_into_buffer(&src_buf); // We just verified above that src_buf fits into our internal buffer. assert_eq!(copied, src_buf.len()); + Slice::into_inner(src_buf) } else { - self.write_all_unbuffered(src_buf).await?; + let (src_buf, res) = self.write_all_unbuffered(src_buf).await; + if let Err(e) = res { + return (src_buf, Err(e)); + } + src_buf } - } - Ok(()) + } else { + Slice::into_inner(src_buf) + }; + (src_buf, Ok(())) } /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result { + pub async fn write_blob(&mut self, srcbuf: B) -> (B::Buf, Result) { let offset = self.offset; - if srcbuf.len() < 128 { - // Short blob. Write a 1-byte length header - let len_buf = srcbuf.len() as u8; - self.write_all(&[len_buf]).await?; - } else { - // Write a 4-byte length header - if srcbuf.len() > 0x7fff_ffff { - return Err(Error::new( - ErrorKind::Other, - format!("blob too large ({} bytes)", srcbuf.len()), - )); + let len = srcbuf.bytes_init(); + + let mut io_buf = self.io_buf.take().expect("we always put it back below"); + io_buf.clear(); + let (io_buf, hdr_res) = async { + if len < 128 { + // Short blob. Write a 1-byte length header + io_buf.put_u8(len as u8); + self.write_all(io_buf).await + } else { + // Write a 4-byte length header + if len > 0x7fff_ffff { + return ( + io_buf, + Err(Error::new( + ErrorKind::Other, + format!("blob too large ({} bytes)", len), + )), + ); + } + let mut len_buf = (len as u32).to_be_bytes(); + len_buf[0] |= 0x80; + io_buf.extend_from_slice(&len_buf[..]); + self.write_all(io_buf).await } - let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes(); - len_buf[0] |= 0x80; - self.write_all(&len_buf).await?; } - self.write_all(srcbuf).await?; - Ok(offset) + .await; + self.io_buf = Some(io_buf); + match hdr_res { + Ok(_) => (), + Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), + } + let (srcbuf, res) = self.write_all(srcbuf).await; + (srcbuf, res.map(|_| offset)) } } @@ -248,12 +297,14 @@ mod tests { let file = VirtualFile::create(pathbuf.as_path()).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { - let offs = wtr.write_blob(blob).await?; + let (_, res) = wtr.write_blob(blob.clone()).await; + let offs = res?; offsets.push(offs); } // Write out one page worth of zeros so that we can // read again with read_blk - let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?; + let (_, res) = wtr.write_blob(vec![0; PAGE_SZ]).await; + let offs = res?; println!("Writing final blob at offs={offs}"); wtr.flush_buffer().await?; } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 2a51884c0b71..7a5dc7a59fa4 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -416,27 +416,31 @@ impl DeltaLayerWriterInner { /// The values must be appended in key, lsn order. /// async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { - self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init()) - .await + let (_, res) = self + .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init()) + .await; + res } async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, - val: &[u8], + val: Vec, will_init: bool, - ) -> anyhow::Result<()> { + ) -> (Vec, anyhow::Result<()>) { assert!(self.lsn_range.start <= lsn); - - let off = self.blob_writer.write_blob(val).await?; + let (val, res) = self.blob_writer.write_blob(val).await; + let off = match res { + Ok(off) => off, + Err(e) => return (val, Err(anyhow::anyhow!(e))), + }; let blob_ref = BlobRef::new(off, will_init); let delta_key = DeltaKey::from_key_lsn(&key, lsn); - self.tree.append(&delta_key.0, blob_ref.0)?; - - Ok(()) + let res = self.tree.append(&delta_key.0, blob_ref.0); + (val, res.map_err(|e| anyhow::anyhow!(e))) } fn size(&self) -> u64 { @@ -587,9 +591,9 @@ impl DeltaLayerWriter { &mut self, key: Key, lsn: Lsn, - val: &[u8], + val: Vec, will_init: bool, - ) -> anyhow::Result<()> { + ) -> (Vec, anyhow::Result<()>) { self.inner .as_mut() .unwrap() diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index c62e6aed51b3..1ad195032da1 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -528,9 +528,11 @@ impl ImageLayerWriterInner { /// /// The page versions must be appended in blknum order. /// - async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { + async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let off = self.blob_writer.write_blob(img).await?; + let (_img, res) = self.blob_writer.write_blob(img).await; + // TODO: re-use the buffer for `img` further upstack + let off = res?; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); @@ -659,7 +661,7 @@ impl ImageLayerWriter { /// /// The page versions must be appended in blknum order. /// - pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { + pub async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> { self.inner.as_mut().unwrap().put_image(key, img).await } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 7c9103eea876..c597b1553345 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -383,9 +383,11 @@ impl InMemoryLayer { for (lsn, pos) in vec_map.as_slice() { cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; let will_init = Value::des(&buf)?.will_init(); - delta_layer_writer - .put_value_bytes(key, *lsn, &buf, will_init) - .await?; + let res; + (buf, res) = delta_layer_writer + .put_value_bytes(key, *lsn, buf, will_init) + .await; + res?; } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f96679ca6928..74676277d515 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3328,7 +3328,7 @@ impl Timeline { } }; - image_layer_writer.put_image(img_key, &img).await?; + image_layer_writer.put_image(img_key, img).await?; } }