From 3da410c8fee05b0cd65a5c0b83fffa3d5680cd77 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 5 Mar 2024 10:03:54 +0100
Subject: [PATCH] tokio-epoll-uring: use it on the layer-creating code paths
 (#6378)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Part of #6663. See that epic for more context & related commits.

Problem
-------

Before this PR, the layer-file-creating code paths were using
VirtualFile, but under the hood it still performed blocking system
calls. Generally this meant we'd stall the executor thread, unless the
caller "knew" and used the following pattern instead:

```
spawn_blocking(|| {
    Handle::block_on(async {
        VirtualFile::....().await;
    })
}).await
```

Solution
--------

This PR adopts `tokio-epoll-uring` on the layer-file-creating code
paths in pageserver.

Note that on-demand downloads still use `tokio::fs`; these will be
converted in a future PR.

Design: Avoiding Regressions With `std-fs`
------------------------------------------

If we make the VirtualFile write path truly async using
`tokio-epoll-uring`, should we then remove the `spawn_blocking` +
`Handle::block_on` usage upstack in the same commit?

No, because if we're still using the `std-fs` io engine, we'd then
block the executor in exactly those places where `spawn_blocking`
previously protected us from doing so.

So, if we want to see benefits from `tokio-epoll-uring` on the write
path while also preserving the ability to switch between
`tokio-epoll-uring` and `std-fs`, where `std-fs` behaves identically
to what we have now, we need to ***conditionally* use `spawn_blocking +
Handle::block_on`**. I.e., in the places where we use that pattern
today, we need to make it conditional on the currently configured io
engine.

It boils down to investigating all the places where we do
`spawn_blocking(... block_on(... VirtualFile::...))`. Detailed
[write-up of that investigation in Notion](https://neondatabase.notion.site/Surveying-VirtualFile-write-path-usage-wrt-tokio-epoll-uring-integration-spawn_blocking-Handle-bl-5dc2270dbb764db7b2e60803f375e015?pvs=4),
made publicly accessible. tl;dr: preceding PRs addressed the relevant
call sites:

- `metadata` file: turns out we could simply remove it (#6777, #6769,
  #6775)
- `create_delta_layer()`: made sensitive to `virtual_file_io_engine` in
  #6986

NB: once we are switched over to `tokio-epoll-uring` everywhere in
production, we can deprecate `std-fs`; to keep macOS support, we can
use `tokio::fs` instead. That will remove this whole headache.

Code Changes In This PR
-----------------------

- VirtualFile API changes
  - `VirtualFile::write_at`
    - implement an `ioengine` operation and switch `VirtualFile::write_at` to it
  - `VirtualFile::metadata()`
    - curiously, we only use it from the layer writers' `finish()` methods
    - introduce a wrapper `Metadata` enum because `std::fs::Metadata` cannot
      be constructed by code outside Rust's std
  - `VirtualFile::sync_all()` and, for completeness' sake, add
    `VirtualFile::sync_data()`

Testing & Rollout
-----------------

Before merging this PR, we ran the CI with both io engines.
Additionally, the changes will soak in staging.

We could have a feature gate / add a new io engine
`tokio-epoll-uring-write-path` to do a gradual rollout. However, that's
not part of this PR.

Future Work
-----------

There's still some use of `std::fs` and/or `tokio::fs` for directory
namespace operations, e.g. `std::fs::rename`.
We're not addressing those in this PR, as we'll need to add support for
them in tokio-epoll-uring first. Note that rename itself is usually fast
if the directory is in the kernel dentry cache; only the fsync after
rename is slow. These fsyncs already use tokio-epoll-uring, so the
impact should be small.
---
 pageserver/src/tenant/blob_io.rs             |  14 ++-
 pageserver/src/tenant/storage_layer/layer.rs |   1 +
 pageserver/src/tenant/timeline.rs            |  78 +++++++-------
 pageserver/src/virtual_file.rs               | 105 ++++++++++++++-----
 pageserver/src/virtual_file/io_engine.rs     |  96 +++++++++++++++--
 pageserver/src/virtual_file/metadata.rs      |  30 ++++++
 6 files changed, 246 insertions(+), 78 deletions(-)
 create mode 100644 pageserver/src/virtual_file/metadata.rs

diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index ec70bdc67917..0d33100eadba 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -12,7 +12,7 @@
 //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
 //!
 use bytes::{BufMut, BytesMut};
-use tokio_epoll_uring::{BoundedBuf, Slice};
+use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
 
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
@@ -127,7 +127,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     /// You need to make sure that the internal buffer is empty, otherwise
     /// data will be written in wrong order.
     #[inline(always)]
-    async fn write_all_unbuffered<B: BoundedBuf>(
+    async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
         &mut self,
         src_buf: B,
     ) -> (B::Buf, Result<(), Error>) {
@@ -162,7 +162,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
     }
 
     /// Internal, possibly buffered, write function
-    async fn write_all<B: BoundedBuf>(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) {
+    async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
+        &mut self,
+        src_buf: B,
+    ) -> (B::Buf, Result<(), Error>) {
         if !BUFFERED {
             assert!(self.buf.is_empty());
             return self.write_all_unbuffered(src_buf).await;
@@ -210,7 +213,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
 
     /// Write a blob of data. Returns the offset that it was written to,
     /// which can be used to retrieve the data later.
-    pub async fn write_blob<B: BoundedBuf>(&mut self, srcbuf: B) -> (B::Buf, Result<u64, Error>) {
+    pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
+        &mut self,
+        srcbuf: B,
+    ) -> (B::Buf, Result<u64, Error>) {
         let offset = self.offset;
 
         let len = srcbuf.bytes_init();
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index 247dd1a8e47b..e14a2f22cf1a 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -195,6 +195,7 @@ impl Layer {
         let downloaded = resident.expect("just initialized");
 
         // if the rename works, the path is as expected
+        // TODO: sync system call
         std::fs::rename(temp_path, owner.local_path())
             .with_context(|| format!("rename temporary file as correct path for {owner}"))?;
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 64c324a5c8f8..1f811155f672 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -3410,44 +3410,48 @@ impl Timeline {
         frozen_layer: &Arc<InMemoryLayer>,
         ctx: &RequestContext,
     ) -> anyhow::Result<ResidentLayer> {
-        let span = tracing::info_span!("blocking");
-        let new_delta: ResidentLayer = tokio::task::spawn_blocking({
-            let self_clone = Arc::clone(self);
-            let frozen_layer = Arc::clone(frozen_layer);
-            let ctx = ctx.attached_child();
-            move || {
-                Handle::current().block_on(
-                    async move {
-                        let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
-                        // The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
-                        // We just need to fsync the directory in which these inodes are linked,
-                        // which we know to be the timeline directory.
-                        //
-                        // We use fatal_err() below because after write_to_disk returns with success,
-                        // the in-memory state of the filesystem already has the layer file in its final place,
-                        // and subsequent pageserver code could think it's durable while it really isn't.
-                        let timeline_dir =
-                            VirtualFile::open(&self_clone.conf.timeline_path(
-                                &self_clone.tenant_shard_id,
-                                &self_clone.timeline_id,
-                            ))
-                            .await
-                            .fatal_err("VirtualFile::open for timeline dir fsync");
-                        timeline_dir
-                            .sync_all()
-                            .await
-                            .fatal_err("VirtualFile::sync_all timeline dir");
-                        anyhow::Ok(new_delta)
-                    }
-                    .instrument(span),
-                )
+        let self_clone = Arc::clone(self);
+        let frozen_layer = Arc::clone(frozen_layer);
+        let ctx = ctx.attached_child();
+        let work = async move {
+            let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
+            // The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
+            // We just need to fsync the directory in which these inodes are linked,
+            // which we know to be the timeline directory.
+            //
+            // We use fatal_err() below because after write_to_disk returns with success,
+            // the in-memory state of the filesystem already has the layer file in its final place,
+            // and subsequent pageserver code could think it's durable while it really isn't.
+            let timeline_dir = VirtualFile::open(
+                &self_clone
+                    .conf
+                    .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id),
+            )
+            .await
+            .fatal_err("VirtualFile::open for timeline dir fsync");
+            timeline_dir
+                .sync_all()
+                .await
+                .fatal_err("VirtualFile::sync_all timeline dir");
+            anyhow::Ok(new_delta)
+        };
+        // Before tokio-epoll-uring, we ran write_to_disk & the sync_all inside spawn_blocking.
+        // Preserve that behavior here so that `virtual_file_io_engine=std-fs` behaves as before.
+        use crate::virtual_file::io_engine::IoEngine;
+        match crate::virtual_file::io_engine::get() {
+            IoEngine::NotSet => panic!("io engine not set"),
+            IoEngine::StdFs => {
+                let span = tracing::info_span!("blocking");
+                tokio::task::spawn_blocking({
+                    move || Handle::current().block_on(work.instrument(span))
+                })
+                .await
+                .context("spawn_blocking")
+                .and_then(|x| x)
             }
-        })
-        .await
-        .context("spawn_blocking")
-        .and_then(|x| x)?;
-
-        Ok(new_delta)
+            #[cfg(target_os = "linux")]
+            IoEngine::TokioEpollUring => work.await,
+        }
     }
 
     async fn repartition(
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index b7112108f273..6d4774cf7522 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -17,20 +17,21 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
 use camino::{Utf8Path, Utf8PathBuf};
 use once_cell::sync::OnceCell;
 use pageserver_api::shard::TenantShardId;
-use std::fs::{self, File};
+use std::fs::File;
 use std::io::{Error, ErrorKind, Seek, SeekFrom};
 use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
 
 use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
-use std::os::unix::fs::FileExt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 use tokio::time::Instant;
 
 pub use pageserver_api::models::virtual_file as api;
 pub(crate) mod io_engine;
+mod metadata;
 mod open_options;
 pub(crate) use io_engine::IoEngineKind;
+pub(crate) use metadata::Metadata;
 pub(crate) use open_options::*;
 
 ///
@@ -435,13 +436,25 @@ impl VirtualFile {
 
     /// Call File::sync_all() on the underlying File.
     pub async fn sync_all(&self) -> Result<(), Error> {
-        with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
-            .with_std_file(|std_file| std_file.sync_all()))
+        with_file!(self, StorageIoOperation::Fsync, |file_guard| {
+            let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
+            res
+        })
     }
 
-    pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
-        with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
-            .with_std_file(|std_file| std_file.metadata()))
+    /// Call File::sync_data() on the underlying File.
+    pub async fn sync_data(&self) -> Result<(), Error> {
+        with_file!(self, StorageIoOperation::Fsync, |file_guard| {
+            let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
+            res
+        })
+    }
+
+    pub async fn metadata(&self) -> Result<Metadata, Error> {
+        with_file!(self, StorageIoOperation::Metadata, |file_guard| {
+            let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
+            res
+        })
     }
 
     /// Helper function internal to `VirtualFile` that looks up the underlying File,
@@ -579,7 +592,7 @@ impl VirtualFile {
     }
 
     // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
-    pub async fn write_all_at<B: BoundedBuf>(
+    pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
         &self,
         buf: B,
         mut offset: u64,
@@ -590,8 +603,9 @@ impl VirtualFile {
         }
         let mut buf = buf.slice(0..buf_len);
         while !buf.is_empty() {
-            // TODO: push `buf` further down
-            match self.write_at(&buf, offset).await {
+            let res;
+            (buf, res) = self.write_at(buf, offset).await;
+            match res {
                 Ok(0) => {
                     return (
                         Slice::into_inner(buf),
@@ -605,7 +619,7 @@ impl VirtualFile {
                     buf = buf.slice(n..);
                     offset += n as u64;
                 }
-                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
+                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                 Err(e) => return (Slice::into_inner(buf), Err(e)),
             }
         }
@@ -616,15 +630,19 @@ impl VirtualFile {
     /// Returns the IoBuf that is underlying the BoundedBuf `buf`.
     /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
     /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
-    pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
+    pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
+        &mut self,
+        buf: B,
+    ) -> (B::Buf, Result<usize, Error>) {
         let nbytes = buf.bytes_init();
         if nbytes == 0 {
             return (Slice::into_inner(buf.slice_full()), Ok(0));
         }
         let mut buf = buf.slice(0..nbytes);
         while !buf.is_empty() {
-            // TODO: push `Slice` further down
-            match self.write(&buf).await {
+            let res;
+            (buf, res) = self.write(buf).await;
+            match res {
                 Ok(0) => {
                     return (
                         Slice::into_inner(buf),
@@ -644,11 +662,18 @@ impl VirtualFile {
         (Slice::into_inner(buf), Ok(nbytes))
     }
 
-    async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
+    async fn write<B: IoBuf + Send>(
+        &mut self,
+        buf: Slice<B>,
+    ) -> (Slice<B>, Result<usize, Error>) {
         let pos = self.pos;
-        let n = self.write_at(buf, pos).await?;
+        let (buf, res) = self.write_at(buf, pos).await;
+        let n = match res {
+            Ok(n) => n,
+            Err(e) => return (buf, Err(e)),
+        };
         self.pos += n as u64;
-        Ok(n)
+        (buf, Ok(n))
     }
 
     pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
@@ -676,16 +701,30 @@ impl VirtualFile {
         })
     }
 
-    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
-        let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
-            file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
-        });
-        if let Ok(size) = result {
-            STORAGE_IO_SIZE
-                .with_label_values(&["write", &self.tenant_id, &self.shard_id, &self.timeline_id])
-                .add(size as i64);
-        }
-        result
+    async fn write_at<B: IoBuf + Send>(
+        &self,
+        buf: Slice<B>,
+        offset: u64,
+    ) -> (Slice<B>, Result<usize, Error>) {
+        let file_guard = match self.lock_file().await {
+            Ok(file_guard) => file_guard,
+            Err(e) => return (buf, Err(e)),
+        };
+        observe_duration!(StorageIoOperation::Write, {
+            let ((_file_guard, buf), result) =
+                io_engine::get().write_at(file_guard, offset, buf).await;
+            if let Ok(size) = result {
+                STORAGE_IO_SIZE
+                    .with_label_values(&[
+                        "write",
+                        &self.tenant_id,
+                        &self.shard_id,
+                        &self.timeline_id,
+                    ])
+                    .add(size as i64);
+            }
+            (buf, result)
+        })
     }
 }
@@ -1083,6 +1122,7 @@ mod tests {
     use rand::Rng;
     use std::future::Future;
     use std::io::Write;
+    use std::os::unix::fs::FileExt;
     use std::sync::Arc;
 
     enum MaybeVirtualFile {
@@ -1103,7 +1143,11 @@ mod tests {
             MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
         }
     }
-    async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
+    async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
+        &self,
+        buf: B,
+        offset: u64,
+    ) -> Result<(), Error> {
         match self {
             MaybeVirtualFile::VirtualFile(file) => {
                 let (_buf, res) = file.write_all_at(buf, offset).await;
@@ -1124,7 +1168,10 @@ mod tests {
             MaybeVirtualFile::File(file) => file.seek(pos),
         }
     }
-    async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
+    async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
+        &mut self,
+        buf: B,
+    ) -> Result<(), Error> {
         match self {
             MaybeVirtualFile::VirtualFile(file) => {
                 let (_buf, res) = file.write_all(buf).await;
diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs
index 892affa326ab..1a8cd9f56238 100644
--- a/pageserver/src/virtual_file/io_engine.rs
+++ b/pageserver/src/virtual_file/io_engine.rs
@@ -7,6 +7,8 @@
 //!
 //! Then use [`get`] and [`super::OpenOptions`].
 
+use tokio_epoll_uring::{IoBuf, Slice};
+
 pub(crate) use super::api::IoEngineKind;
 #[derive(Clone, Copy)]
 #[repr(u8)]
@@ -61,7 +63,8 @@ pub(super) fn init(engine_kind: IoEngineKind) {
     set(engine_kind);
 }
 
-pub(super) fn get() -> IoEngine {
+/// Longer-term, this API should only be used by [`super::VirtualFile`].
+pub(crate) fn get() -> IoEngine {
     let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
     if cfg!(test) {
         let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
@@ -98,7 +101,17 @@ use std::{
     sync::atomic::{AtomicU8, Ordering},
 };
 
-use super::FileGuard;
+use super::{FileGuard, Metadata};
+
+#[cfg(target_os = "linux")]
+fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
+    match e {
+        tokio_epoll_uring::Error::Op(e) => e,
+        tokio_epoll_uring::Error::System(system) => {
+            std::io::Error::new(std::io::ErrorKind::Other, system)
+        }
+    }
+}
 
 impl IoEngine {
     pub(super) async fn read_at(
@@ -133,16 +146,83 @@ impl IoEngine {
             IoEngine::TokioEpollUring => {
                 let system = tokio_epoll_uring::thread_local_system().await;
                 let (resources, res) = system.read(file_guard, offset, buf).await;
+                (resources, res.map_err(epoll_uring_error_to_std))
+            }
+        }
+    }
+    pub(super) async fn sync_all(&self, file_guard: FileGuard) -> (FileGuard, std::io::Result<()>) {
+        match self {
+            IoEngine::NotSet => panic!("not initialized"),
+            IoEngine::StdFs => {
+                let res = file_guard.with_std_file(|std_file| std_file.sync_all());
+                (file_guard, res)
+            }
+            #[cfg(target_os = "linux")]
+            IoEngine::TokioEpollUring => {
+                let system = tokio_epoll_uring::thread_local_system().await;
+                let (resources, res) = system.fsync(file_guard).await;
+                (resources, res.map_err(epoll_uring_error_to_std))
+            }
+        }
+    }
+    pub(super) async fn sync_data(
+        &self,
+        file_guard: FileGuard,
+    ) -> (FileGuard, std::io::Result<()>) {
+        match self {
+            IoEngine::NotSet => panic!("not initialized"),
+            IoEngine::StdFs => {
+                let res = file_guard.with_std_file(|std_file| std_file.sync_data());
+                (file_guard, res)
+            }
+            #[cfg(target_os = "linux")]
+            IoEngine::TokioEpollUring => {
+                let system = tokio_epoll_uring::thread_local_system().await;
+                let (resources, res) = system.fdatasync(file_guard).await;
+                (resources, res.map_err(epoll_uring_error_to_std))
+            }
+        }
+    }
+    pub(super) async fn metadata(
+        &self,
+        file_guard: FileGuard,
+    ) -> (FileGuard, std::io::Result<Metadata>) {
+        match self {
+            IoEngine::NotSet => panic!("not initialized"),
+            IoEngine::StdFs => {
+                let res =
+                    file_guard.with_std_file(|std_file| std_file.metadata().map(Metadata::from));
+                (file_guard, res)
+            }
+            #[cfg(target_os = "linux")]
+            IoEngine::TokioEpollUring => {
+                let system = tokio_epoll_uring::thread_local_system().await;
+                let (resources, res) = system.statx(file_guard).await;
                 (
                     resources,
-                    res.map_err(|e| match e {
-                        tokio_epoll_uring::Error::Op(e) => e,
-                        tokio_epoll_uring::Error::System(system) => {
-                            std::io::Error::new(std::io::ErrorKind::Other, system)
-                        }
-                    }),
+                    res.map_err(epoll_uring_error_to_std).map(Metadata::from),
                 )
             }
         }
     }
+    pub(super) async fn write_at<B: IoBuf + Send>(
+        &self,
+        file_guard: FileGuard,
+        offset: u64,
+        buf: Slice<B>,
+    ) -> ((FileGuard, Slice<B>), std::io::Result<usize>) {
+        match self {
+            IoEngine::NotSet => panic!("not initialized"),
+            IoEngine::StdFs => {
+                let result = file_guard.with_std_file(|std_file| std_file.write_at(&buf, offset));
+                ((file_guard, buf), result)
+            }
+            #[cfg(target_os = "linux")]
+            IoEngine::TokioEpollUring => {
+                let system = tokio_epoll_uring::thread_local_system().await;
+                let (resources, res) = system.write(file_guard, offset, buf).await;
+                (resources, res.map_err(epoll_uring_error_to_std))
+            }
+        }
+    }
 }
diff --git a/pageserver/src/virtual_file/metadata.rs b/pageserver/src/virtual_file/metadata.rs
new file mode 100644
index 000000000000..f530c509883e
--- /dev/null
+++ b/pageserver/src/virtual_file/metadata.rs
@@ -0,0 +1,30 @@
+use std::fs;
+
+pub enum Metadata {
+    StdFs(fs::Metadata),
+    #[cfg(target_os = "linux")]
+    TokioEpollUring(Box<tokio_epoll_uring::ops::statx::statx>),
+}
+
+#[cfg(target_os = "linux")]
+impl From<Box<tokio_epoll_uring::ops::statx::statx>> for Metadata {
+    fn from(value: Box<tokio_epoll_uring::ops::statx::statx>) -> Self {
+        Metadata::TokioEpollUring(value)
+    }
+}
+
+impl From<std::fs::Metadata> for Metadata {
+    fn from(value: std::fs::Metadata) -> Self {
+        Metadata::StdFs(value)
+    }
+}
+
+impl Metadata {
+    pub fn len(&self) -> u64 {
+        match self {
+            Metadata::StdFs(metadata) => metadata.len(),
+            #[cfg(target_os = "linux")]
+            Metadata::TokioEpollUring(statx) => statx.stx_size,
+        }
+    }
+}
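
Appendix (reviewer notes, not part of the patch)
------------------------------------------------

The conditional-`spawn_blocking` design described in the commit message
boils down to the following shape. This is a minimal sketch only: the
real dispatch is inlined in `Timeline::create_delta_layer()` (see the
timeline.rs hunk above), and `run_maybe_blocking` is a hypothetical
helper name, assumed to live inside the pageserver crate.

```rust
use anyhow::Context;

// Sketch: run `work` behind spawn_blocking + block_on only when the
// configured io engine still issues synchronous syscalls (std-fs).
// With tokio-epoll-uring the future is truly async, so it can be
// awaited inline without stalling the executor.
async fn run_maybe_blocking<T, Fut>(work: Fut) -> anyhow::Result<T>
where
    T: Send + 'static,
    Fut: std::future::Future<Output = anyhow::Result<T>> + Send + 'static,
{
    use crate::virtual_file::io_engine::{self, IoEngine};
    match io_engine::get() {
        IoEngine::NotSet => panic!("io engine not set"),
        // std-fs blocks; keep the old executor-protection pattern.
        IoEngine::StdFs => tokio::task::spawn_blocking(move || {
            tokio::runtime::Handle::current().block_on(work)
        })
        .await
        .context("spawn_blocking")
        // spawn_blocking yields Result<anyhow::Result<T>, JoinError>;
        // flatten it like the timeline.rs hunk does.
        .and_then(|x| x),
        // tokio-epoll-uring performs the IO via io_uring; no offload needed.
        #[cfg(target_os = "linux")]
        IoEngine::TokioEpollUring => work.await,
    }
}
```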
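The signature churn in VirtualFile follows from io_uring's ownership
rules: while an operation is in flight, the kernel owns the buffer, so
the write path takes buffers by value (`BoundedBuf`/`Slice`) and always
hands them back alongside the result, success or error. A minimal
sketch of the resulting calling convention; `flush_page` and its
parameters are hypothetical:

```rust
use crate::virtual_file::VirtualFile;

// Sketch: the buffer moves into write_all_at and comes back out,
// so the caller can reuse it regardless of the IO result.
async fn flush_page(file: &VirtualFile, page: Vec<u8>, offset: u64) -> std::io::Result<()> {
    // `page` is moved in; the io engine owns it for the duration of
    // the (possibly uring-based) write, then returns it.
    let (page, res) = file.write_all_at(page, offset).await;
    res?; // the buffer came back either way; surface the IO error if any
    // `page` could now be reused for the next write, avoiding a realloc.
    drop(page);
    Ok(())
}
```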