
Commit

Revert "tokio-epoll-uring: use it on the layer-creating code paths (#6378)"

This reverts commit 3da410c.
problame committed Mar 12, 2024
1 parent bac06ea commit 9876045
Showing 6 changed files with 77 additions and 243 deletions.
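
For orientation before the per-file diffs: the revert takes the pageserver's low-level write path off the owned-buffer, io_engine-dispatched API (a `Slice<B>` goes in and the buffer is handed back alongside the result) and back onto borrowed `&[u8]` slices written with std's `FileExt::write_at`, with `sync_all`/`sync_data`/`metadata` likewise returning to plain std calls. A minimal sketch of the restored shape, using only std and an illustrative file path and function name (not the pageserver's `VirtualFile` wrapper), looks like this:

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

/// Positional write in the shape the revert restores: a borrowed byte slice
/// plus an offset, forwarded to pwrite(2) via std's FileExt.
/// (The reverted code instead took an owned buffer and dispatched through the
/// io_engine so tokio-epoll-uring could own the buffer for the duration of the I/O.)
fn write_at_std(file: &File, buf: &[u8], offset: u64) -> io::Result<usize> {
    file.write_at(buf, offset)
}

fn main() -> io::Result<()> {
    // Illustrative path; any writable file works.
    let file = File::create("/tmp/write_at_demo.bin")?;
    let n = write_at_std(&file, b"hello", 8)?;
    println!("wrote {n} bytes at offset 8");
    Ok(())
}
```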
14 changes: 4 additions & 10 deletions pageserver/src/tenant/blob_io.rs
@@ -12,7 +12,7 @@
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_epoll_uring::{BoundedBuf, Slice};

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -127,7 +127,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
#[inline(always)]
async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
async fn write_all_unbuffered<B: BoundedBuf>(
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
@@ -162,10 +162,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}

/// Internal, possibly buffered, write function
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
async fn write_all<B: BoundedBuf>(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) {
if !BUFFERED {
assert!(self.buf.is_empty());
return self.write_all_unbuffered(src_buf).await;
@@ -213,10 +210,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {

/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
) -> (B::Buf, Result<u64, Error>) {
pub async fn write_blob<B: BoundedBuf>(&mut self, srcbuf: B) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;

let len = srcbuf.bytes_init();
1 change: 0 additions & 1 deletion pageserver/src/tenant/storage_layer/layer.rs
@@ -195,7 +195,6 @@ impl Layer {
let downloaded = resident.expect("just initialized");

// if the rename works, the path is as expected
// TODO: sync system call
std::fs::rename(temp_path, owner.local_path())
.with_context(|| format!("rename temporary file as correct path for {owner}"))?;

78 changes: 37 additions & 41 deletions pageserver/src/tenant/timeline.rs
@@ -3256,48 +3256,44 @@ impl Timeline {
frozen_layer: &Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
let work = async move {
let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
//
// We use fatal_err() below because the after write_to_disk returns with success,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let timeline_dir = VirtualFile::open(
&self_clone
.conf
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id),
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
anyhow::Ok(new_delta)
};
// Before tokio-epoll-uring, we ran write_to_disk & the sync_all inside spawn_blocking.
// Preserve that behavior to maintain the same behavior for `virtual_file_io_engine=std-fs`.
use crate::virtual_file::io_engine::IoEngine;
match crate::virtual_file::io_engine::get() {
IoEngine::NotSet => panic!("io engine not set"),
IoEngine::StdFs => {
let span = tracing::info_span!("blocking");
tokio::task::spawn_blocking({
move || Handle::current().block_on(work.instrument(span))
})
.await
.context("spawn_blocking")
.and_then(|x| x)
let span = tracing::info_span!("blocking");
let new_delta: ResidentLayer = tokio::task::spawn_blocking({
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
move || {
Handle::current().block_on(
async move {
let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
//
// We use fatal_err() below because the after write_to_disk returns with success,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let timeline_dir =
VirtualFile::open(&self_clone.conf.timeline_path(
&self_clone.tenant_shard_id,
&self_clone.timeline_id,
))
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
anyhow::Ok(new_delta)
}
.instrument(span),
)
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => work.await,
}
})
.await
.context("spawn_blocking")
.and_then(|x| x)?;

Ok(new_delta)
}

async fn repartition(
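
The comment block in this hunk restates a general crash-consistency rule rather than anything pageserver-specific: `write_to_disk()` already fsyncs the new layer file via `writer.finish()`, so the remaining step is to fsync the timeline directory so that the directory entry pointing at the new inode is durable too; the restored code runs both inside `spawn_blocking` with `Handle::current().block_on(...)` because these are blocking std calls. A standalone sketch of the file-then-parent-directory pattern, with an illustrative path and helper name rather than the `VirtualFile`/`fatal_err` machinery used above, might look like:

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Durably write `contents` to `path`: fsync the file itself, then fsync the
/// parent directory so the directory entry linking the new inode is on disk.
fn durable_write(path: &Path, contents: &[u8]) -> io::Result<()> {
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;
    file.write_all(contents)?;
    // First fsync: the file's data and inode.
    file.sync_all()?;
    // Second fsync: the containing directory. On Linux a directory can be
    // opened read-only and fsynced like a regular file.
    let dir = File::open(path.parent().expect("path has a parent"))?;
    dir.sync_all()?;
    Ok(())
}

fn main() -> io::Result<()> {
    // Illustrative path; the real code targets the timeline directory.
    durable_write(Path::new("/tmp/durable_demo.bin"), b"layer bytes")
}
```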
105 changes: 29 additions & 76 deletions pageserver/src/virtual_file.rs
@@ -17,21 +17,20 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::File;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};

use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;

pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
mod metadata;
mod open_options;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;

///
@@ -436,25 +435,13 @@ impl VirtualFile {

/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file_guard| {
let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
res
})
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
.with_std_file(|std_file| std_file.sync_all()))
}

/// Call File::sync_data() on the underlying File.
pub async fn sync_data(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file_guard| {
let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
res
})
}

pub async fn metadata(&self) -> Result<Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file_guard| {
let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
res
})
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
.with_std_file(|std_file| std_file.metadata()))
}

/// Helper function internal to `VirtualFile` that looks up the underlying File,
@@ -592,7 +579,7 @@ impl VirtualFile {
}

// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
pub async fn write_all_at<B: BoundedBuf>(
&self,
buf: B,
mut offset: u64,
@@ -603,9 +590,8 @@ impl VirtualFile {
}
let mut buf = buf.slice(0..buf_len);
while !buf.is_empty() {
let res;
(buf, res) = self.write_at(buf, offset).await;
match res {
// TODO: push `buf` further down
match self.write_at(&buf, offset).await {
Ok(0) => {
return (
Slice::into_inner(buf),
@@ -619,7 +605,7 @@ impl VirtualFile {
buf = buf.slice(n..);
offset += n as u64;
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (Slice::into_inner(buf), Err(e)),
}
}
@@ -630,19 +616,15 @@ impl VirtualFile {
/// Returns the IoBuf that is underlying the BoundedBuf `buf`.
/// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
/// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> (B::Buf, Result<usize, Error>) {
pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(0));
}
let mut buf = buf.slice(0..nbytes);
while !buf.is_empty() {
let res;
(buf, res) = self.write(buf).await;
match res {
// TODO: push `Slice` further down
match self.write(&buf).await {
Ok(0) => {
return (
Slice::into_inner(buf),
@@ -662,18 +644,11 @@ impl VirtualFile {
(Slice::into_inner(buf), Ok(nbytes))
}

async fn write<B: IoBuf + Send>(
&mut self,
buf: Slice<B>,
) -> (Slice<B>, Result<usize, std::io::Error>) {
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos).await;
let n = match res {
Ok(n) => n,
Err(e) => return (buf, Err(e)),
};
let n = self.write_at(buf, pos).await?;
self.pos += n as u64;
(buf, Ok(n))
Ok(n)
}

pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
@@ -701,30 +676,16 @@ impl VirtualFile {
})
}

async fn write_at<B: IoBuf + Send>(
&self,
buf: Slice<B>,
offset: u64,
) -> (Slice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
observe_duration!(StorageIoOperation::Write, {
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&[
"write",
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
])
.add(size as i64);
}
(buf, result)
})
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
});
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.shard_id, &self.timeline_id])
.add(size as i64);
}
result
}
}

@@ -1122,7 +1083,6 @@ mod tests {
use rand::Rng;
use std::future::Future;
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;

enum MaybeVirtualFile {
@@ -1143,11 +1103,7 @@ impl MaybeVirtualFile {
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&self,
buf: B,
offset: u64,
) -> Result<(), Error> {
async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all_at(buf, offset).await;
@@ -1168,10 +1124,7 @@ impl MaybeVirtualFile {
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> Result<(), Error> {
async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf).await;
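
Both sides of the `write_all_at`/`write_all` hunks above keep the same outer retry loop that std's `write_all_at` (linked in the comment) uses: treat `Ok(0)` as `WriteZero`, advance the buffer and offset on a short write, and retry on `ErrorKind::Interrupted`; the revert only changes whether the inner call takes an owned `Slice` or a borrowed `&[u8]`. Reduced to plain std types, the loop is roughly:

```rust
use std::fs::File;
use std::io::{Error, ErrorKind};
use std::os::unix::fs::FileExt;

/// Keep issuing pwrite(2) until every byte of `buf` has been written at `offset`.
fn write_all_at(file: &File, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
    while !buf.is_empty() {
        match file.write_at(buf, offset) {
            // A zero-length write means the kernel made no progress; bail out
            // instead of spinning forever.
            Ok(0) => {
                return Err(Error::new(
                    ErrorKind::WriteZero,
                    "failed to write whole buffer",
                ));
            }
            // Short write: drop the bytes that were accepted and advance the offset.
            Ok(n) => {
                buf = &buf[n..];
                offset += n as u64;
            }
            // EINTR: interrupted by a signal before anything was written; retry.
            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```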
[Diffs for the remaining changed files are not rendered on this page.]
