Skip to content

Commit

Permalink
pull in WIP: integrate tokio-epoll-uring #5824
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 5ec61ce
Author: Christian Schwarz <[email protected]>
Date:   Wed Nov 29 16:17:12 2023 +0000

    bump

commit 34c33d1
Author: Christian Schwarz <[email protected]>
Date:   Mon Nov 20 14:38:29 2023 +0000

    bump

commit 8fa6b76
Author: Christian Schwarz <[email protected]>
Date:   Mon Nov 20 11:47:19 2023 +0000

    bump

commit 6c359a4
Author: Christian Schwarz <[email protected]>
Date:   Mon Nov 20 11:33:58 2023 +0000

    use neondatabase/tokio-epoll-uring#25

commit 7d484b0
Author: Christian Schwarz <[email protected]>
Date:   Tue Aug 29 19:13:38 2023 +0000

    use WIP tokio_epoll_uring open_at for async VirtualFile::open

    This makes Delta/Image ::load fns fully tokio-epoll-uring

commit 51b26b1
Author: Christian Schwarz <[email protected]>
Date:   Tue Aug 29 12:24:30 2023 +0000

    use `tokio_epoll_uring` for read path

commit a4e6f0c
Author: Christian Schwarz <[email protected]>
Date:   Wed Nov 8 12:36:34 2023 +0000

    Revert "revert recent VirtualFile asyncification changes (#5291)"

    This reverts commit ab1f37e.

    fixes #5479
  • Loading branch information
problame committed Nov 29, 2023
1 parent 0608411 commit cfeeda4
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 136 deletions.
56 changes: 46 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }

[dev-dependencies]
criterion.workspace = true
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;

/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
Expand Down Expand Up @@ -169,7 +169,11 @@ impl FileBlockReader {
}

/// Read a page from the underlying file into given buffer.
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
async fn fill_buffer(
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
Expand All @@ -196,9 +200,9 @@ impl FileBlockReader {
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
Expand Down
20 changes: 9 additions & 11 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use camino::Utf8PathBuf;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -44,11 +43,11 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));

let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)
.await?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true).create(true);
VirtualFile::open_with_options_async(&filename, options).await?
};

Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
Expand Down Expand Up @@ -88,11 +87,10 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = self
.file
.read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
Expand Down
10 changes: 5 additions & 5 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,11 @@ impl ImageLayerWriterInner {
},
);
info!("new image layer {path}");
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)
.await?;
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create_new(true);
VirtualFile::open_with_options_async(&path, options).await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down
Loading

0 comments on commit cfeeda4

Please sign in to comment.