Skip to content

Commit

Permalink
Make VirtualFile::{open_with_options, create} async fn
Browse files Browse the repository at this point in the history
  • Loading branch information
arpad-m committed Sep 6, 2023
1 parent ae1af9d commit bd04abb
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(&path)?;
let file = VirtualFile::create(&path).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let offs = wtr.write_blob(blob).await?;
Expand Down
7 changes: 4 additions & 3 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct EphemeralFile {
}

impl EphemeralFile {
pub fn create(
pub async fn create(
conf: &PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
Expand All @@ -44,7 +44,8 @@ impl EphemeralFile {
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)?;
)
.await?;

Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
Expand Down Expand Up @@ -286,7 +287,7 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;

let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

let pos_foo = file.write_blob(b"foo").await?;
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ impl DeltaLayerWriterInner {
// FIXME: throw an error instead?
let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);

let mut file = VirtualFile::create(&path)?;
let mut file = VirtualFile::create(&path).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down
3 changes: 2 additions & 1 deletion pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,8 @@ impl ImageLayerWriterInner {
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)?;
)
.await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,15 @@ impl InMemoryLayer {
///
/// Create a new, empty, in-memory layer
///
pub fn create(
pub async fn create(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
start_lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");

let file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
let file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

Ok(InMemoryLayer {
conf,
Expand Down
16 changes: 9 additions & 7 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2544,13 +2544,15 @@ impl Timeline {
///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)?;
let layer = guard
.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)
.await?;
Ok(layer)
}

Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/timeline/layer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl LayerManager {

/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
/// called within `get_layer_for_write`.
pub(crate) fn get_layer_for_write(
pub(crate) async fn get_layer_for_write(
&mut self,
lsn: Lsn,
last_record_lsn: Lsn,
Expand Down Expand Up @@ -129,7 +129,7 @@ impl LayerManager {
lsn
);

let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?;
let layer = Arc::new(new_layer);

self.layer_map.open_layer = Some(layer.clone());
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,24 +211,25 @@ impl CrashsafeOverwriteError {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
Self::open_with_options(path, OpenOptions::new().read(true)).await
}

/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
pub async fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
}

/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub fn open_with_options(
pub async fn open_with_options(
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
Expand Down Expand Up @@ -299,6 +300,7 @@ impl VirtualFile {
// we bail out instead of causing damage.
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
Expand All @@ -317,6 +319,7 @@ impl VirtualFile {
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
final_parent_dirfd
.sync_all()
Expand Down Expand Up @@ -703,7 +706,7 @@ mod tests {
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| {
let vf = VirtualFile::open_with_options(path, open_options)?;
let vf = VirtualFile::open_with_options(path, open_options).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
Expand Down Expand Up @@ -836,7 +839,8 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
.await?;
files.push(f);
}
let files = Arc::new(files);
Expand Down

0 comments on commit bd04abb

Please sign in to comment.