diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 0d33100eadba..ec70bdc67917 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -12,7 +12,7 @@ //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! use bytes::{BufMut, BytesMut}; -use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use tokio_epoll_uring::{BoundedBuf, Slice}; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; @@ -127,7 +127,7 @@ impl BlobWriter { /// You need to make sure that the internal buffer is empty, otherwise /// data will be written in wrong order. #[inline(always)] - async fn write_all_unbuffered, Buf: IoBuf + Send>( + async fn write_all_unbuffered( &mut self, src_buf: B, ) -> (B::Buf, Result<(), Error>) { @@ -162,10 +162,7 @@ impl BlobWriter { } /// Internal, possibly buffered, write function - async fn write_all, Buf: IoBuf + Send>( - &mut self, - src_buf: B, - ) -> (B::Buf, Result<(), Error>) { + async fn write_all(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) { if !BUFFERED { assert!(self.buf.is_empty()); return self.write_all_unbuffered(src_buf).await; @@ -213,10 +210,7 @@ impl BlobWriter { /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob, Buf: IoBuf + Send>( - &mut self, - srcbuf: B, - ) -> (B::Buf, Result) { + pub async fn write_blob(&mut self, srcbuf: B) -> (B::Buf, Result) { let offset = self.offset; let len = srcbuf.bytes_init(); diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 959065bc4c41..30bfe1003df7 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -195,7 +195,6 @@ impl Layer { let downloaded = resident.expect("just initialized"); // if the rename works, the path is as expected - // TODO: sync system call std::fs::rename(temp_path, owner.local_path()) .with_context(|| format!("rename temporary file as correct path for {owner}"))?; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a733a3b1a7f5..14988cbc8077 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3256,48 +3256,44 @@ impl Timeline { frozen_layer: &Arc, ctx: &RequestContext, ) -> anyhow::Result { - let self_clone = Arc::clone(self); - let frozen_layer = Arc::clone(frozen_layer); - let ctx = ctx.attached_child(); - let work = async move { - let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?; - // The write_to_disk() above calls writer.finish() which already did the fsync of the inodes. - // We just need to fsync the directory in which these inodes are linked, - // which we know to be the timeline directory. - // - // We use fatal_err() below because the after write_to_disk returns with success, - // the in-memory state of the filesystem already has the layer file in its final place, - // and subsequent pageserver code could think it's durable while it really isn't. - let timeline_dir = VirtualFile::open( - &self_clone - .conf - .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id), - ) - .await - .fatal_err("VirtualFile::open for timeline dir fsync"); - timeline_dir - .sync_all() - .await - .fatal_err("VirtualFile::sync_all timeline dir"); - anyhow::Ok(new_delta) - }; - // Before tokio-epoll-uring, we ran write_to_disk & the sync_all inside spawn_blocking. - // Preserve that behavior to maintain the same behavior for `virtual_file_io_engine=std-fs`. - use crate::virtual_file::io_engine::IoEngine; - match crate::virtual_file::io_engine::get() { - IoEngine::NotSet => panic!("io engine not set"), - IoEngine::StdFs => { - let span = tracing::info_span!("blocking"); - tokio::task::spawn_blocking({ - move || Handle::current().block_on(work.instrument(span)) - }) - .await - .context("spawn_blocking") - .and_then(|x| x) + let span = tracing::info_span!("blocking"); + let new_delta: ResidentLayer = tokio::task::spawn_blocking({ + let self_clone = Arc::clone(self); + let frozen_layer = Arc::clone(frozen_layer); + let ctx = ctx.attached_child(); + move || { + Handle::current().block_on( + async move { + let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?; + // The write_to_disk() above calls writer.finish() which already did the fsync of the inodes. + // We just need to fsync the directory in which these inodes are linked, + // which we know to be the timeline directory. + // + // We use fatal_err() below because the after write_to_disk returns with success, + // the in-memory state of the filesystem already has the layer file in its final place, + // and subsequent pageserver code could think it's durable while it really isn't. + let timeline_dir = + VirtualFile::open(&self_clone.conf.timeline_path( + &self_clone.tenant_shard_id, + &self_clone.timeline_id, + )) + .await + .fatal_err("VirtualFile::open for timeline dir fsync"); + timeline_dir + .sync_all() + .await + .fatal_err("VirtualFile::sync_all timeline dir"); + anyhow::Ok(new_delta) + } + .instrument(span), + ) } - #[cfg(target_os = "linux")] - IoEngine::TokioEpollUring => work.await, - } + }) + .await + .context("spawn_blocking") + .and_then(|x| x)?; + + Ok(new_delta) } async fn repartition( diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 6d4774cf7522..b7112108f273 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -17,21 +17,20 @@ use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use pageserver_api::shard::TenantShardId; -use std::fs::File; +use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; +use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; pub use pageserver_api::models::virtual_file as api; pub(crate) mod io_engine; -mod metadata; mod open_options; pub(crate) use io_engine::IoEngineKind; -pub(crate) use metadata::Metadata; pub(crate) use open_options::*; /// @@ -436,25 +435,13 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, |file_guard| { - let (_file_guard, res) = io_engine::get().sync_all(file_guard).await; - res - }) + with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard + .with_std_file(|std_file| std_file.sync_all())) } - /// Call File::sync_data() on the underlying File. - pub async fn sync_data(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, |file_guard| { - let (_file_guard, res) = io_engine::get().sync_data(file_guard).await; - res - }) - } - - pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Metadata, |file_guard| { - let (_file_guard, res) = io_engine::get().metadata(file_guard).await; - res - }) + pub async fn metadata(&self) -> Result { + with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard + .with_std_file(|std_file| std_file.metadata())) } /// Helper function internal to `VirtualFile` that looks up the underlying File, @@ -592,7 +579,7 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 - pub async fn write_all_at, Buf: IoBuf + Send>( + pub async fn write_all_at( &self, buf: B, mut offset: u64, @@ -603,9 +590,8 @@ impl VirtualFile { } let mut buf = buf.slice(0..buf_len); while !buf.is_empty() { - let res; - (buf, res) = self.write_at(buf, offset).await; - match res { + // TODO: push `buf` further down + match self.write_at(&buf, offset).await { Ok(0) => { return ( Slice::into_inner(buf), @@ -619,7 +605,7 @@ impl VirtualFile { buf = buf.slice(n..); offset += n as u64; } - Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} Err(e) => return (Slice::into_inner(buf), Err(e)), } } @@ -630,19 +616,15 @@ impl VirtualFile { /// Returns the IoBuf that is underlying the BoundedBuf `buf`. /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in. /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant. - pub async fn write_all, Buf: IoBuf + Send>( - &mut self, - buf: B, - ) -> (B::Buf, Result) { + pub async fn write_all(&mut self, buf: B) -> (B::Buf, Result) { let nbytes = buf.bytes_init(); if nbytes == 0 { return (Slice::into_inner(buf.slice_full()), Ok(0)); } let mut buf = buf.slice(0..nbytes); while !buf.is_empty() { - let res; - (buf, res) = self.write(buf).await; - match res { + // TODO: push `Slice` further down + match self.write(&buf).await { Ok(0) => { return ( Slice::into_inner(buf), @@ -662,18 +644,11 @@ impl VirtualFile { (Slice::into_inner(buf), Ok(nbytes)) } - async fn write( - &mut self, - buf: Slice, - ) -> (Slice, Result) { + async fn write(&mut self, buf: &[u8]) -> Result { let pos = self.pos; - let (buf, res) = self.write_at(buf, pos).await; - let n = match res { - Ok(n) => n, - Err(e) => return (buf, Err(e)), - }; + let n = self.write_at(buf, pos).await?; self.pos += n as u64; - (buf, Ok(n)) + Ok(n) } pub(crate) async fn read_at(&self, buf: B, offset: u64) -> (B, Result) @@ -701,30 +676,16 @@ impl VirtualFile { }) } - async fn write_at( - &self, - buf: Slice, - offset: u64, - ) -> (Slice, Result) { - let file_guard = match self.lock_file().await { - Ok(file_guard) => file_guard, - Err(e) => return (buf, Err(e)), - }; - observe_duration!(StorageIoOperation::Write, { - let ((_file_guard, buf), result) = - io_engine::get().write_at(file_guard, offset, buf).await; - if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&[ - "write", - &self.tenant_id, - &self.shard_id, - &self.timeline_id, - ]) - .add(size as i64); - } - (buf, result) - }) + async fn write_at(&self, buf: &[u8], offset: u64) -> Result { + let result = with_file!(self, StorageIoOperation::Write, |file_guard| { + file_guard.with_std_file(|std_file| std_file.write_at(buf, offset)) + }); + if let Ok(size) = result { + STORAGE_IO_SIZE + .with_label_values(&["write", &self.tenant_id, &self.shard_id, &self.timeline_id]) + .add(size as i64); + } + result } } @@ -1122,7 +1083,6 @@ mod tests { use rand::Rng; use std::future::Future; use std::io::Write; - use std::os::unix::fs::FileExt; use std::sync::Arc; enum MaybeVirtualFile { @@ -1143,11 +1103,7 @@ mod tests { MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf), } } - async fn write_all_at, Buf: IoBuf + Send>( - &self, - buf: B, - offset: u64, - ) -> Result<(), Error> { + async fn write_all_at(&self, buf: B, offset: u64) -> Result<(), Error> { match self { MaybeVirtualFile::VirtualFile(file) => { let (_buf, res) = file.write_all_at(buf, offset).await; @@ -1168,10 +1124,7 @@ mod tests { MaybeVirtualFile::File(file) => file.seek(pos), } } - async fn write_all, Buf: IoBuf + Send>( - &mut self, - buf: B, - ) -> Result<(), Error> { + async fn write_all(&mut self, buf: B) -> Result<(), Error> { match self { MaybeVirtualFile::VirtualFile(file) => { let (_buf, res) = file.write_all(buf).await; diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index e369d2871100..9a512b97bef0 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -64,7 +64,6 @@ pub(super) fn init(engine_kind: IoEngineKind) { set(engine_kind); } -/// Longer-term, this API should only be used by [`super::VirtualFile`]. pub(crate) fn get() -> IoEngine { let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap(); if cfg!(test) { @@ -102,17 +101,7 @@ use std::{ sync::atomic::{AtomicU8, Ordering}, }; -use super::{FileGuard, Metadata}; - -#[cfg(target_os = "linux")] -fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error) -> std::io::Error { - match e { - tokio_epoll_uring::Error::Op(e) => e, - tokio_epoll_uring::Error::System(system) => { - std::io::Error::new(std::io::ErrorKind::Other, system) - } - } -} +use super::FileGuard; impl IoEngine { pub(super) async fn read_at( @@ -147,85 +136,18 @@ impl IoEngine { IoEngine::TokioEpollUring => { let system = tokio_epoll_uring::thread_local_system().await; let (resources, res) = system.read(file_guard, offset, buf).await; - (resources, res.map_err(epoll_uring_error_to_std)) - } - } - } - pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) { - match self { - IoEngine::NotSet => panic!("not initialized"), - IoEngine::StdFs => { - let res = file_guard.with_std_file(|std_file| std_file.sync_all()); - (file_guard, res) - } - #[cfg(target_os = "linux")] - IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; - let (resources, res) = system.fsync(file_guard).await; - (resources, res.map_err(epoll_uring_error_to_std)) - } - } - } - pub(super) async fn sync_data( - &self, - file_guard: FileGuard, - ) -> (FileGuard, std::io::Result<()>) { - match self { - IoEngine::NotSet => panic!("not initialized"), - IoEngine::StdFs => { - let res = file_guard.with_std_file(|std_file| std_file.sync_data()); - (file_guard, res) - } - #[cfg(target_os = "linux")] - IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; - let (resources, res) = system.fdatasync(file_guard).await; - (resources, res.map_err(epoll_uring_error_to_std)) - } - } - } - pub(super) async fn metadata( - &self, - file_guard: FileGuard, - ) -> (FileGuard, std::io::Result) { - match self { - IoEngine::NotSet => panic!("not initialized"), - IoEngine::StdFs => { - let res = - file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from)); - (file_guard, res) - } - #[cfg(target_os = "linux")] - IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; - let (resources, res) = system.statx(file_guard).await; ( resources, - res.map_err(epoll_uring_error_to_std).map(Metadata::from), + res.map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + }), ) } } } - pub(super) async fn write_at( - &self, - file_guard: FileGuard, - offset: u64, - buf: Slice, - ) -> ((FileGuard, Slice), std::io::Result) { - match self { - IoEngine::NotSet => panic!("not initialized"), - IoEngine::StdFs => { - let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset)); - ((file_guard, buf), result) - } - #[cfg(target_os = "linux")] - IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; - let (resources, res) = system.write(file_guard, offset, buf).await; - (resources, res.map_err(epoll_uring_error_to_std)) - } - } - } /// If we switch a user of [`tokio::fs`] to use [`super::io_engine`], /// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured diff --git a/pageserver/src/virtual_file/metadata.rs b/pageserver/src/virtual_file/metadata.rs deleted file mode 100644 index f530c509883e..000000000000 --- a/pageserver/src/virtual_file/metadata.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::fs; - -pub enum Metadata { - StdFs(fs::Metadata), - #[cfg(target_os = "linux")] - TokioEpollUring(Box), -} - -#[cfg(target_os = "linux")] -impl From> for Metadata { - fn from(value: Box) -> Self { - Metadata::TokioEpollUring(value) - } -} - -impl From for Metadata { - fn from(value: std::fs::Metadata) -> Self { - Metadata::StdFs(value) - } -} - -impl Metadata { - pub fn len(&self) -> u64 { - match self { - Metadata::StdFs(metadata) => metadata.len(), - #[cfg(target_os = "linux")] - Metadata::TokioEpollUring(statx) => statx.stx_size, - } - } -}