From f781df53d64bcbd89104c391ba2582800f3dd89e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 12 Sep 2023 16:02:55 +0200 Subject: [PATCH] Revert "Use tokio locks in VirtualFile and turn with_file into macro (#5247)" This reverts commit 76cc87398c58aa8856083bd3b17403af56715b17. --- pageserver/src/virtual_file.rs | 150 ++++++++++++--------------------- 1 file changed, 56 insertions(+), 94 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 5dc7e4e490a5..9612b8ec6d5e 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -18,8 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use tokio::time::Instant; +use std::sync::{RwLock, RwLockWriteGuard}; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -111,7 +110,7 @@ impl OpenFiles { /// /// On return, we hold a lock on the slot, and its 'tag' has been updated /// recently_used has been set. It's all ready for reuse. - async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { + fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { // // Run the clock algorithm to find a slot to replace. // @@ -143,7 +142,7 @@ impl OpenFiles { } retries += 1; } else { - slot_guard = slot.inner.write().await; + slot_guard = slot.inner.write().unwrap(); index = next; break; } @@ -209,19 +208,6 @@ impl CrashsafeOverwriteError { } } -macro_rules! with_file { - ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{ - let $ident = $this.lock_file().await?; - let instant = Instant::now(); - let result = $($body)*; - let elapsed = instant.elapsed().as_secs_f64(); - STORAGE_IO_TIME_METRIC - .get($op) - .observe(elapsed); - result - }}; -} - impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Path) -> Result { @@ -258,7 +244,7 @@ impl VirtualFile { tenant_id = "*".to_string(); timeline_id = "*".to_string(); } - let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; + let (handle, mut slot_guard) = get_open_files().find_victim_slot(); let file = STORAGE_IO_TIME_METRIC .get(StorageIoOperation::Open) @@ -345,24 +331,22 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, |file| file - .as_ref() - .sync_all()) + self.with_file(StorageIoOperation::Fsync, |file| file.sync_all()) + .await? } pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Metadata, |file| file - .as_ref() - .metadata()) + self.with_file(StorageIoOperation::Metadata, |file| file.metadata()) + .await? } - /// Helper function internal to `VirtualFile` that looks up the underlying File, - /// opens it and evicts some other File if necessary. The passed parameter is - /// assumed to be a function available for the physical `File`. - /// - /// We are doing it via a macro as Rust doesn't support async closures that - /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + /// Helper function that looks up the underlying File for this VirtualFile, + /// opening it and evicting some other File if necessary. It calls 'func' + /// with the physical File. + async fn with_file(&self, op: StorageIoOperation, mut func: F) -> Result + where + F: FnMut(&File) -> R, + { let open_files = get_open_files(); let mut handle_guard = { @@ -372,23 +356,27 @@ impl VirtualFile { // We only need to hold the handle lock while we read the current handle. If // another thread closes the file and recycles the slot for a different file, // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().await; + let mut handle = *self.handle.read().unwrap(); loop { // Check if the slot contains our File { let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().await; - if slot_guard.tag == handle.tag && slot_guard.file.is_some() { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(FileGuard { slot_guard }); + let slot_guard = slot.inner.read().unwrap(); + if slot_guard.tag == handle.tag { + if let Some(file) = &slot_guard.file { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(STORAGE_IO_TIME_METRIC + .get(op) + .observe_closure_duration(|| func(file))); + } } } // The slot didn't contain our File. We will have to open it ourselves, // but before that, grab a write lock on handle in the VirtualFile, so // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().await; + let handle_guard = self.handle.write().unwrap(); // If another thread changed the handle while we were not holding the lock, // then the handle might now be valid again. Loop back to retry. @@ -402,22 +390,25 @@ impl VirtualFile { // We need to open the file ourselves. The handle in the VirtualFile is // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot().await; + let (handle, mut slot_guard) = open_files.find_victim_slot(); // Open the physical file let file = STORAGE_IO_TIME_METRIC .get(StorageIoOperation::Open) .observe_closure_duration(|| self.open_options.open(&self.path))?; + // Perform the requested operation on it + let result = STORAGE_IO_TIME_METRIC + .get(op) + .observe_closure_duration(|| func(&file)); + // Store the File in the slot and update the handle in the VirtualFile // to point to it. slot_guard.file.replace(file); *handle_guard = handle; - return Ok(FileGuard { - slot_guard: slot_guard.downgrade(), - }); + Ok(result) } pub fn remove(self) { @@ -432,9 +423,11 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |file| file - .as_ref() - .seek(SeekFrom::End(offset)))? + self.pos = self + .with_file(StorageIoOperation::Seek, |mut file| { + file.seek(SeekFrom::End(offset)) + }) + .await?? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -522,9 +515,9 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Read, |file| file - .as_ref() - .read_at(buf, offset)); + let result = self + .with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -534,9 +527,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Write, |file| file - .as_ref() - .write_at(buf, offset)); + let result = self + .with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) @@ -546,18 +539,6 @@ impl VirtualFile { } } -struct FileGuard<'a> { - slot_guard: RwLockReadGuard<'a, SlotInner>, -} - -impl<'a> AsRef for FileGuard<'a> { - fn as_ref(&self) -> &File { - // This unwrap is safe because we only create `FileGuard`s - // if we know that the file is Some. - self.slot_guard.file.as_ref().unwrap() - } -} - #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( @@ -590,39 +571,20 @@ impl VirtualFile { impl Drop for VirtualFile { /// If a VirtualFile is dropped, close the underlying file if it was open. fn drop(&mut self) { - let handle = self.handle.get_mut(); - - fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) { - if slot_guard.tag == tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also operation "close-by-replace" for closes done on eviction for - // comparison. - STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Close) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } - } + let handle = self.handle.get_mut().unwrap(); - // We don't have async drop so we cannot directly await the lock here. - // Instead, first do a best-effort attempt at closing the underlying - // file descriptor by using `try_write`, and if that fails, spawn - // a tokio task to do it asynchronously: we just want it to be - // cleaned up eventually. - // Most of the time, the `try_lock` should succeed though, - // as we have `&mut self` access. In other words, if the slot - // is still occupied by our file, there should be no access from - // other I/O operations; the only other possible place to lock - // the slot is the lock algorithm looking for free slots. + // We could check with a read-lock first, to avoid waiting on an + // unrelated I/O. let slot = &get_open_files().slots[handle.index]; - if let Ok(slot_guard) = slot.inner.try_write() { - clean_slot(slot, slot_guard, handle.tag); - } else { - let tag = handle.tag; - tokio::spawn(async move { - let slot_guard = slot.inner.write().await; - clean_slot(slot, slot_guard, tag); - }); - }; + let mut slot_guard = slot.inner.write().unwrap(); + if slot_guard.tag == handle.tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also operation "close-by-replace" for closes done on eviction for + // comparison. + STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Close) + .observe_closure_duration(|| drop(slot_guard.file.take())); + } } }