diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 6b8cd77d78b5..2bedbf7f6134 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -6,6 +6,7 @@ use crate::context::RequestContext; use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; use crate::virtual_file::{self, VirtualFile}; +use bytes::BytesMut; use camino::Utf8PathBuf; use pageserver_api::shard::TenantShardId; use std::cmp::min; @@ -26,7 +27,10 @@ pub struct EphemeralFile { /// An ephemeral file is append-only. /// We keep the last page, which can still be modified, in [`Self::mutable_tail`]. /// The other pages, which can no longer be modified, are accessed through the page cache. - mutable_tail: [u8; PAGE_SZ], + /// + /// None <=> IO is ongoing. + /// Size is fixed to PAGE_SZ at creation time and must not be changed. + mutable_tail: Option, } impl EphemeralFile { @@ -60,7 +64,7 @@ impl EphemeralFile { _timeline_id: timeline_id, file, len: 0, - mutable_tail: [0u8; PAGE_SZ], + mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)), }) } @@ -103,7 +107,13 @@ impl EphemeralFile { }; } else { debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64); - Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) + Ok(BlockLease::EphemeralFileMutableTail( + self.mutable_tail + .as_deref() + .expect("we're not doing IO, it must be Some()") + .try_into() + .expect("we ensure that it's always PAGE_SZ"), + )) } } @@ -135,21 +145,27 @@ impl EphemeralFile { ) -> Result<(), io::Error> { let mut src_remaining = src; while !src_remaining.is_empty() { - let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..]; + let dst_remaining = &mut self + .ephemeral_file + .mutable_tail + .as_deref_mut() + .expect("IO is not yet ongoing")[self.off..]; let n = min(dst_remaining.len(), src_remaining.len()); dst_remaining[..n].copy_from_slice(&src_remaining[..n]); self.off += n; src_remaining = &src_remaining[n..]; if self.off == PAGE_SZ { - match self + let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail) + .expect("IO is not yet ongoing"); + let (mutable_tail, res) = self .ephemeral_file .file - .write_all_at( - &self.ephemeral_file.mutable_tail, - self.blknum as u64 * PAGE_SZ as u64, - ) - .await - { + .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64) + .await; + // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail. + // I.e., the IO isn't retryable if we panic. + self.ephemeral_file.mutable_tail = Some(mutable_tail); + match res { Ok(_) => { // Pre-warm the page cache with what we just wrote. // This isn't necessary for coherency/correctness, but it's how we've always done it. @@ -169,7 +185,12 @@ impl EphemeralFile { Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); - buf.copy_from_slice(&self.ephemeral_file.mutable_tail); + buf.copy_from_slice( + self.ephemeral_file + .mutable_tail + .as_deref() + .expect("IO is not ongoing"), + ); let _ = write_guard.mark_valid(); // pre-warm successful } @@ -181,7 +202,11 @@ impl EphemeralFile { // Zero the buffer for re-use. // Zeroing is critical for correcntess because the write_blob code below // and similarly read_blk expect zeroed pages. - self.ephemeral_file.mutable_tail.fill(0); + self.ephemeral_file + .mutable_tail + .as_deref_mut() + .expect("IO is not ongoing") + .fill(0); // This block is done, move to next one. self.blknum += 1; self.off = 0; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 6cff748d42c0..45c3e19cfcde 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -582,24 +582,37 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 - pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { + pub async fn write_all_at( + &self, + buf: B, + mut offset: u64, + ) -> (B::Buf, Result<(), Error>) { + let buf_len = buf.bytes_init(); + if buf_len == 0 { + return (Slice::into_inner(buf.slice_full()), Ok(())); + } + let mut buf = buf.slice(0..buf_len); while !buf.is_empty() { - match self.write_at(buf, offset).await { + // TODO: push `buf` further down + match self.write_at(&buf, offset).await { Ok(0) => { - return Err(Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )); + return ( + Slice::into_inner(buf), + Err(Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + ); } Ok(n) => { - buf = &buf[n..]; + buf = buf.slice(n..); offset += n as u64; } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), + Err(e) => return (Slice::into_inner(buf), Err(e)), } } - Ok(()) + (Slice::into_inner(buf), Ok(())) } /// Writes `buf.slice(0..buf.bytes_init())`. @@ -1064,10 +1077,19 @@ mod tests { MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf), } } - async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> { + async fn write_all_at(&self, buf: B, offset: u64) -> Result<(), Error> { match self { - MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await, - MaybeVirtualFile::File(file) => file.write_all_at(buf, offset), + MaybeVirtualFile::VirtualFile(file) => { + let (_buf, res) = file.write_all_at(buf, offset).await; + res + } + MaybeVirtualFile::File(file) => { + let buf_len = buf.bytes_init(); + if buf_len == 0 { + return Ok(()); + } + file.write_all_at(&buf.slice(0..buf_len), offset) + } } } async fn seek(&mut self, pos: SeekFrom) -> Result { @@ -1214,8 +1236,8 @@ mod tests { .to_owned(), ) .await?; - file_b.write_all_at(b"BAR", 3).await?; - file_b.write_all_at(b"FOO", 0).await?; + file_b.write_all_at(b"BAR".to_vec(), 3).await?; + file_b.write_all_at(b"FOO".to_vec(), 0).await?; assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");