Skip to content

Commit

Permalink
use WIP tokio_epoll_uring open_at for async VirtualFile::open
Browse files Browse the repository at this point in the history
This makes Delta/Image ::load fns fully tokio-epoll-uring
  • Loading branch information
problame committed Nov 8, 2023
1 parent be22467 commit 0003744
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 31 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/21
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/openat" }

[dev-dependencies]
criterion.workspace = true
Expand Down
11 changes: 5 additions & 6 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use camino::Utf8PathBuf;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -44,11 +43,11 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));

let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)
.await?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true).create(true);
VirtualFile::open_with_options_async(&filename, options).await?
};

Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
Expand Down
10 changes: 5 additions & 5 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,11 @@ impl ImageLayerWriterInner {
},
);
info!("new image layer {path}");
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)
.await?;
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create_new(true);
VirtualFile::open_with_options_async(&path, options).await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down
111 changes: 93 additions & 18 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
//!
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
use crate::page_cache::PageWriteGuard;
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
use std::os::unix::fs::FileExt;
Expand Down Expand Up @@ -56,7 +55,7 @@ pub struct VirtualFile {
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
pub path: Utf8PathBuf,
open_options: OpenOptions,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,

// These are strings becase we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
Expand Down Expand Up @@ -279,24 +278,25 @@ macro_rules! with_file {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
Self::open_with_options_async(path, options).await
}

/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create(true).truncate(true);
Self::open_with_options_async(path, options).await
}

/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
#[cfg(test)]
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
Expand Down Expand Up @@ -357,15 +357,15 @@ impl VirtualFile {
));
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
.create_new(true);
Self::open_with_options_async(tmp_path, options).await?
};
file.write_all(content).await?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
Expand All @@ -376,12 +376,74 @@ impl VirtualFile {
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
let final_parent_dirfd = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
Self::open_with_options_async(final_path_parent, options).await?
};
final_parent_dirfd.sync_all().await?;
Ok(())
}

/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub async fn open_with_options_async(
path: &Utf8Path,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
tenant_id = parts[parts.len() - 4].to_string();
timeline_id = parts[parts.len() - 2].to_string();
} else {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;

slot_guard.file = Some({
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd = system
.open(path, &open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
});

// Strip all options other than read and write.
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options;
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);

let vfile = VirtualFile {
handle: RwLock::new(handle),
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
timeline_id,
};

Ok(vfile)
}

/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
Expand Down Expand Up @@ -444,7 +506,20 @@ impl VirtualFile {
let (handle, mut slot_guard) = open_files.find_victim_slot().await;

// Open the physical file
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
let file = {
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd = system
.open(&self.path, &self.open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
};

// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
Expand Down

0 comments on commit 0003744

Please sign in to comment.