From bd04abbcab431abaf344875b607407fe40c3b9f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 6 Sep 2023 18:30:02 +0200 Subject: [PATCH] Make VirtualFile::{open_with_options, create} async fn --- pageserver/src/tenant/blob_io.rs | 2 +- pageserver/src/tenant/ephemeral_file.rs | 7 ++++--- .../src/tenant/storage_layer/delta_layer.rs | 2 +- .../src/tenant/storage_layer/image_layer.rs | 3 ++- .../src/tenant/storage_layer/inmemory_layer.rs | 4 ++-- pageserver/src/tenant/timeline.rs | 16 +++++++++------- pageserver/src/tenant/timeline/layer_manager.rs | 4 ++-- pageserver/src/virtual_file.rs | 14 +++++++++----- 8 files changed, 30 insertions(+), 22 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index ad8693b18b62..4fad1f3c14aa 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -238,7 +238,7 @@ mod tests { // Write part (in block to drop the file) let mut offsets = Vec::new(); { - let file = VirtualFile::create(&path)?; + let file = VirtualFile::create(&path).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { let offs = wtr.write_blob(blob).await?; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 4c5fe424f3ca..887834cd9b60 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -28,7 +28,7 @@ pub struct EphemeralFile { } impl EphemeralFile { - pub fn create( + pub async fn create( conf: &PageServerConf, tenant_id: TenantId, timeline_id: TimelineId, @@ -44,7 +44,8 @@ impl EphemeralFile { let file = VirtualFile::open_with_options( &filename, OpenOptions::new().read(true).write(true).create(true), - )?; + ) + .await?; Ok(EphemeralFile { page_cache_file_id: page_cache::next_file_id(), @@ -286,7 +287,7 @@ mod tests { async fn test_ephemeral_blobs() -> Result<(), io::Error> { let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?; - let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; let pos_foo = file.write_blob(b"foo").await?; assert_eq!( diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index ad97476a5f1b..6925cb59cd16 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -604,7 +604,7 @@ impl DeltaLayerWriterInner { // FIXME: throw an error instead? let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range); - let mut file = VirtualFile::create(&path)?; + let mut file = VirtualFile::create(&path).await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 3592877c5d25..2a6cabcc97b1 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -541,7 +541,8 @@ impl ImageLayerWriterInner { let mut file = VirtualFile::open_with_options( &path, std::fs::OpenOptions::new().write(true).create_new(true), - )?; + ) + .await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 374b0bb60c7c..3ff1c6bb1837 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -236,7 +236,7 @@ impl InMemoryLayer { /// /// Create a new, empty, in-memory layer /// - pub fn create( + pub async fn create( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, @@ -244,7 +244,7 @@ impl InMemoryLayer { ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; Ok(InMemoryLayer { conf, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7b6c6dbfadce..fa77d5be94ca 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2544,13 +2544,15 @@ impl Timeline { /// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { let mut guard = self.layers.write().await; - let layer = guard.get_layer_for_write( - lsn, - self.get_last_record_lsn(), - self.conf, - self.timeline_id, - self.tenant_id, - )?; + let layer = guard + .get_layer_for_write( + lsn, + self.get_last_record_lsn(), + self.conf, + self.timeline_id, + self.tenant_id, + ) + .await?; Ok(layer) } diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 5522ea178803..3c88d31f24c2 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -87,7 +87,7 @@ impl LayerManager { /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer, /// called within `get_layer_for_write`. - pub(crate) fn get_layer_for_write( + pub(crate) async fn get_layer_for_write( &mut self, lsn: Lsn, last_record_lsn: Lsn, @@ -129,7 +129,7 @@ impl LayerManager { lsn ); - let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?; + let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?; let layer = Arc::new(new_layer); self.layer_map.open_layer = Some(layer.clone()); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 8b152782a1e6..07292f1e9981 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -211,16 +211,17 @@ impl CrashsafeOverwriteError { impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Path) -> Result { - Self::open_with_options(path, OpenOptions::new().read(true)) + Self::open_with_options(path, OpenOptions::new().read(true)).await } /// Create a new file for writing. If the file exists, it will be truncated. /// Like File::create. - pub fn create(path: &Path) -> Result { + pub async fn create(path: &Path) -> Result { Self::open_with_options( path, OpenOptions::new().write(true).create(true).truncate(true), ) + .await } /// Open a file with given options. @@ -228,7 +229,7 @@ impl VirtualFile { /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, /// they will be applied also when the file is subsequently re-opened, not only /// on the first time. Make sure that's sane! - pub fn open_with_options( + pub async fn open_with_options( path: &Path, open_options: &OpenOptions, ) -> Result { @@ -299,6 +300,7 @@ impl VirtualFile { // we bail out instead of causing damage. .create_new(true), ) + .await .map_err(CrashsafeOverwriteError::CreateTempfile)?; file.write_all(content) .await @@ -317,6 +319,7 @@ impl VirtualFile { // try_lock. let final_parent_dirfd = Self::open_with_options(final_path_parent, OpenOptions::new().read(true)) + .await .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?; final_parent_dirfd .sync_all() @@ -703,7 +706,7 @@ mod tests { // native files, you will run out of file descriptors if the ulimit // is low enough.) test_files("virtual_files", |path, open_options| { - let vf = VirtualFile::open_with_options(path, open_options)?; + let vf = VirtualFile::open_with_options(path, open_options).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) }) .await @@ -836,7 +839,8 @@ mod tests { // Open the file many times. let mut files = Vec::new(); for _ in 0..VIRTUAL_FILES { - let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?; + let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true)) + .await?; files.push(f); } let files = Arc::new(files);