Skip to content

Commit

Permalink
use tokio_epoll_uring for read path & VirtualFile::open
Browse files Browse the repository at this point in the history
This makes Delta/Image ::load fns fully tokio-epoll-uring
  • Loading branch information
problame committed Dec 11, 2023
1 parent 7d90ef6 commit 65d0acb
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 87 deletions.
44 changes: 40 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }

[dev-dependencies]
criterion.workspace = true
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;

/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
Expand Down Expand Up @@ -169,7 +169,11 @@ impl FileBlockReader {
}

/// Read a page from the underlying file into given buffer.
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
async fn fill_buffer(
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
Expand All @@ -196,9 +200,9 @@ impl FileBlockReader {
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
Expand Down
20 changes: 9 additions & 11 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::virtual_file::VirtualFile;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -45,11 +44,11 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));

let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)
.await?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true).create(true);
VirtualFile::open_with_options_async(&filename, options).await?
};

Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
Expand Down Expand Up @@ -89,11 +88,10 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = self
.file
.read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
Expand Down
13 changes: 7 additions & 6 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,12 +647,13 @@ impl DeltaLayer {
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true);
VirtualFile::open_with_options_async(path, options)
.await
.with_context(|| format!("Failed to open file '{}'", path))?
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
Expand Down
23 changes: 12 additions & 11 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,13 @@ impl ImageLayer {
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true);
VirtualFile::open_with_options_async(path, options)
.await
.with_context(|| format!("Failed to open file '{}'", path))?
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
Expand Down Expand Up @@ -492,11 +493,11 @@ impl ImageLayerWriterInner {
},
);
info!("new image layer {path}");
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)
.await?;
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create_new(true);
VirtualFile::open_with_options_async(&path, options).await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down
Loading

0 comments on commit 65d0acb

Please sign in to comment.