From b2acb7368b28ad4188960bfedbe5f51d075f5f14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 7 Sep 2023 15:08:34 +0200 Subject: [PATCH 01/10] Switch the locks to tokio ones --- pageserver/src/virtual_file.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 1fa5fcc29791..452f384f7c66 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -18,7 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::{RwLock, RwLockWriteGuard}; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -110,7 +110,7 @@ impl OpenFiles { /// /// On return, we hold a lock on the slot, and its 'tag' has been updated /// recently_used has been set. It's all ready for reuse. - fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { + async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { // // Run the clock algorithm to find a slot to replace. // @@ -142,7 +142,7 @@ impl OpenFiles { } retries += 1; } else { - slot_guard = slot.inner.write().unwrap(); + slot_guard = slot.inner.write().await; index = next; break; } @@ -244,7 +244,7 @@ impl VirtualFile { tenant_id = "*".to_string(); timeline_id = "*".to_string(); } - let (handle, mut slot_guard) = get_open_files().find_victim_slot(); + let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; let file = STORAGE_IO_TIME .with_label_values(&["open"]) .observe_closure_duration(|| open_options.open(path))?; @@ -353,12 +353,12 @@ impl VirtualFile { // We only need to hold the handle lock while we read the current handle. 
If // another thread closes the file and recycles the slot for a different file, // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().unwrap(); + let mut handle = *self.handle.read().await; loop { // Check if the slot contains our File { let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().unwrap(); + let slot_guard = slot.inner.read().await; if slot_guard.tag == handle.tag { if let Some(file) = &slot_guard.file { // Found a cached file descriptor. @@ -373,7 +373,7 @@ impl VirtualFile { // The slot didn't contain our File. We will have to open it ourselves, // but before that, grab a write lock on handle in the VirtualFile, so // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().unwrap(); + let handle_guard = self.handle.write().await; // If another thread changed the handle while we were not holding the lock, // then the handle might now be valid again. Loop back to retry. @@ -387,7 +387,7 @@ impl VirtualFile { // We need to open the file ourselves. The handle in the VirtualFile is // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot(); + let (handle, mut slot_guard) = open_files.find_victim_slot().await; // Open the physical file let file = STORAGE_IO_TIME @@ -566,12 +566,19 @@ impl VirtualFile { impl Drop for VirtualFile { /// If a VirtualFile is dropped, close the underlying file if it was open. fn drop(&mut self) { - let handle = self.handle.get_mut().unwrap(); - - // We could check with a read-lock first, to avoid waiting on an - // unrelated I/O. + let handle = self.handle.get_mut(); + + // We don't have async drop so we cannot wait for the lock here. + // Instead, do a best-effort attempt at closing the underlying + // file descriptor by using `try_write`. + // This best-effort attempt should be quite good though + // as we have `&mut self` access. 
In other words, if the slot + // is still occupied by our file, we should be the only ones + // accessing it (and if it has been reassigned since, we don't + // need to bother with dropping anyways). let slot = &get_open_files().slots[handle.index]; - let mut slot_guard = slot.inner.write().unwrap(); + let Ok(mut slot_guard) = slot.inner.try_write() else { return }; + if slot_guard.tag == handle.tag { slot.recently_used.store(false, Ordering::Relaxed); // there is also operation "close-by-replace" for closes done on eviction for From 07d408fe31030d05d336e19e7bbe69fad59ca671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 7 Sep 2023 15:48:30 +0200 Subject: [PATCH 02/10] Switch with_file to a macro Sadly Rust doesn't support closures yet that take on parameters with lifetimes. --- pageserver/src/virtual_file.rs | 161 ++++++++++++++++----------------- 1 file changed, 77 insertions(+), 84 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 452f384f7c66..afcfa1a65807 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -208,6 +208,78 @@ impl CrashsafeOverwriteError { } } +/// Helper macro internal to `VirtualFile` that looks up the underlying File, +/// opens it and evicts some other File if necessary. The passed parameter is +/// assumed to be a function available for the physical `File`. +/// +/// We are doing it via a macro as Rust doesn't support async closures that +/// take on parameters with lifetimes. +macro_rules! with_file { + ($this:expr, $($body:tt)*) => {{ let r: Result<_, Error> = { + let open_files = get_open_files(); + + let mut handle_guard = { + // Read the cached slot handle, and see if the slot that it points to still + // contains our File. + // + // We only need to hold the handle lock while we read the current handle. 
If + // another thread closes the file and recycles the slot for a different file, + // we will notice that the handle we read is no longer valid and retry. + let mut handle = *$this.handle.read().await; + loop { + // Check if the slot contains our File + { + let slot = &open_files.slots[handle.index]; + let slot_guard = slot.inner.read().await; + if slot_guard.tag == handle.tag { + #[allow(unused_mut)] + if let Some(file) = &slot_guard.file { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + let mut file: &File = file; + return Ok(file.$($body)*?); + } + } + } + + // The slot didn't contain our File. We will have to open it ourselves, + // but before that, grab a write lock on handle in the VirtualFile, so + // that no other thread will try to concurrently open the same file. + let handle_guard = $this.handle.write().await; + + // If another thread changed the handle while we were not holding the lock, + // then the handle might now be valid again. Loop back to retry. + if *handle_guard != handle { + handle = *handle_guard; + continue; + } + break handle_guard; + } + }; + + // We need to open the file ourselves. The handle in the VirtualFile is + // now locked in write-mode. Find a free slot to put it in. + let (handle, mut slot_guard) = open_files.find_victim_slot().await; + + // Open the physical file + #[allow(unused_mut)] + let mut file = STORAGE_IO_TIME + .with_label_values(&["open"]) + .observe_closure_duration(|| $this.open_options.open(&$this.path))?; + + // Perform the requested operation on it + let result = file.$($body)*?; + + // Store the File in the slot and update the handle in the VirtualFile + // to point to it. + slot_guard.file.replace(file); + + *handle_guard = handle; + + Ok(result) + }; r}}; +} + impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Path) -> Result { @@ -330,82 +402,11 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. 
pub async fn sync_all(&self) -> Result<(), Error> { - self.with_file("fsync", |file| file.sync_all()).await? + with_file!(self, sync_all()) } pub async fn metadata(&self) -> Result { - self.with_file("metadata", |file| file.metadata()).await? - } - - /// Helper function that looks up the underlying File for this VirtualFile, - /// opening it and evicting some other File if necessary. It calls 'func' - /// with the physical File. - async fn with_file(&self, op: &str, mut func: F) -> Result - where - F: FnMut(&File) -> R, - { - let open_files = get_open_files(); - - let mut handle_guard = { - // Read the cached slot handle, and see if the slot that it points to still - // contains our File. - // - // We only need to hold the handle lock while we read the current handle. If - // another thread closes the file and recycles the slot for a different file, - // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().await; - loop { - // Check if the slot contains our File - { - let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().await; - if slot_guard.tag == handle.tag { - if let Some(file) = &slot_guard.file { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(STORAGE_IO_TIME - .with_label_values(&[op]) - .observe_closure_duration(|| func(file))); - } - } - } - - // The slot didn't contain our File. We will have to open it ourselves, - // but before that, grab a write lock on handle in the VirtualFile, so - // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().await; - - // If another thread changed the handle while we were not holding the lock, - // then the handle might now be valid again. Loop back to retry. - if *handle_guard != handle { - handle = *handle_guard; - continue; - } - break handle_guard; - } - }; - - // We need to open the file ourselves. 
The handle in the VirtualFile is - // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot().await; - - // Open the physical file - let file = STORAGE_IO_TIME - .with_label_values(&["open"]) - .observe_closure_duration(|| self.open_options.open(&self.path))?; - - // Perform the requested operation on it - let result = STORAGE_IO_TIME - .with_label_values(&[op]) - .observe_closure_duration(|| func(&file)); - - // Store the File in the slot and update the handle in the VirtualFile - // to point to it. - slot_guard.file.replace(file); - - *handle_guard = handle; - - Ok(result) + with_file!(self, metadata()) } pub fn remove(self) { @@ -419,11 +420,7 @@ impl VirtualFile { SeekFrom::Start(offset) => { self.pos = offset; } - SeekFrom::End(offset) => { - self.pos = self - .with_file("seek", |mut file| file.seek(SeekFrom::End(offset))) - .await?? - } + SeekFrom::End(offset) => self.pos = with_file!(self, seek(SeekFrom::End(offset)))?, SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; if pos < 0 { @@ -510,9 +507,7 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = self - .with_file("read", |file| file.read_at(buf, offset)) - .await?; + let result = with_file!(self, read_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -522,9 +517,7 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = self - .with_file("write", |file| file.write_at(buf, offset)) - .await?; + let result = with_file!(self, write_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) From 3da66764c57b70197d61b5c9134ca038597a76d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 8 Sep 2023 01:56:28 +0200 Subject: [PATCH 03/10] clippy --- 
pageserver/src/virtual_file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index afcfa1a65807..674ecccf1923 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -237,7 +237,7 @@ macro_rules! with_file { // Found a cached file descriptor. slot.recently_used.store(true, Ordering::Relaxed); let mut file: &File = file; - return Ok(file.$($body)*?); + return file.$($body)*; } } } From 489e133d31b8f264d1639d517fd4313309af3c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 9 Sep 2023 01:52:42 +0200 Subject: [PATCH 04/10] Return guards instead --- pageserver/src/virtual_file.rs | 138 ++++++++++++++++----------------- 1 file changed, 69 insertions(+), 69 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 674ecccf1923..faeb76dfabf7 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -18,7 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tokio::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -208,76 +208,13 @@ impl CrashsafeOverwriteError { } } -/// Helper macro internal to `VirtualFile` that looks up the underlying File, -/// opens it and evicts some other File if necessary. The passed parameter is -/// assumed to be a function available for the physical `File`. -/// -/// We are doing it via a macro as Rust doesn't support async closures that -/// take on parameters with lifetimes. macro_rules! 
with_file { - ($this:expr, $($body:tt)*) => {{ let r: Result<_, Error> = { - let open_files = get_open_files(); - - let mut handle_guard = { - // Read the cached slot handle, and see if the slot that it points to still - // contains our File. - // - // We only need to hold the handle lock while we read the current handle. If - // another thread closes the file and recycles the slot for a different file, - // we will notice that the handle we read is no longer valid and retry. - let mut handle = *$this.handle.read().await; - loop { - // Check if the slot contains our File - { - let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().await; - if slot_guard.tag == handle.tag { - #[allow(unused_mut)] - if let Some(file) = &slot_guard.file { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - let mut file: &File = file; - return file.$($body)*; - } - } - } - - // The slot didn't contain our File. We will have to open it ourselves, - // but before that, grab a write lock on handle in the VirtualFile, so - // that no other thread will try to concurrently open the same file. - let handle_guard = $this.handle.write().await; - - // If another thread changed the handle while we were not holding the lock, - // then the handle might now be valid again. Loop back to retry. - if *handle_guard != handle { - handle = *handle_guard; - continue; - } - break handle_guard; - } - }; - - // We need to open the file ourselves. The handle in the VirtualFile is - // now locked in write-mode. Find a free slot to put it in. 
- let (handle, mut slot_guard) = open_files.find_victim_slot().await; - - // Open the physical file + ($this:expr, $($body:tt)*) => {{ + let sl = $this.lock_file().await?; #[allow(unused_mut)] - let mut file = STORAGE_IO_TIME - .with_label_values(&["open"]) - .observe_closure_duration(|| $this.open_options.open(&$this.path))?; - - // Perform the requested operation on it - let result = file.$($body)*?; - - // Store the File in the slot and update the handle in the VirtualFile - // to point to it. - slot_guard.file.replace(file); - - *handle_guard = handle; - - Ok(result) - }; r}}; + let mut file: &File = sl.file.as_ref().unwrap(); + file.$($body)* + }}; } impl VirtualFile { @@ -409,6 +346,69 @@ impl VirtualFile { with_file!(self, metadata()) } + /// Helper function internal to `VirtualFile` that looks up the underlying File, + /// opens it and evicts some other File if necessary. The passed parameter is + /// assumed to be a function available for the physical `File`. + /// + /// We are doing it via a macro as Rust doesn't support async closures that + /// take on parameters with lifetimes. + async fn lock_file(&self) -> Result, Error> { + let open_files = get_open_files(); + + let mut handle_guard = { + // Read the cached slot handle, and see if the slot that it points to still + // contains our File. + // + // We only need to hold the handle lock while we read the current handle. If + // another thread closes the file and recycles the slot for a different file, + // we will notice that the handle we read is no longer valid and retry. + let mut handle = *self.handle.read().await; + loop { + // Check if the slot contains our File + { + let slot = &open_files.slots[handle.index]; + let slot_guard = slot.inner.read().await; + if slot_guard.tag == handle.tag && slot_guard.file.is_some() { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(slot_guard); + } + } + + // The slot didn't contain our File. 
We will have to open it ourselves, + // but before that, grab a write lock on handle in the VirtualFile, so + // that no other thread will try to concurrently open the same file. + let handle_guard = self.handle.write().await; + + // If another thread changed the handle while we were not holding the lock, + // then the handle might now be valid again. Loop back to retry. + if *handle_guard != handle { + handle = *handle_guard; + continue; + } + break handle_guard; + } + }; + + // We need to open the file ourselves. The handle in the VirtualFile is + // now locked in write-mode. Find a free slot to put it in. + let (handle, mut slot_guard) = open_files.find_victim_slot().await; + + // Open the physical file + #[allow(unused_mut)] + let mut file = STORAGE_IO_TIME + .with_label_values(&["open"]) + .observe_closure_duration(|| self.open_options.open(&self.path))?; + + // Store the File in the slot and update the handle in the VirtualFile + // to point to it. + slot_guard.file.replace(file); + + *handle_guard = handle; + + return Ok(slot_guard.downgrade()); + } + pub fn remove(self) { let path = self.path.clone(); drop(self); From d40d089bec1bc550746cb9a5fbcb037ac52b5af2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 9 Sep 2023 02:04:00 +0200 Subject: [PATCH 05/10] Create a custom made struct --- pageserver/src/virtual_file.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index faeb76dfabf7..513acdf62da9 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -211,9 +211,7 @@ impl CrashsafeOverwriteError { macro_rules! 
with_file { ($this:expr, $($body:tt)*) => {{ let sl = $this.lock_file().await?; - #[allow(unused_mut)] - let mut file: &File = sl.file.as_ref().unwrap(); - file.$($body)* + sl.as_ref().$($body)* }}; } @@ -352,7 +350,7 @@ impl VirtualFile { /// /// We are doing it via a macro as Rust doesn't support async closures that /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + async fn lock_file(&self) -> Result, Error> { let open_files = get_open_files(); let mut handle_guard = { @@ -371,7 +369,7 @@ impl VirtualFile { if slot_guard.tag == handle.tag && slot_guard.file.is_some() { // Found a cached file descriptor. slot.recently_used.store(true, Ordering::Relaxed); - return Ok(slot_guard); + return Ok(FileGuard { slot_guard }); } } @@ -406,7 +404,9 @@ impl VirtualFile { *handle_guard = handle; - return Ok(slot_guard.downgrade()); + return Ok(FileGuard { + slot_guard: slot_guard.downgrade(), + }); } pub fn remove(self) { @@ -527,6 +527,18 @@ impl VirtualFile { } } +struct FileGuard<'a> { + slot_guard: RwLockReadGuard<'a, SlotInner>, +} + +impl<'a> AsRef for FileGuard<'a> { + fn as_ref(&self) -> &File { + // This unwrap is safe because we only create `FileGuard`s + // if we know that the file is Some. 
+ self.slot_guard.file.as_ref().unwrap() + } +} + #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( From c587a4d5beb9b7e26a0724ac1ce5077fecbd75d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 9 Sep 2023 02:24:54 +0200 Subject: [PATCH 06/10] Add back metrics --- pageserver/src/virtual_file.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 513acdf62da9..2e9efa72e89b 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -19,6 +19,7 @@ use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tokio::time::Instant; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -209,9 +210,15 @@ impl CrashsafeOverwriteError { } macro_rules! with_file { - ($this:expr, $($body:tt)*) => {{ + ($this:expr, $label:expr, $($body:tt)*) => {{ let sl = $this.lock_file().await?; - sl.as_ref().$($body)* + let instant = Instant::now(); + let result = sl.as_ref().$($body)*; + let elapsed = instant.elapsed().as_secs_f64(); + STORAGE_IO_TIME + .with_label_values(&[$label]) + .observe(elapsed); + result }}; } @@ -337,11 +344,11 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. 
pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, sync_all()) + with_file!(self, "fsync", sync_all()) } pub async fn metadata(&self) -> Result { - with_file!(self, metadata()) + with_file!(self, "read", metadata()) } /// Helper function internal to `VirtualFile` that looks up the underlying File, @@ -420,7 +427,9 @@ impl VirtualFile { SeekFrom::Start(offset) => { self.pos = offset; } - SeekFrom::End(offset) => self.pos = with_file!(self, seek(SeekFrom::End(offset)))?, + SeekFrom::End(offset) => { + self.pos = with_file!(self, "seek", seek(SeekFrom::End(offset)))? + } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; if pos < 0 { @@ -507,7 +516,7 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = with_file!(self, read_at(buf, offset)); + let result = with_file!(self, "read", read_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -517,7 +526,7 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = with_file!(self, write_at(buf, offset)); + let result = with_file!(self, "write", write_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) From ef1608ca5eb99caae8bf51ad5339f5db8daffd41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 11 Sep 2023 15:31:49 +0200 Subject: [PATCH 07/10] Use closure-like syntax in with_file macro This is closer to what we had before. Again, sad that there is no support for async closures :). 
--- pageserver/src/virtual_file.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 2cc3ca53824b..ee3fb1912e93 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -210,10 +210,10 @@ impl CrashsafeOverwriteError { } macro_rules! with_file { - ($this:expr, $op:expr, $($body:tt)*) => {{ - let sl = $this.lock_file().await?; + ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{ + let $ident = $this.lock_file().await?; let instant = Instant::now(); - let result = sl.as_ref().$($body)*; + let result = $($body)*; let elapsed = instant.elapsed().as_secs_f64(); STORAGE_IO_TIME_METRIC .get($op) @@ -345,11 +345,15 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, sync_all()) + with_file!(self, StorageIoOperation::Fsync, |file| file + .as_ref() + .sync_all()) } pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Read, metadata()) + with_file!(self, StorageIoOperation::Read, |file| file + .as_ref() + .metadata()) } /// Helper function internal to `VirtualFile` that looks up the underlying File, @@ -428,7 +432,9 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, seek(SeekFrom::End(offset)))? + self.pos = with_file!(self, StorageIoOperation::Seek, |file| file + .as_ref() + .seek(SeekFrom::End(offset)))? 
} SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -516,7 +522,9 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Read, read_at(buf, offset)); + let result = with_file!(self, StorageIoOperation::Read, |file| file + .as_ref() + .read_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -526,7 +534,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Write, write_at(buf, offset)); + let result = with_file!(self, StorageIoOperation::Write, |file| file + .as_ref() + .write_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) From b14552c79945d5ee67b033af6d2b604af8750db4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 11 Sep 2023 15:41:28 +0200 Subject: [PATCH 08/10] Span a task in the else branch --- pageserver/src/virtual_file.rs | 40 +++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index ee3fb1912e93..73124189e672 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -592,27 +592,37 @@ impl Drop for VirtualFile { fn drop(&mut self) { let handle = self.handle.get_mut(); - // We don't have async drop so we cannot wait for the lock here. - // Instead, do a best-effort attempt at closing the underlying - // file descriptor by using `try_write`. 
- // This best-effort attempt should be quite good though + fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) { + if slot_guard.tag == tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also operation "close-by-replace" for closes done on eviction for + // comparison. + STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Close) + .observe_closure_duration(|| drop(slot_guard.file.take())); + } + } + + // We don't have async drop so we cannot directly await the lock here. + // Instead, first do a best-effort attempt at closing the underlying + // file descriptor by using `try_write`, and if that fails, spawn + // a tokio task to do it asynchronously: we just want it to be + // cleaned up eventually. + // Most of the time, the `try_lock` should succeed though, // as we have `&mut self` access. In other words, if the slot // is still occupied by our file, we should be the only ones // accessing it (and if it has been reassigned since, we don't // need to bother with dropping anyways). let slot = &get_open_files().slots[handle.index]; - let Ok(mut slot_guard) = slot.inner.try_write() else { - return; + if let Ok(slot_guard) = slot.inner.try_write() { + clean_slot(slot, slot_guard, handle.tag); + } else { + let tag = handle.tag; + tokio::spawn(async move { + let slot_guard = slot.inner.write().await; + clean_slot(slot, slot_guard, tag); + }); }; - - if slot_guard.tag == handle.tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also operation "close-by-replace" for closes done on eviction for - // comparison. 
- STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Close) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } } } From b26710659ae9cec83751154bebca7b0b3a61de96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 11 Sep 2023 16:26:48 +0200 Subject: [PATCH 09/10] fixes --- pageserver/src/virtual_file.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 73124189e672..0bdedffb5ebf 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -351,7 +351,7 @@ impl VirtualFile { } pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Read, |file| file + with_file!(self, StorageIoOperation::Metadata, |file| file .as_ref() .metadata()) } @@ -610,9 +610,9 @@ impl Drop for VirtualFile { // cleaned up eventually. // Most of the time, the `try_lock` should succeed though, // as we have `&mut self` access. In other words, if the slot - // is still occupied by our file, we should be the only ones - // accessing it (and if it has been reassigned since, we don't - // need to bother with dropping anyways). + // is still occupied by our file, there should be no access from + // other I/O operations; the only other possible place to lock + // the slot is the lock algorithm looking for any free slots. 
let slot = &get_open_files().slots[handle.index]; if let Ok(slot_guard) = slot.inner.try_write() { clean_slot(slot, slot_guard, handle.tag); From e1518e2db18d0548f8e689edc6d8d79f6a2ce876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 11 Sep 2023 16:28:34 +0200 Subject: [PATCH 10/10] fix --- pageserver/src/virtual_file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 0bdedffb5ebf..5dc7e4e490a5 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -612,7 +612,7 @@ impl Drop for VirtualFile { // as we have `&mut self` access. In other words, if the slot // is still occupied by our file, there should be no access from // other I/O operations; the only other possible place to lock - // the slot is the lock algorithm looking for any free slots. + // the slot is the clock algorithm looking for free slots. let slot = &get_open_files().slots[handle.index]; if let Ok(slot_guard) = slot.inner.try_write() { clean_slot(slot, slot_guard, handle.tag);