From 128a85ba5eea2b63325d2f2823bfb6b623a32df1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 4 Sep 2023 17:05:20 +0200 Subject: [PATCH] Convert many VirtualFile APIs to async (#5190) ## Problem `VirtualFile` does both reading and writing, and it would be nice if both could be converted to async, so that it doesn't have to support an async read path and a blocking write path (especially for the locks this is annoying as none of the lock implementations in std, tokio or parking_lot have support for both async and blocking access). ## Summary of changes This PR is some initial work on making the `VirtualFile` APIs async. It can be reviewed commit-by-commit. * Introduce the `MaybeVirtualFile` enum to be generic in a test that compares real files with virtual files. * Make various APIs of `VirtualFile` async, including `write_all_at`, `read_at`, `read_exact_at`. Part of #4743 , successor of #5180. Co-authored-by: Christian Schwarz --- pageserver/src/tenant/blob_io.rs | 10 +- pageserver/src/tenant/block_io.rs | 9 +- pageserver/src/tenant/ephemeral_file.rs | 17 +- .../src/tenant/storage_layer/delta_layer.rs | 2 +- .../src/tenant/storage_layer/image_layer.rs | 2 +- pageserver/src/virtual_file.rs | 309 +++++++++++------- 6 files changed, 212 insertions(+), 137 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index f5ff15b50c22..e4dede2c3099 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -96,18 +96,12 @@ pub trait BlobWriter { /// An implementation of BlobWriter to write blobs to anything that /// implements std::io::Write. /// -pub struct WriteBlobWriter -where - W: std::io::Write, -{ +pub struct WriteBlobWriter { inner: W, offset: u64, } -impl WriteBlobWriter -where - W: std::io::Write, -{ +impl WriteBlobWriter { pub fn new(inner: W, start_offset: u64) -> Self { WriteBlobWriter { inner, diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 93b79d211dfc..645ec81036b2 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -8,7 +8,6 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::{Deref, DerefMut}; -use std::os::unix::fs::FileExt; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -155,9 +154,11 @@ impl FileBlockReader { } /// Read a page from the underlying file into given buffer. - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { + async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { assert!(buf.len() == PAGE_SZ); - self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + self.file + .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + .await } /// Read a block. /// @@ -179,7 +180,7 @@ impl FileBlockReader { ReadBufResult::Found(guard) => break Ok(guard.into()), ReadBufResult::NotFound(mut write_guard) => { // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum)?; + self.fill_buffer(write_guard.deref_mut(), blknum).await?; write_guard.mark_valid(); // Swap for read lock diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 31db3869d978..4c5fe424f3ca 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -9,7 +9,6 @@ use std::cmp::min; use std::fs::OpenOptions; use std::io::{self, ErrorKind}; use std::ops::DerefMut; -use std::os::unix::prelude::FileExt; use std::path::PathBuf; use std::sync::atomic::AtomicU64; use tracing::*; @@ -88,7 +87,8 @@ impl EphemeralFile { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + .await?; write_guard.mark_valid(); // Swap for read lock @@ -128,10 +128,15 @@ impl EphemeralFile { self.off += n; src_remaining = &src_remaining[n..]; if self.off == PAGE_SZ { - match self.ephemeral_file.file.write_all_at( - &self.ephemeral_file.mutable_tail, - self.blknum as u64 * PAGE_SZ as u64, - ) { + match self + .ephemeral_file + .file + .write_all_at( + &self.ephemeral_file.mutable_tail, + self.blknum as u64 * PAGE_SZ as u64, + ) + .await + { Ok(_) => { // Pre-warm the page cache with what we just wrote. // This isn't necessary for coherency/correctness, but it's how we've always done it. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index f9a8c52c2f0f..60427a22e465 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; +use std::io::SeekFrom; use std::io::{BufWriter, Write}; -use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 32f20e6227f6..f329041fb1c4 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use std::fs::{self, File}; +use std::io::SeekFrom; use std::io::Write; -use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::path::{Path, PathBuf}; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 2553d0e3b6e3..41c5eb96cf41 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -13,7 +13,7 @@ use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; -use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}; +use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -406,54 +406,8 @@ impl VirtualFile { drop(self); std::fs::remove_file(path).expect("failed to remove the virtual file"); } -} - -impl Drop for VirtualFile { - /// If a VirtualFile is dropped, close the underlying file if it was open. - fn drop(&mut self) { - let handle = self.handle.get_mut().unwrap(); - - // We could check with a read-lock first, to avoid waiting on an - // unrelated I/O. - let slot = &get_open_files().slots[handle.index]; - let mut slot_guard = slot.inner.write().unwrap(); - if slot_guard.tag == handle.tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also operation "close-by-replace" for closes done on eviction for - // comparison. - STORAGE_IO_TIME - .with_label_values(&["close"]) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } - } -} - -impl Read for VirtualFile { - fn read(&mut self, buf: &mut [u8]) -> Result { - let pos = self.pos; - let n = self.read_at(buf, pos)?; - self.pos += n as u64; - Ok(n) - } -} - -impl Write for VirtualFile { - fn write(&mut self, buf: &[u8]) -> Result { - let pos = self.pos; - let n = self.write_at(buf, pos)?; - self.pos += n as u64; - Ok(n) - } - - fn flush(&mut self) -> Result<(), std::io::Error> { - // flush is no-op for File (at least on unix), so we don't need to do - // anything here either. - Ok(()) - } -} -impl Seek for VirtualFile { - fn seek(&mut self, pos: SeekFrom) -> Result { + pub fn seek(&mut self, pos: SeekFrom) -> Result { match pos { SeekFrom::Start(offset) => { self.pos = offset; @@ -477,10 +431,66 @@ impl Seek for VirtualFile { } Ok(self.pos) } -} -impl FileExt for VirtualFile { - fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { + #[cfg(test)] + async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { + loop { + let mut tmp = [0; 128]; + match self.read_at(&mut tmp, self.pos).await { + Ok(0) => return Ok(()), + Ok(n) => { + self.pos += n as u64; + buf.extend_from_slice(&tmp[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + } + + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 + pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.read_at(buf, offset).await { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } + Ok(n) => { + buf = &mut buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 + pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { + while !buf.is_empty() { + match self.write_at(buf, offset) { + Ok(0) => { + return Err(Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )); + } + Ok(n) => { + buf = &buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { let result = self.with_file("read", |file| file.read_at(buf, offset))?; if let Ok(size) = result { STORAGE_IO_SIZE @@ -490,7 +500,7 @@ impl FileExt for VirtualFile { result } - fn write_at(&self, buf: &[u8], offset: u64) -> Result { + pub fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = self.with_file("write", |file| file.write_at(buf, offset))?; if let Ok(size) = result { STORAGE_IO_SIZE @@ -501,6 +511,41 @@ impl FileExt for VirtualFile { } } +impl Drop for VirtualFile { + /// If a VirtualFile is dropped, close the underlying file if it was open. + fn drop(&mut self) { + let handle = self.handle.get_mut().unwrap(); + + // We could check with a read-lock first, to avoid waiting on an + // unrelated I/O. + let slot = &get_open_files().slots[handle.index]; + let mut slot_guard = slot.inner.write().unwrap(); + if slot_guard.tag == handle.tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also operation "close-by-replace" for closes done on eviction for + // comparison. + STORAGE_IO_TIME + .with_label_values(&["close"]) + .observe_closure_duration(|| drop(slot_guard.file.take())); + } + } +} + +impl Write for VirtualFile { + fn write(&mut self, buf: &[u8]) -> Result { + let pos = self.pos; + let n = self.write_at(buf, pos)?; + self.pos += n as u64; + Ok(n) + } + + fn flush(&mut self) -> Result<(), std::io::Error> { + // flush is no-op for File (at least on unix), so we don't need to do + // anything here either. + Ok(()) + } +} + impl OpenFiles { fn new(num_slots: usize) -> OpenFiles { let mut slots = Box::new(Vec::with_capacity(num_slots)); @@ -555,32 +600,66 @@ mod tests { use rand::thread_rng; use rand::Rng; use std::sync::Arc; - use std::thread; - // Helper function to slurp contents of a file, starting at the current position, - // into a string - fn read_string(vfile: &mut FD) -> Result - where - FD: Read, - { - let mut buf = String::new(); - vfile.read_to_string(&mut buf)?; - Ok(buf) + enum MaybeVirtualFile { + VirtualFile(VirtualFile), + File(File), } - // Helper function to slurp a portion of a file into a string - fn read_string_at(vfile: &mut FD, pos: u64, len: usize) -> Result - where - FD: FileExt, - { - let mut buf = Vec::new(); - buf.resize(len, 0); - vfile.read_exact_at(&mut buf, pos)?; - Ok(String::from_utf8(buf).unwrap()) + impl MaybeVirtualFile { + async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { + match self { + MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await, + MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset), + } + } + async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> { + match self { + MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await, + MaybeVirtualFile::File(file) => file.write_all_at(buf, offset), + } + } + fn seek(&mut self, pos: SeekFrom) -> Result { + match self { + MaybeVirtualFile::VirtualFile(file) => file.seek(pos), + MaybeVirtualFile::File(file) => file.seek(pos), + } + } + async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> { + match self { + MaybeVirtualFile::VirtualFile(file) => file.write_all(buf), + MaybeVirtualFile::File(file) => file.write_all(buf), + } + } + + // Helper function to slurp contents of a file, starting at the current position, + // into a string + async fn read_string(&mut self) -> Result { + use std::io::Read; + let mut buf = String::new(); + match self { + MaybeVirtualFile::VirtualFile(file) => { + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await?; + return Ok(String::from_utf8(buf).unwrap()); + } + MaybeVirtualFile::File(file) => { + file.read_to_string(&mut buf)?; + } + } + Ok(buf) + } + + // Helper function to slurp a portion of a file into a string + async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { + let mut buf = vec![0; len]; + self.read_exact_at(&mut buf, pos).await?; + Ok(String::from_utf8(buf).unwrap()) + } } - #[test] - fn test_virtual_files() -> Result<(), Error> { + #[tokio::test] + async fn test_virtual_files() -> Result<(), Error> { // The real work is done in the test_files() helper function. This // allows us to run the same set of tests against a native File, and // VirtualFile. We trust the native Files and wouldn't need to test them, @@ -589,21 +668,23 @@ mod tests { // native files, you will run out of file descriptors if the ulimit // is low enough.) test_files("virtual_files", |path, open_options| { - VirtualFile::open_with_options(path, open_options) + let vf = VirtualFile::open_with_options(path, open_options)?; + Ok(MaybeVirtualFile::VirtualFile(vf)) }) + .await } - #[test] - fn test_physical_files() -> Result<(), Error> { + #[tokio::test] + async fn test_physical_files() -> Result<(), Error> { test_files("physical_files", |path, open_options| { - open_options.open(path) + Ok(MaybeVirtualFile::File(open_options.open(path)?)) }) + .await } - fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> where - FD: Read + Write + Seek + FileExt, - OF: Fn(&Path, &OpenOptions) -> Result, + OF: Fn(&Path, &OpenOptions) -> Result, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); std::fs::create_dir_all(&testdir)?; @@ -613,36 +694,36 @@ mod tests { &path_a, OpenOptions::new().write(true).create(true).truncate(true), )?; - file_a.write_all(b"foobar")?; + file_a.write_all(b"foobar").await?; // cannot read from a file opened in write-only mode - assert!(read_string(&mut file_a).is_err()); + let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; // cannot write to a file opened in read-only mode - assert!(file_a.write(b"bar").is_err()); + let _ = file_a.write_all(b"bar").await.unwrap_err(); // Try simple read - assert_eq!("foobar", read_string(&mut file_a)?); + assert_eq!("foobar", file_a.read_string().await?); // It's positioned at the EOF now. - assert_eq!("", read_string(&mut file_a)?); + assert_eq!("", file_a.read_string().await?); // Test seeks. assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4); - assert_eq!("ar", read_string(&mut file_a)?); + assert_eq!("ar", file_a.read_string().await?); assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3); - assert_eq!("bar", read_string(&mut file_a)?); + assert_eq!("bar", file_a.read_string().await?); assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1); - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); // Test erroneous seeks to before byte 0 assert!(file_a.seek(SeekFrom::End(-7)).is_err()); @@ -650,7 +731,7 @@ mod tests { assert!(file_a.seek(SeekFrom::Current(-2)).is_err()); // the erroneous seek should have left the position unchanged - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); @@ -662,10 +743,10 @@ mod tests { .create(true) .truncate(true), )?; - file_b.write_all_at(b"BAR", 3)?; - file_b.write_all_at(b"FOO", 0)?; + file_b.write_all_at(b"BAR", 3).await?; + file_b.write_all_at(b"FOO", 0).await?; - assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA"); + assert_eq!(file_b.read_string_at(2, 3).await?, "OBA"); // Open a lot of files, enough to cause some evictions. (Or to be precise, // open the same file many times. The effect is the same.) @@ -676,7 +757,7 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; - assert_eq!("FOOBAR", read_string(&mut vfile)?); + assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } @@ -685,13 +766,13 @@ mod tests { // The underlying file descriptor for 'file_a' should be closed now. Try to read // from it again. We left the file positioned at offset 1 above. - assert_eq!("oobar", read_string(&mut file_a)?); + assert_eq!("oobar", file_a.read_string().await?); // Check that all the other FDs still work too. Use them in random order for // good measure. vfiles.as_mut_slice().shuffle(&mut thread_rng()); for vfile in vfiles.iter_mut() { - assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?); + assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?); } Ok(()) @@ -726,28 +807,22 @@ mod tests { let files = Arc::new(files); // Launch many threads, and use the virtual files concurrently in random order. - let mut threads = Vec::new(); - for threadno in 0..THREADS { - let builder = - thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno)); - + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(THREADS) + .thread_name("test_vfile_concurrency thread") + .build() + .unwrap(); + for _threadno in 0..THREADS { let files = files.clone(); - let thread = builder - .spawn(move || { - let mut buf = [0u8; SIZE]; - let mut rng = rand::thread_rng(); - for _ in 1..1000 { - let f = &files[rng.gen_range(0..files.len())]; - f.read_exact_at(&mut buf, 0).unwrap(); - assert!(buf == SAMPLE); - } - }) - .unwrap(); - threads.push(thread); - } - - for thread in threads { - thread.join().unwrap(); + rt.spawn(async move { + let mut buf = [0u8; SIZE]; + let mut rng = rand::rngs::OsRng; + for _ in 1..1000 { + let f = &files[rng.gen_range(0..files.len())]; + f.read_exact_at(&mut buf, 0).await.unwrap(); + assert!(buf == SAMPLE); + } + }); } Ok(())