Skip to content

Commit

Permalink
Convert many VirtualFile APIs to async (#5190)
Browse files Browse the repository at this point in the history
## Problem

`VirtualFile` does both reading and writing, and it would be nice if
both could be converted to async, so that it doesn't have to support an
async read path and a blocking write path. The mixed model is especially
annoying for the locks, as none of the lock implementations in std, tokio,
or parking_lot support both async and blocking access.

## Summary of changes

This PR is some initial work on making the `VirtualFile` APIs async. It
can be reviewed commit-by-commit.

* Introduce the `MaybeVirtualFile` enum so that the test that compares
real files with virtual files can be written generically over both.
* Make various APIs of `VirtualFile` async, including `write_all_at`,
`read_at`, `read_exact_at`.

Part of #4743, successor of #5180.

Co-authored-by: Christian Schwarz <[email protected]>
  • Loading branch information
arpad-m and problame authored Sep 4, 2023
1 parent 6cd497b commit 128a85b
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 137 deletions.
10 changes: 2 additions & 8 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,12 @@ pub trait BlobWriter {
/// An implementation of BlobWriter to write blobs to anything that
/// implements std::io::Write.
///
pub struct WriteBlobWriter<W>
where
W: std::io::Write,
{
pub struct WriteBlobWriter<W> {
inner: W,
offset: u64,
}

impl<W> WriteBlobWriter<W>
where
W: std::io::Write,
{
impl<W> WriteBlobWriter<W> {
pub fn new(inner: W, start_offset: u64) -> Self {
WriteBlobWriter {
inner,
Expand Down
9 changes: 5 additions & 4 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;

/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
Expand Down Expand Up @@ -155,9 +154,11 @@ impl FileBlockReader {
}

/// Read a page from the underlying file into given buffer.
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.
///
Expand All @@ -179,7 +180,7 @@ impl FileBlockReader {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();

// Swap for read lock
Expand Down
17 changes: 11 additions & 6 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use tracing::*;
Expand Down Expand Up @@ -88,7 +87,8 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();

// Swap for read lock
Expand Down Expand Up @@ -128,10 +128,15 @@ impl EphemeralFile {
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
match self.ephemeral_file.file.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
) {
match self
.ephemeral_file
.file
.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
)
.await
{
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
Expand Down
Loading

1 comment on commit 128a85b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1692 tests run: 1613 passed, 0 failed, 79 skipped (full report)


The comment gets automatically updated with the latest test results
128a85b at 2023-09-04T15:42:06.934Z :recycle:

Please sign in to comment.