Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make VirtualFile::{open, open_with_options, create, sync_all, with_file} async fn #5224

Merged
merged 8 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pageserver/ctl/src/layer_map_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {

// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path)?);
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
Expand Down
2 changes: 1 addition & 1 deletion pageserver/ctl/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(&path)?;
let file = VirtualFile::create(&path).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let offs = wtr.write_blob(blob).await?;
Expand All @@ -251,7 +251,7 @@ mod tests {
wtr.flush_buffer().await?;
}

let file = VirtualFile::open(&path)?;
let file = VirtualFile::open(&path).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
Expand Down
7 changes: 4 additions & 3 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct EphemeralFile {
}

impl EphemeralFile {
pub fn create(
pub async fn create(
conf: &PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
Expand All @@ -44,7 +44,8 @@ impl EphemeralFile {
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)?;
)
.await?;

Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
Expand Down Expand Up @@ -286,7 +287,7 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;

let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

let pos_foo = file.write_blob(b"foo").await?;
assert_eq!(
Expand Down
5 changes: 2 additions & 3 deletions pageserver/src/tenant/par_fsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};

use crate::virtual_file::VirtualFile;

fn fsync_path(path: &Path) -> io::Result<()> {
let file = VirtualFile::open(path)?;
// TODO: use VirtualFile::sync_all once we fully go async.
let file = std::fs::File::open(path)?;
file.sync_all()
}

Expand Down
5 changes: 3 additions & 2 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ impl DeltaLayerWriterInner {
// FIXME: throw an error instead?
let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);

let mut file = VirtualFile::create(&path)?;
let mut file = VirtualFile::create(&path).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down Expand Up @@ -732,7 +732,7 @@ impl DeltaLayerWriterInner {
};

// fsync the file
file.sync_all()?;
file.sync_all().await?;
// Rename the file to its final name
//
// Note: This overwrites any existing file. There shouldn't be any.
Expand Down Expand Up @@ -851,6 +851,7 @@ impl DeltaLayerInner {
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);

Expand Down
6 changes: 4 additions & 2 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ impl ImageLayerInner {
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0).await?;
Expand Down Expand Up @@ -540,7 +541,8 @@ impl ImageLayerWriterInner {
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)?;
)
.await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down Expand Up @@ -645,7 +647,7 @@ impl ImageLayerWriterInner {
};

// fsync the file
file.sync_all()?;
file.sync_all().await?;

// Rename the file to its final name
//
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,15 @@ impl InMemoryLayer {
///
/// Create a new, empty, in-memory layer
///
pub fn create(
pub async fn create(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
start_lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");

let file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
let file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

Ok(InMemoryLayer {
conf,
Expand Down
16 changes: 9 additions & 7 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2544,13 +2544,15 @@ impl Timeline {
///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)?;
let layer = guard
.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)
.await?;
Ok(layer)
}

Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/timeline/layer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl LayerManager {

/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
/// called within `get_layer_for_write`.
pub(crate) fn get_layer_for_write(
pub(crate) async fn get_layer_for_write(
&mut self,
lsn: Lsn,
last_record_lsn: Lsn,
Expand Down Expand Up @@ -129,7 +129,7 @@ impl LayerManager {
lsn
);

let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?;
let layer = Arc::new(new_layer);

self.layer_map.open_layer = Some(layer.clone());
Expand Down
62 changes: 39 additions & 23 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,25 +210,26 @@ impl CrashsafeOverwriteError {

impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
}

/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
pub async fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
}

/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub fn open_with_options(
pub async fn open_with_options(
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
Expand Down Expand Up @@ -299,11 +300,13 @@ impl VirtualFile {
// we bail out instead of causing damage.
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
file.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
drop(file); // before the rename, that's important!
// renames are atomic
Expand All @@ -316,26 +319,28 @@ impl VirtualFile {
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
final_parent_dirfd
.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
Ok(())
}

/// Call File::sync_all() on the underlying File.
pub fn sync_all(&self) -> Result<(), Error> {
self.with_file("fsync", |file| file.sync_all())?
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file("fsync", |file| file.sync_all()).await?
}

pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file("metadata", |file| file.metadata())?
self.with_file("metadata", |file| file.metadata()).await?
}

/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
async fn with_file<F, R>(&self, op: &str, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
Expand Down Expand Up @@ -415,7 +420,9 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))??
self.pos = self
.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))
.await??
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
Expand Down Expand Up @@ -503,7 +510,9 @@ impl VirtualFile {
}

pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
let result = self
.with_file("read", |file| file.read_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
Expand All @@ -513,7 +522,9 @@ impl VirtualFile {
}

async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
let result = self
.with_file("write", |file| file.write_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
Expand Down Expand Up @@ -625,6 +636,7 @@ mod tests {
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
use std::future::Future;
use std::io::Write;
use std::sync::Arc;

Expand Down Expand Up @@ -694,24 +706,25 @@ mod tests {
// results with VirtualFiles as with native Files. (Except that with
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| {
let vf = VirtualFile::open_with_options(path, open_options)?;
test_files("virtual_files", |path, open_options| async {
let vf = VirtualFile::open_with_options(path, open_options).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
}

#[tokio::test]
async fn test_physical_files() -> Result<(), Error> {
test_files("physical_files", |path, open_options| {
test_files("physical_files", |path, open_options| async {
Ok(MaybeVirtualFile::File(open_options.open(path)?))
})
.await
}

async fn test_files<OF>(testname: &str, openfunc: OF) -> Result<(), Error>
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
where
OF: Fn(&Path, &OpenOptions) -> Result<MaybeVirtualFile, std::io::Error>,
OF: Fn(&Path, &OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?;
Expand All @@ -720,14 +733,15 @@ mod tests {
let mut file_a = openfunc(
&path_a,
OpenOptions::new().write(true).create(true).truncate(true),
)?;
)
.await?;
file_a.write_all(b"foobar").await?;

// cannot read from a file opened in write-only mode
let _ = file_a.read_string().await.unwrap_err();

// Close the file and re-open for reading
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true)).await?;

// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar").await.unwrap_err();
Expand Down Expand Up @@ -769,7 +783,8 @@ mod tests {
.write(true)
.create(true)
.truncate(true),
)?;
)
.await?;
file_b.write_all_at(b"BAR", 3).await?;
file_b.write_all_at(b"FOO", 0).await?;

Expand All @@ -783,7 +798,7 @@ mod tests {

let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true)).await?;
assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile);
}
Expand All @@ -808,8 +823,8 @@ mod tests {
/// Test using VirtualFiles from many threads concurrently. This tests both using
/// a lot of VirtualFiles concurrently, causing evictions, and also using the same
/// VirtualFile from multiple threads concurrently.
#[test]
fn test_vfile_concurrency() -> Result<(), Error> {
#[tokio::test]
async fn test_vfile_concurrency() -> Result<(), Error> {
const SIZE: usize = 8 * 1024;
const VIRTUAL_FILES: usize = 100;
const THREADS: usize = 100;
Expand All @@ -828,7 +843,8 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
.await?;
files.push(f);
}
let files = Arc::new(files);
Expand Down