diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 1a2d00c7071a6..4c056c96865ef 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -550,34 +550,13 @@ impl VirtualFile { Ok(self.pos) } - // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at<B>(&self, buf: B, mut offset: u64) -> Result<B, Error> + pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error> where B: IoBufMut + Send, { - use tokio_epoll_uring::BoundedBuf; - let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); - while buf.bytes_total() != 0 { - let res; - (buf, res) = self.read_at(buf, offset).await; - match res { - Ok(0) => break, - Ok(n) => { - buf = buf.slice(n..); - offset += n as u64; - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - } - if !buf.is_empty() { - Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )) - } else { - Ok(buf.into_inner()) - } + let (buf, res) = + read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await; + res.map(|()| buf) } /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`]. @@ -680,6 +659,188 @@ impl VirtualFile { } } +// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 +pub async fn read_exact_at_impl<B, F, Fut>( + buf: B, + mut offset: u64, + mut read_at: F, +) -> (B, std::io::Result<()>) +where + B: IoBufMut + Send, + F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut, + Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>, +{ + use tokio_epoll_uring::BoundedBuf; + let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory + while buf.bytes_total() != 0 { + let res; + (buf, res) = read_at(buf, offset).await; + match res { + Ok(0) => break, + Ok(n) => { + buf = buf.slice(n..); + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (buf.into_inner(), Err(e)), + } + } + if !buf.is_empty() { + ( + buf.into_inner(), + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )), + ) + } else { + assert_eq!(buf.len(), buf.bytes_total()); + (buf.into_inner(), Ok(())) + } +} + +#[cfg(test)] +mod test_read_exact_at_impl { + + use std::{collections::VecDeque, sync::Arc}; + + use tokio_epoll_uring::{BoundedBuf, BoundedBufMut}; + + use super::read_exact_at_impl; + + struct Expectation { + offset: u64, + bytes_total: usize, + result: std::io::Result<Vec<u8>>, + } + struct MockReadAt { + expectations: VecDeque<Expectation>, + } + + impl MockReadAt { + async fn read_at( + &mut self, + mut buf: tokio_epoll_uring::Slice<Vec<u8>>, + offset: u64, + ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) { + let exp = self + .expectations + .pop_front() + .expect("read_at called but we have no expectations left"); + assert_eq!(exp.offset, offset); + assert_eq!(exp.bytes_total, buf.bytes_total()); + match exp.result { + Ok(bytes) => { + assert!(bytes.len() <= buf.bytes_total()); + buf.put_slice(&bytes); + (buf, Ok(bytes.len())) + } + Err(e) => (buf, Err(e)), + } + } + } + + impl Drop for MockReadAt { + fn drop(&mut self) { + assert_eq!(self.expectations.len(), 0); + } + } + + #[tokio::test] + async fn test_basic() { + let buf = Vec::with_capacity(5); + let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { + expectations: VecDeque::from(vec![Expectation { + offset: 0, + bytes_total: 5, + result: Ok(vec![b'a', b'b', b'c', b'd', b'e']), + }]), + })); + let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let mock_read_at = Arc::clone(&mock_read_at); + async move { mock_read_at.lock().await.read_at(buf, offset).await } + }) + .await; + assert!(res.is_ok()); + assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']); + } + + #[tokio::test] + async fn test_empty_buf_issues_no_syscall() { + let buf = Vec::new(); + let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { + expectations: VecDeque::new(), + })); + let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let mock_read_at = Arc::clone(&mock_read_at); + async move { mock_read_at.lock().await.read_at(buf, offset).await } + }) + .await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_two_read_at_calls_needed_until_buf_filled() { + let buf = Vec::with_capacity(4); + let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { + expectations: VecDeque::from(vec![ + Expectation { + offset: 0, + bytes_total: 4, + result: Ok(vec![b'a', b'b']), + }, + Expectation { + offset: 2, + bytes_total: 2, + result: Ok(vec![b'c', b'd']), + }, + ]), + })); + let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let mock_read_at = Arc::clone(&mock_read_at); + async move { mock_read_at.lock().await.read_at(buf, offset).await } + }) + .await; + assert!(res.is_ok()); + assert_eq!(buf, vec![b'a', b'b', b'c', b'd']); + } + + #[tokio::test] + async fn test_eof_before_buffer_full() { + let buf = Vec::with_capacity(3); + let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { + expectations: VecDeque::from(vec![ + Expectation { + offset: 0, + bytes_total: 3, + result: Ok(vec![b'a']), + }, + Expectation { + offset: 1, + bytes_total: 2, + result: Ok(vec![b'b']), + }, + Expectation { + offset: 2, + bytes_total: 1, + result: Ok(vec![]), + }, + ]), + })); + let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let mock_read_at = Arc::clone(&mock_read_at); + async move { mock_read_at.lock().await.read_at(buf, offset).await } + }) + .await; + let Err(err) = res else { + panic!("should return an error"); + }; + assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); + assert_eq!(format!("{err}"), "failed to fill whole buffer"); + // buffer contents on error are unspecified + } +} + struct FileGuard { slot_guard: RwLockReadGuard<'static, SlotInner>, }