From 3a8b630f90702b26877c67523527cd504b0f7968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 4 Sep 2023 18:04:36 +0200 Subject: [PATCH 1/8] Make VirtualFile::sync_all async --- pageserver/src/tenant/par_fsync.rs | 5 ++--- pageserver/src/tenant/storage_layer/delta_layer.rs | 2 +- pageserver/src/tenant/storage_layer/image_layer.rs | 2 +- pageserver/src/virtual_file.rs | 4 +++- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pageserver/src/tenant/par_fsync.rs b/pageserver/src/tenant/par_fsync.rs index 3cbcfe8774d2..705b42aff7cf 100644 --- a/pageserver/src/tenant/par_fsync.rs +++ b/pageserver/src/tenant/par_fsync.rs @@ -4,10 +4,9 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; -use crate::virtual_file::VirtualFile; - fn fsync_path(path: &Path) -> io::Result<()> { - let file = VirtualFile::open(path)?; + // TODO use VirtualFile::fsync_all once we fully go async. + let file = std::fs::File::open(path)?; file.sync_all() } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b6fbf989624c..7305ec1486bb 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -732,7 +732,7 @@ impl DeltaLayerWriterInner { }; // fsync the file - file.sync_all()?; + file.sync_all().await?; // Rename the file to its final name // // Note: This overwrites any existing file. There shouldn't be any. diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 8f7fb8175c4b..2d0fcf0b236d 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -645,7 +645,7 @@ impl ImageLayerWriterInner { }; // fsync the file - file.sync_all()?; + file.sync_all().await?; // Rename the file to its final name // diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index c4241c427016..54f5a89c5bb1 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -304,6 +304,7 @@ impl VirtualFile { .await .map_err(CrashsafeOverwriteError::WriteContents)?; file.sync_all() + .await .map_err(CrashsafeOverwriteError::SyncTempfile)?; drop(file); // before the rename, that's important! // renames are atomic @@ -319,12 +320,13 @@ impl VirtualFile { .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?; final_parent_dirfd .sync_all() + .await .map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?; Ok(()) } /// Call File::sync_all() on the underlying File. - pub fn sync_all(&self) -> Result<(), Error> { + pub async fn sync_all(&self) -> Result<(), Error> { self.with_file("fsync", |file| file.sync_all())? } From 41e87f92c37aca3969b333b7c07520de6f1be5cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 6 Sep 2023 18:19:14 +0200 Subject: [PATCH 2/8] Make VirtualFile::with_file async --- pageserver/src/virtual_file.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 54f5a89c5bb1..ec3032843680 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -327,17 +327,17 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - self.with_file("fsync", |file| file.sync_all())? + self.with_file("fsync", |file| file.sync_all()).await? } pub async fn metadata(&self) -> Result { - self.with_file("metadata", |file| file.metadata())? + self.with_file("metadata", |file| file.metadata()).await? } /// Helper function that looks up the underlying File for this VirtualFile, /// opening it and evicting some other File if necessary. It calls 'func' /// with the physical File. - fn with_file(&self, op: &str, mut func: F) -> Result + async fn with_file(&self, op: &str, mut func: F) -> Result where F: FnMut(&File) -> R, { @@ -417,7 +417,9 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))?? + self.pos = self + .with_file("seek", |mut file| file.seek(SeekFrom::End(offset))) + .await?? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -505,7 +507,9 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = self.with_file("read", |file| file.read_at(buf, offset))?; + let result = self + .with_file("read", |file| file.read_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -515,7 +519,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = self.with_file("write", |file| file.write_at(buf, offset))?; + let result = self + .with_file("write", |file| file.write_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) From ae1af9d10e449b6fafd62f417d927fd003212af0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 6 Sep 2023 18:26:51 +0200 Subject: [PATCH 3/8] Make VirtualFile::open async fn --- pageserver/ctl/src/layer_map_analyzer.rs | 2 +- pageserver/ctl/src/layers.rs | 2 +- pageserver/src/tenant/blob_io.rs | 2 +- pageserver/src/tenant/storage_layer/delta_layer.rs | 1 + pageserver/src/tenant/storage_layer/image_layer.rs | 1 + pageserver/src/virtual_file.rs | 2 +- 6 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 32d0d1bed286..495dae87e361 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option { // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH" async fn get_holes(path: &Path, max_holes: usize) -> Result> { - let file = FileBlockReader::new(VirtualFile::open(path)?); + let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index ff2044653a79..33a6f197cfa3 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { let path = path.as_ref(); virtual_file::init(10); page_cache::init(100); - let file = FileBlockReader::new(VirtualFile::open(path)?); + let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 71db8d297874..ad8693b18b62 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -251,7 +251,7 @@ mod tests { wtr.flush_buffer().await?; } - let file = VirtualFile::open(&path)?; + let file = VirtualFile::open(&path).await?; let rdr = BlockReaderRef::VirtualFile(&file); let rdr = BlockCursor::new(rdr); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 7305ec1486bb..ad97476a5f1b 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -851,6 +851,7 @@ impl DeltaLayerInner { summary: Option, ) -> anyhow::Result { let file = VirtualFile::open(path) + .await .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2d0fcf0b236d..3592877c5d25 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -438,6 +438,7 @@ impl ImageLayerInner { summary: Option, ) -> anyhow::Result { let file = VirtualFile::open(path) + .await .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); let summary_blk = file.read_blk(0).await?; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index ec3032843680..8b152782a1e6 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -210,7 +210,7 @@ impl CrashsafeOverwriteError { impl VirtualFile { /// Open a file in read-only mode. Like File::open. - pub fn open(path: &Path) -> Result { + pub async fn open(path: &Path) -> Result { Self::open_with_options(path, OpenOptions::new().read(true)) } From bd04abbcab431abaf344875b607407fe40c3b9f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 6 Sep 2023 18:30:02 +0200 Subject: [PATCH 4/8] Make VirtualFile::{open_with_options, create} async fn --- pageserver/src/tenant/blob_io.rs | 2 +- pageserver/src/tenant/ephemeral_file.rs | 7 ++++--- .../src/tenant/storage_layer/delta_layer.rs | 2 +- .../src/tenant/storage_layer/image_layer.rs | 3 ++- .../src/tenant/storage_layer/inmemory_layer.rs | 4 ++-- pageserver/src/tenant/timeline.rs | 16 +++++++++------- pageserver/src/tenant/timeline/layer_manager.rs | 4 ++-- pageserver/src/virtual_file.rs | 14 +++++++++----- 8 files changed, 30 insertions(+), 22 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index ad8693b18b62..4fad1f3c14aa 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -238,7 +238,7 @@ mod tests { // Write part (in block to drop the file) let mut offsets = Vec::new(); { - let file = VirtualFile::create(&path)?; + let file = VirtualFile::create(&path).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { let offs = wtr.write_blob(blob).await?; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 4c5fe424f3ca..887834cd9b60 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -28,7 +28,7 @@ pub struct EphemeralFile { } impl EphemeralFile { - pub fn create( + pub async fn create( conf: &PageServerConf, tenant_id: TenantId, timeline_id: TimelineId, @@ -44,7 +44,8 @@ impl EphemeralFile { let file = VirtualFile::open_with_options( &filename, OpenOptions::new().read(true).write(true).create(true), - )?; + ) + .await?; Ok(EphemeralFile { page_cache_file_id: page_cache::next_file_id(), @@ -286,7 +287,7 @@ mod tests { async fn test_ephemeral_blobs() -> Result<(), io::Error> { let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?; - let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; let pos_foo = file.write_blob(b"foo").await?; assert_eq!( diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index ad97476a5f1b..6925cb59cd16 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -604,7 +604,7 @@ impl DeltaLayerWriterInner { // FIXME: throw an error instead? let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range); - let mut file = VirtualFile::create(&path)?; + let mut file = VirtualFile::create(&path).await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 3592877c5d25..2a6cabcc97b1 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -541,7 +541,8 @@ impl ImageLayerWriterInner { let mut file = VirtualFile::open_with_options( &path, std::fs::OpenOptions::new().write(true).create_new(true), - )?; + ) + .await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 374b0bb60c7c..3ff1c6bb1837 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -236,7 +236,7 @@ impl InMemoryLayer { /// /// Create a new, empty, in-memory layer /// - pub fn create( + pub async fn create( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, @@ -244,7 +244,7 @@ impl InMemoryLayer { ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; Ok(InMemoryLayer { conf, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7b6c6dbfadce..fa77d5be94ca 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2544,13 +2544,15 @@ impl Timeline { /// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { let mut guard = self.layers.write().await; - let layer = guard.get_layer_for_write( - lsn, - self.get_last_record_lsn(), - self.conf, - self.timeline_id, - self.tenant_id, - )?; + let layer = guard + .get_layer_for_write( + lsn, + self.get_last_record_lsn(), + self.conf, + self.timeline_id, + self.tenant_id, + ) + .await?; Ok(layer) } diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 5522ea178803..3c88d31f24c2 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -87,7 +87,7 @@ impl LayerManager { /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer, /// called within `get_layer_for_write`. - pub(crate) fn get_layer_for_write( + pub(crate) async fn get_layer_for_write( &mut self, lsn: Lsn, last_record_lsn: Lsn, @@ -129,7 +129,7 @@ impl LayerManager { lsn ); - let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?; + let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?; let layer = Arc::new(new_layer); self.layer_map.open_layer = Some(layer.clone()); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 8b152782a1e6..07292f1e9981 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -211,16 +211,17 @@ impl CrashsafeOverwriteError { impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Path) -> Result { - Self::open_with_options(path, OpenOptions::new().read(true)) + Self::open_with_options(path, OpenOptions::new().read(true)).await } /// Create a new file for writing. If the file exists, it will be truncated. /// Like File::create. - pub fn create(path: &Path) -> Result { + pub async fn create(path: &Path) -> Result { Self::open_with_options( path, OpenOptions::new().write(true).create(true).truncate(true), ) + .await } /// Open a file with given options. @@ -228,7 +229,7 @@ impl VirtualFile { /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, /// they will be applied also when the file is subsequently re-opened, not only /// on the first time. Make sure that's sane! - pub fn open_with_options( + pub async fn open_with_options( path: &Path, open_options: &OpenOptions, ) -> Result { @@ -299,6 +300,7 @@ impl VirtualFile { // we bail out instead of causing damage. .create_new(true), ) + .await .map_err(CrashsafeOverwriteError::CreateTempfile)?; file.write_all(content) .await @@ -317,6 +319,7 @@ impl VirtualFile { // try_lock. let final_parent_dirfd = Self::open_with_options(final_path_parent, OpenOptions::new().read(true)) + .await .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?; final_parent_dirfd .sync_all() @@ -703,7 +706,7 @@ mod tests { // native files, you will run out of file descriptors if the ulimit // is low enough.) test_files("virtual_files", |path, open_options| { - let vf = VirtualFile::open_with_options(path, open_options)?; + let vf = VirtualFile::open_with_options(path, open_options).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) }) .await @@ -836,7 +839,8 @@ mod tests { // Open the file many times. let mut files = Vec::new(); for _ in 0..VIRTUAL_FILES { - let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?; + let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true)) + .await?; files.push(f); } let files = Arc::new(files); From f64a2d723a8a25a07bb85b4384c49a2c43f96c20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 6 Sep 2023 19:56:28 +0200 Subject: [PATCH 5/8] Fix tests --- pageserver/src/virtual_file.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 07292f1e9981..8d4e5f73c0ec 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -636,6 +636,7 @@ mod tests { use rand::seq::SliceRandom; use rand::thread_rng; use rand::Rng; + use std::future::Future; use std::io::Write; use std::sync::Arc; @@ -705,7 +706,7 @@ mod tests { // results with VirtualFiles as with native Files. (Except that with // native files, you will run out of file descriptors if the ulimit // is low enough.) - test_files("virtual_files", |path, open_options| { + test_files("virtual_files", |path, open_options| async { let vf = VirtualFile::open_with_options(path, open_options).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) }) @@ -714,15 +715,16 @@ mod tests { #[tokio::test] async fn test_physical_files() -> Result<(), Error> { - test_files("physical_files", |path, open_options| { + test_files("physical_files", |path, open_options| async { Ok(MaybeVirtualFile::File(open_options.open(path)?)) }) .await } - async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> where - OF: Fn(&Path, &OpenOptions) -> Result, + OF: Fn(&Path, &OpenOptions) -> FT, + FT: Future>, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); std::fs::create_dir_all(&testdir)?; @@ -731,14 +733,15 @@ mod tests { let mut file_a = openfunc( &path_a, OpenOptions::new().write(true).create(true).truncate(true), - )?; + ) + .await?; file_a.write_all(b"foobar").await?; // cannot read from a file opened in write-only mode let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading - let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; + let mut file_a = openfunc(&path_a, OpenOptions::new().read(true)).await?; // cannot write to a file opened in read-only mode let _ = file_a.write_all(b"bar").await.unwrap_err(); @@ -780,7 +783,8 @@ mod tests { .write(true) .create(true) .truncate(true), - )?; + ) + .await?; file_b.write_all_at(b"BAR", 3).await?; file_b.write_all_at(b"FOO", 0).await?; @@ -794,7 +798,7 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { - let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; + let mut vfile = openfunc(&path_b, OpenOptions::new().read(true)).await?; assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } @@ -819,8 +823,8 @@ mod tests { /// Test using VirtualFiles from many threads concurrently. This tests both using /// a lot of VirtualFiles concurrently, causing evictions, and also using the same /// VirtualFile from multiple threads concurrently. - #[test] - fn test_vfile_concurrency() -> Result<(), Error> { + #[tokio::test] + async fn test_vfile_concurrency() -> Result<(), Error> { const SIZE: usize = 8 * 1024; const VIRTUAL_FILES: usize = 100; const THREADS: usize = 100; From 9e23a91c0bce8ac44914b122ed3722a9f22a76a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 7 Sep 2023 17:36:31 +0200 Subject: [PATCH 6/8] Fix them for real this time --- pageserver/src/virtual_file.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 8d4e5f73c0ec..df621863544e 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -723,7 +723,7 @@ mod tests { async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> where - OF: Fn(&Path, &OpenOptions) -> FT, + OF: Fn(PathBuf, OpenOptions) -> FT, FT: Future>, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); @@ -731,8 +731,12 @@ mod tests { let path_a = testdir.join("file_a"); let mut file_a = openfunc( - &path_a, - OpenOptions::new().write(true).create(true).truncate(true), + path_a.clone(), + OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .to_owned(), ) .await?; file_a.write_all(b"foobar").await?; @@ -741,7 +745,7 @@ mod tests { let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading - let mut file_a = openfunc(&path_a, OpenOptions::new().read(true)).await?; + let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; // cannot write to a file opened in read-only mode let _ = file_a.write_all(b"bar").await.unwrap_err(); @@ -777,12 +781,13 @@ mod tests { // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); let mut file_b = openfunc( - &path_b, + path_b.clone(), OpenOptions::new() .read(true) .write(true) .create(true) - .truncate(true), + .truncate(true) + .to_owned(), ) .await?; file_b.write_all_at(b"BAR", 3).await?; @@ -798,7 +803,8 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { - let mut vfile = openfunc(&path_b, OpenOptions::new().read(true)).await?; + let mut vfile = + openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?; assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } From 089012051765e77d31488bbeff40019eeb2ad5ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 7 Sep 2023 17:47:18 +0200 Subject: [PATCH 7/8] fix --- pageserver/src/virtual_file.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index df621863544e..7a98ab14fb21 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -706,8 +706,8 @@ mod tests { // results with VirtualFiles as with native Files. (Except that with // native files, you will run out of file descriptors if the ulimit // is low enough.) - test_files("virtual_files", |path, open_options| async { - let vf = VirtualFile::open_with_options(path, open_options).await?; + test_files("virtual_files", |path, open_options| async move { + let vf = VirtualFile::open_with_options(&path, &open_options).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) }) .await @@ -715,7 +715,7 @@ mod tests { #[tokio::test] async fn test_physical_files() -> Result<(), Error> { - test_files("physical_files", |path, open_options| async { + test_files("physical_files", |path, open_options| async move { Ok(MaybeVirtualFile::File(open_options.open(path)?)) }) .await From eb2dd7118e4d2790019150f3bf23a325289aefbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 7 Sep 2023 19:25:56 +0200 Subject: [PATCH 8/8] Fix test_vfile_concurrency test --- pageserver/src/virtual_file.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 7a98ab14fb21..1fa5fcc29791 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -861,9 +861,10 @@ mod tests { .thread_name("test_vfile_concurrency thread") .build() .unwrap(); + let mut hdls = Vec::new(); for _threadno in 0..THREADS { let files = files.clone(); - rt.spawn(async move { + let hdl = rt.spawn(async move { let mut buf = [0u8; SIZE]; let mut rng = rand::rngs::OsRng; for _ in 1..1000 { @@ -872,7 +873,12 @@ mod tests { assert!(buf == SAMPLE); } }); + hdls.push(hdl); } + for hdl in hdls { + hdl.await?; + } + std::mem::forget(rt); Ok(()) }