Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(virtual_file) make write_all_at take owned buffers #6673

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9a4880e
WIP
problame Feb 7, 2024
b5a00b0
refactor(disk_btree): make BlockWriter::write_blk infallible
problame Feb 7, 2024
7ba1949
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/btree-b…
problame Feb 7, 2024
91d3e25
finish
problame Feb 7, 2024
6f65648
Revert "refactor(disk_btree): make BlockWriter::write_blk infallible"
problame Feb 7, 2024
85d5fc6
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
edabce6
use right branch
problame Feb 7, 2024
e5d15df
WIP
problame Feb 7, 2024
2a39457
WIP
problame Feb 7, 2024
4659794
WIP
problame Feb 7, 2024
14bdd84
it turns out one wants to take BoundedBuf, not Slice<T>
problame Feb 7, 2024
4fe3b49
make tests pass
problame Feb 7, 2024
a6605a1
pull bunch of changes down
problame Feb 7, 2024
c92e8a7
don't pull that in
problame Feb 7, 2024
b0144e2
update lib
problame Feb 7, 2024
54561a8
fixup
problame Feb 7, 2024
6c47083
we can't use impl IoBuf for Array
problame Feb 7, 2024
238296a
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
207764b
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
e5da261
work around BoundedBuf.slice(0..x) panicking for x == 0
problame Feb 7, 2024
26e51c7
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
6f4d182
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
33f3053
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Feb 7, 2024
33261e4
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
720f633
refactor(virtual_file) make write_all_at take owned buffers
problame Feb 7, 2024
8998178
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 12, 2024
98fe109
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/virtual…
problame Feb 12, 2024
87f05ca
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 13, 2024
ebffde1
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 14, 2024
ab51748
formatting
problame Feb 14, 2024
6a55ac7
complete comment; https://github.com/neondatabase/neon/pull/6673#disc…
problame Feb 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
Expand All @@ -26,7 +27,10 @@ pub struct EphemeralFile {
/// An ephemeral file is append-only.
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
/// The other pages, which can no longer be modified, are accessed through the page cache.
mutable_tail: [u8; PAGE_SZ],
///
/// None <=> IO is ongoing.
/// Size is fixed to PAGE_SZ at creation time and must not be changed.
mutable_tail: Option<BytesMut>,
}

impl EphemeralFile {
Expand Down Expand Up @@ -60,7 +64,7 @@ impl EphemeralFile {
_timeline_id: timeline_id,
file,
len: 0,
mutable_tail: [0u8; PAGE_SZ],
mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
})
}

Expand Down Expand Up @@ -103,7 +107,13 @@ impl EphemeralFile {
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
Ok(BlockLease::EphemeralFileMutableTail(
self.mutable_tail
.as_deref()
.expect("we're not doing IO, it must be Some()")
.try_into()
.expect("we ensure that it's always PAGE_SZ"),
))
}
}

Expand Down Expand Up @@ -135,21 +145,27 @@ impl EphemeralFile {
) -> Result<(), io::Error> {
let mut src_remaining = src;
while !src_remaining.is_empty() {
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
let dst_remaining = &mut self
.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not yet ongoing")[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
match self
let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
.expect("IO is not yet ongoing");
let (mutable_tail, res) = self
.ephemeral_file
.file
.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
)
.await
{
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
.await;
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
// I.e., the IO isn't retryable if we panic.
self.ephemeral_file.mutable_tail = Some(mutable_tail);
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
match res {
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
Expand All @@ -169,7 +185,12 @@ impl EphemeralFile {
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
buf.copy_from_slice(
self.ephemeral_file
.mutable_tail
.as_deref()
.expect("IO is not ongoing"),
);
let _ = write_guard.mark_valid();
// pre-warm successful
}
Expand All @@ -181,7 +202,11 @@ impl EphemeralFile {
// Zero the buffer for re-use.
// Zeroing is critical for correctness because the write_blob code below
// and similarly read_blk expect zeroed pages.
self.ephemeral_file.mutable_tail.fill(0);
self.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not ongoing")
.fill(0);
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
Expand Down
50 changes: 36 additions & 14 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,24 +582,37 @@ impl VirtualFile {
}

// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
pub async fn write_all_at<B: BoundedBuf>(
&self,
buf: B,
mut offset: u64,
) -> (B::Buf, Result<(), Error>) {
problame marked this conversation as resolved.
Show resolved Hide resolved
let buf_len = buf.bytes_init();
if buf_len == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(()));
}
let mut buf = buf.slice(0..buf_len);
while !buf.is_empty() {
match self.write_at(buf, offset).await {
// TODO: push `buf` further down
match self.write_at(&buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
return (
Slice::into_inner(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = &buf[n..];
buf = buf.slice(n..);
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
Err(e) => return (Slice::into_inner(buf), Err(e)),
}
}
Ok(())
(Slice::into_inner(buf), Ok(()))
}

/// Writes `buf.slice(0..buf.bytes_init())`.
Expand Down Expand Up @@ -1064,10 +1077,19 @@ mod tests {
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all_at(buf, offset).await;
res
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all_at(&buf.slice(0..buf_len), offset)
}
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
Expand Down Expand Up @@ -1214,8 +1236,8 @@ mod tests {
.to_owned(),
)
.await?;
file_b.write_all_at(b"BAR", 3).await?;
file_b.write_all_at(b"FOO", 0).await?;
file_b.write_all_at(b"BAR".to_vec(), 3).await?;
file_b.write_all_at(b"FOO".to_vec(), 0).await?;

assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");

Expand Down
Loading