Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use tokio locks in VirtualFile and turn with_file into macro #5247

Merged
merged 11 commits into from
Sep 11, 2023
186 changes: 93 additions & 93 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockWriteGuard};

///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
Expand Down Expand Up @@ -110,7 +110,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
Expand Down Expand Up @@ -142,7 +142,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().unwrap();
slot_guard = slot.inner.write().await;
index = next;
break;
}
Expand Down Expand Up @@ -208,6 +208,78 @@ impl CrashsafeOverwriteError {
}
}

/// Helper macro internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
macro_rules! with_file {
($this:expr, $($body:tt)*) => {{ let r: Result<_, Error> = {
let open_files = get_open_files();

let mut handle_guard = {
// Read the cached slot handle, and see if the slot that it points to still
// contains our File.
//
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *$this.handle.read().await;
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag {
#[allow(unused_mut)]
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
let mut file: &File = file;
return file.$($body)*;
}
}
}

// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = $this.handle.write().await;

// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
if *handle_guard != handle {
handle = *handle_guard;
continue;
}
break handle_guard;
}
};

// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot().await;

// Open the physical file
#[allow(unused_mut)]
let mut file = STORAGE_IO_TIME
.with_label_values(&["open"])
.observe_closure_duration(|| $this.open_options.open(&$this.path))?;

// Perform the requested operation on it
let result = file.$($body)*?;
arpad-m marked this conversation as resolved.
Show resolved Hide resolved

// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
slot_guard.file.replace(file);

*handle_guard = handle;

Ok(result)
}; r}};
}

impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Expand Down Expand Up @@ -244,7 +316,7 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.observe_closure_duration(|| open_options.open(path))?;
Expand Down Expand Up @@ -330,82 +402,11 @@ impl VirtualFile {

/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file("fsync", |file| file.sync_all()).await?
with_file!(self, sync_all())
}

pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file("metadata", |file| file.metadata()).await?
}

/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
let open_files = get_open_files();

let mut handle_guard = {
// Read the cached slot handle, and see if the slot that it points to still
// contains our File.
//
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME
.with_label_values(&[op])
.observe_closure_duration(|| func(file)));
}
}
}

// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();

// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
if *handle_guard != handle {
handle = *handle_guard;
continue;
}
break handle_guard;
}
};

// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();

// Open the physical file
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.observe_closure_duration(|| self.open_options.open(&self.path))?;

// Perform the requested operation on it
let result = STORAGE_IO_TIME
.with_label_values(&[op])
.observe_closure_duration(|| func(&file));
koivunej marked this conversation as resolved.
Show resolved Hide resolved

// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
slot_guard.file.replace(file);

*handle_guard = handle;

Ok(result)
with_file!(self, metadata())
}

pub fn remove(self) {
Expand All @@ -419,11 +420,7 @@ impl VirtualFile {
SeekFrom::Start(offset) => {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self
.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))
.await??
}
SeekFrom::End(offset) => self.pos = with_file!(self, seek(SeekFrom::End(offset)))?,
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
Expand Down Expand Up @@ -510,9 +507,7 @@ impl VirtualFile {
}

pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file("read", |file| file.read_at(buf, offset))
.await?;
let result = with_file!(self, read_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
Expand All @@ -522,9 +517,7 @@ impl VirtualFile {
}

async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file("write", |file| file.write_at(buf, offset))
.await?;
let result = with_file!(self, write_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
Expand Down Expand Up @@ -566,12 +559,19 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();

// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let handle = self.handle.get_mut();

// We don't have async drop so we cannot wait for the lock here.
// Instead, do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`.
koivunej marked this conversation as resolved.
Show resolved Hide resolved
// This best-effort attempt should be quite good though
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, we should be the only ones
// accessing it (and if it has been reassigned since, we don't
// need to bother with dropping anyways).
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
let Ok(mut slot_guard) = slot.inner.try_write() else { return };

arpad-m marked this conversation as resolved.
Show resolved Hide resolved
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
Expand Down