From 49d5e56c084f1fd694cde75d56c2d8ed9049c06e Mon Sep 17 00:00:00 2001
From: Yuchen Liang <70461588+yliang412@users.noreply.github.com>
Date: Mon, 21 Oct 2024 11:01:25 -0400
Subject: [PATCH] pageserver: use direct IO for delta and image layer reads (#9326)

Part of #8130

## Problem

The pageserver previously went through the kernel page cache for all IOs. The kernel page cache gives a lightly loaded pageserver deceptively fast performance. Using direct IO instead offers predictable latencies for our virtual file IO operations. Reads benefit in particular: the data pages have extremely low temporal locality, because the most frequently accessed pages are cached on the compute side.

## Summary of changes

This PR enables the pageserver to use direct IO for delta layer and image layer reads. We can ship these separately because the layers are write-once, read-many, so we will not be mixing buffered IO with direct IO.

- Implement `IoBufferMut`, a buffer type with aligned allocation (currently set to 512 bytes).
- Use `IoBufferMut` at all places where we read image and delta layers.
- Leverage the Rust type system and the `IoBufAlignedMut` marker trait to guarantee that the input buffers for IO operations are aligned.
- Make page cache allocations aligned as well.

_* In-memory layer reads and the write path will be shipped separately._

## Testing

Integration test suite run with O_DIRECT enabled: https://github.com/neondatabase/neon/pull/9350

## Performance

We evaluated performance with the `get-page-at-latest-lsn` benchmark. The results show a decrease in the number of IOPS, no significant change in mean latency, and a slight improvement in the p99.9 and p99.99 latencies.

[Benchmark](https://www.notion.so/neondatabase/Benchmark-O_DIRECT-for-image-and-delta-layers-2024-10-01-112f189e00478092a195ea5a0137e706?pvs=4)

## Rollout

We will enable `virtual_file_io_mode=direct` region by region to turn on direct IO for image and delta layers.
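For context, the essence of the change is (1) allocating IO buffers with an explicit alignment and (2) encoding that alignment in the type system via a marker trait, so that unaligned buffers are rejected at compile time. Below is a minimal, self-contained sketch of both ideas, assuming the 512-byte alignment stated above; the names `AlignedBuf`, `IoBufAligned`, and `read_exact_at` are illustrative stand-ins, not the pageserver's actual `IoBufferMut` / `IoBufAlignedMut` / `VirtualFile` APIs (those are in the diff below).

```rust
use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::slice;

/// A fixed-capacity byte buffer whose backing allocation is aligned to
/// `ALIGN` bytes, which is what O_DIRECT requires of read buffers.
struct AlignedBuf<const ALIGN: usize> {
    ptr: *mut u8,
    capacity: usize,
    len: usize,
}

impl<const ALIGN: usize> AlignedBuf<ALIGN> {
    fn with_capacity(capacity: usize) -> Self {
        // `ALIGN` must be a non-zero power of two; 512 matches the alignment
        // mentioned in the summary above.
        let layout = Layout::from_size_align(capacity, ALIGN).expect("invalid layout");
        // SAFETY: `capacity` is non-zero in this example, so the layout has a
        // non-zero size; the allocation is freed with the same layout in Drop.
        let ptr = unsafe { alloc(layout) };
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        AlignedBuf { ptr, capacity, len: 0 }
    }

    fn as_slice(&self) -> &[u8] {
        // SAFETY: only the first `len` bytes are ever exposed as initialized.
        unsafe { slice::from_raw_parts(self.ptr, self.len) }
    }
}

impl<const ALIGN: usize> Drop for AlignedBuf<ALIGN> {
    fn drop(&mut self) {
        // SAFETY: allocated above with exactly this size and alignment.
        unsafe { dealloc(self.ptr, Layout::from_size_align_unchecked(self.capacity, ALIGN)) }
    }
}

/// Marker trait in the spirit of `IoBufAlignedMut`: only buffer types that
/// guarantee an aligned allocation implement it, so a read API bounded by it
/// rejects a plain `Vec<u8>` at compile time.
trait IoBufAligned {}
impl<const ALIGN: usize> IoBufAligned for AlignedBuf<ALIGN> {}

/// Stand-in for a direct-IO read entry point; accepts only aligned buffers.
fn read_exact_at<B: IoBufAligned>(_buf: &mut B, _offset: u64) {
    // ... the real code would issue the O_DIRECT read here ...
}

fn main() {
    let mut buf = AlignedBuf::<512>::with_capacity(8192);
    assert_eq!(buf.ptr.align_offset(512), 0); // allocation is 512-byte aligned
    read_exact_at(&mut buf, 0);
    assert!(buf.as_slice().is_empty()); // nothing is actually read in this sketch
    // `read_exact_at(&mut vec![0u8; 8192], 0)` would not compile: Vec<u8> does
    // not implement `IoBufAligned`.
}
```

The `RawAlignedBuffer` introduced by this patch follows the same `std::alloc::Layout`-based pattern, with growth (`reserve`), freezing into an immutable `AlignedBuffer`, and the `tokio_epoll_uring` buffer traits layered on top.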
Signed-off-by: Yuchen Liang --- pageserver/benches/bench_ingest.rs | 6 +- pageserver/ctl/src/layer_map_analyzer.rs | 7 +- pageserver/ctl/src/layers.rs | 13 +- pageserver/ctl/src/main.rs | 8 +- pageserver/src/bin/pageserver.rs | 6 +- pageserver/src/page_cache.rs | 16 +- pageserver/src/tenant/block_io.rs | 6 +- pageserver/src/tenant/ephemeral_file.rs | 28 +- .../src/tenant/storage_layer/delta_layer.rs | 15 +- .../src/tenant/storage_layer/image_layer.rs | 19 +- .../tenant/storage_layer/inmemory_layer.rs | 8 +- .../inmemory_layer/vectored_dio_read.rs | 23 +- pageserver/src/tenant/vectored_blob_io.rs | 9 +- pageserver/src/virtual_file.rs | 40 +- .../owned_buffers_io/aligned_buffer.rs | 9 + .../aligned_buffer/alignment.rs | 26 ++ .../owned_buffers_io/aligned_buffer/buffer.rs | 124 +++++++ .../aligned_buffer/buffer_mut.rs | 347 ++++++++++++++++++ .../owned_buffers_io/aligned_buffer/raw.rs | 216 +++++++++++ .../owned_buffers_io/aligned_buffer/slice.rs | 40 ++ .../owned_buffers_io/io_buf_aligned.rs | 9 + .../owned_buffers_io/io_buf_ext.rs | 3 + 22 files changed, 899 insertions(+), 79 deletions(-) create mode 100644 pageserver/src/virtual_file/owned_buffers_io/aligned_buffer.rs create mode 100644 pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs create mode 100644 pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs create mode 100644 pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs create mode 100644 pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs create mode 100644 pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/slice.rs create mode 100644 pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 821c8008a950..d98b23acced5 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -164,7 +164,11 @@ fn criterion_benchmark(c: &mut Criterion) { let conf: &'static PageServerConf = Box::leak(Box::new( pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()), )); - virtual_file::init(16384, virtual_file::io_engine_for_bench()); + virtual_file::init( + 16384, + virtual_file::io_engine_for_bench(), + conf.virtual_file_io_mode, + ); page_cache::init(conf.page_cache_size); { diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 151b94cf62d3..7dd2a5d05cbf 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -7,6 +7,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use pageserver::context::{DownloadBehavior, RequestContext}; use pageserver::task_mgr::TaskKind; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; +use pageserver::virtual_file::api::IoMode; use std::cmp::Ordering; use std::collections::BinaryHeap; use std::ops::Range; @@ -152,7 +153,11 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. 
- pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + pageserver::virtual_file::init( + 10, + virtual_file::api::IoEngineKind::StdFs, + IoMode::preferred(), + ); pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index fd948bf2efed..c0b2b6ae890a 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -11,6 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary}; use pageserver::tenant::storage_layer::{delta_layer, image_layer}; use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer}; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; +use pageserver::virtual_file::api::IoMode; use pageserver::{page_cache, virtual_file}; use pageserver::{ repository::{Key, KEY_SIZE}, @@ -59,7 +60,11 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); - virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + virtual_file::init( + 10, + virtual_file::api::IoEngineKind::StdFs, + IoMode::preferred(), + ); page_cache::init(100); let file = VirtualFile::open(path, ctx).await?; let file_id = page_cache::next_file_id(); @@ -190,7 +195,11 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { new_tenant_id, new_timeline_id, } => { - pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + pageserver::virtual_file::init( + 10, + virtual_file::api::IoEngineKind::StdFs, + IoMode::preferred(), + ); pageserver::page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index c96664d346cb..f506caec5b06 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -24,7 +24,7 @@ use pageserver::{ page_cache, task_mgr::TaskKind, tenant::{dump_layerfile_from_path, metadata::TimelineMetadata}, - virtual_file, + virtual_file::{self, api::IoMode}, }; use pageserver_api::shard::TenantShardId; use postgres_ffi::ControlFileData; @@ -205,7 +205,11 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup - virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + virtual_file::init( + 10, + virtual_file::api::IoEngineKind::StdFs, + IoMode::preferred(), + ); page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index f71a3d26531c..c6659345f94c 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -167,7 +167,11 @@ fn main() -> anyhow::Result<()> { let scenario = failpoint_support::init(); // Basic initialization of things that don't change after startup - virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine); + virtual_file::init( + conf.max_file_descriptors, + conf.virtual_file_io_engine, + conf.virtual_file_io_mode, + ); page_cache::init(conf.page_cache_size); start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index f386c825b848..45bf02362aa9 100644 --- 
a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -82,6 +82,7 @@ use once_cell::sync::OnceCell; use crate::{ context::RequestContext, metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics}, + virtual_file::{IoBufferMut, IoPageSlice}, }; static PAGE_CACHE: OnceCell = OnceCell::new(); @@ -144,7 +145,7 @@ struct SlotInner { key: Option, // for `coalesce_readers_permit` permit: std::sync::Mutex>, - buf: &'static mut [u8; PAGE_SZ], + buf: IoPageSlice<'static>, } impl Slot { @@ -234,13 +235,13 @@ impl std::ops::Deref for PageReadGuard<'_> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { - self.slot_guard.buf + self.slot_guard.buf.deref() } } impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { fn as_ref(&self) -> &[u8; PAGE_SZ] { - self.slot_guard.buf + self.slot_guard.buf.as_ref() } } @@ -266,7 +267,7 @@ enum PageWriteGuardState<'i> { impl std::ops::DerefMut for PageWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { match &mut self.state { - PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(), PageWriteGuardState::Downgraded => unreachable!(), } } @@ -277,7 +278,7 @@ impl std::ops::Deref for PageWriteGuard<'_> { fn deref(&self) -> &Self::Target { match &self.state { - PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(), PageWriteGuardState::Downgraded => unreachable!(), } } @@ -643,7 +644,7 @@ impl PageCache { // We could use Vec::leak here, but that potentially also leaks // uninitialized reserved capacity. With into_boxed_slice and Box::leak // this is avoided. - let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice()); + let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak(); let size_metrics = &crate::metrics::PAGE_CACHE_SIZE; size_metrics.max_bytes.set_page_sz(num_pages); @@ -652,7 +653,8 @@ impl PageCache { let slots = page_buffer .chunks_exact_mut(PAGE_SZ) .map(|chunk| { - let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); + // SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned. 
+ let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) }; Slot { inner: tokio::sync::RwLock::new(SlotInner { diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 1c82e5454de9..2bd7f2d619aa 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,6 +5,8 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; +#[cfg(test)] +use crate::virtual_file::IoBufferMut; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::Deref; @@ -40,7 +42,7 @@ pub enum BlockLease<'a> { #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), #[cfg(test)] - Vec(Vec), + IoBufferMut(IoBufferMut), } impl From> for BlockLease<'static> { @@ -67,7 +69,7 @@ impl Deref for BlockLease<'_> { #[cfg(test)] BlockLease::Arc(v) => v.deref(), #[cfg(test)] - BlockLease::Vec(v) => { + BlockLease::IoBufferMut(v) => { TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ") } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index a62a47f9a760..de0abab4c0c7 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -6,10 +6,11 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache; use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File; +use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut; use crate::virtual_file::owned_buffers_io::slice::SliceMutExt; use crate::virtual_file::owned_buffers_io::util::size_tracking_writer; use crate::virtual_file::owned_buffers_io::write::Buffer; -use crate::virtual_file::{self, owned_buffers_io, VirtualFile}; +use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile}; use bytes::BytesMut; use camino::Utf8PathBuf; use num_traits::Num; @@ -107,15 +108,18 @@ impl EphemeralFile { self.page_cache_file_id } - pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result, io::Error> { + pub(crate) async fn load_to_io_buf( + &self, + ctx: &RequestContext, + ) -> Result { let size = self.len().into_usize(); - let vec = Vec::with_capacity(size); - let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?; + let buf = IoBufferMut::with_capacity(size); + let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?; assert_eq!(nread, size); - let vec = slice.into_inner(); - assert_eq!(vec.len(), nread); - assert_eq!(vec.capacity(), size, "we shouldn't be reallocating"); - Ok(vec) + let buf = slice.into_inner(); + assert_eq!(buf.len(), nread); + assert_eq!(buf.capacity(), size, "we shouldn't be reallocating"); + Ok(buf) } /// Returns the offset at which the first byte of the input was written, for use @@ -158,7 +162,7 @@ impl EphemeralFile { } impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile { - async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( &'b self, start: u64, dst: tokio_epoll_uring::Slice, @@ -345,7 +349,7 @@ mod tests { assert!(file.len() as usize == write_nbytes); for i in 0..write_nbytes { assert_eq!(value_offsets[i], i.into_u64()); - let buf = Vec::with_capacity(1); + let buf = IoBufferMut::with_capacity(1); let (buf_slice, nread) = file .read_exact_at_eof_ok(i.into_u64(), 
buf.slice_full(), &ctx) .await @@ -385,7 +389,7 @@ mod tests { // assert the state is as this test expects it to be assert_eq!( - &file.load_to_vec(&ctx).await.unwrap(), + &file.load_to_io_buf(&ctx).await.unwrap(), &content[0..cap + cap / 2] ); let md = file @@ -440,7 +444,7 @@ mod tests { let (buf, nread) = file .read_exact_at_eof_ok( start.into_u64(), - Vec::with_capacity(len).slice_full(), + IoBufferMut::with_capacity(len).slice_full(), ctx, ) .await diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 6332d36dc343..ceae1d4b1a2a 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -44,11 +44,11 @@ use crate::tenant::vectored_blob_io::{ }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, MaybeFatalIo, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::BytesMut; use camino::{Utf8Path, Utf8PathBuf}; use futures::StreamExt; use itertools::Itertools; @@ -1002,7 +1002,7 @@ impl DeltaLayerInner { .0 .into(); let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes); - let mut buf = Some(BytesMut::with_capacity(buf_size)); + let mut buf = Some(IoBufferMut::with_capacity(buf_size)); // Note that reads are processed in reverse order (from highest key+lsn). // This is the order that `ReconstructState` requires such that it can @@ -1029,7 +1029,7 @@ impl DeltaLayerInner { // We have "lost" the buffer since the lower level IO api // doesn't return the buffer on error. Allocate a new one. 
- buf = Some(BytesMut::with_capacity(buf_size)); + buf = Some(IoBufferMut::with_capacity(buf_size)); continue; } @@ -1203,7 +1203,7 @@ impl DeltaLayerInner { .map(|x| x.0.get()) .unwrap_or(8192); - let mut buffer = Some(BytesMut::with_capacity(max_read_size)); + let mut buffer = Some(IoBufferMut::with_capacity(max_read_size)); // FIXME: buffering of DeltaLayerWriter let mut per_blob_copy = Vec::new(); @@ -1561,12 +1561,11 @@ impl<'a> DeltaLayerIterator<'a> { let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file); let mut next_batch = std::collections::VecDeque::new(); let buf_size = plan.size(); - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let blob_read = meta.read(&view).await?; let value = Value::des(&blob_read)?; @@ -1941,7 +1940,7 @@ pub(crate) mod test { &vectored_reads, constants::MAX_VECTORED_READ_BYTES, ); - let mut buf = Some(BytesMut::with_capacity(buf_size)); + let mut buf = Some(IoBufferMut::with_capacity(buf_size)); for read in vectored_reads { let blobs_buf = vectored_blob_reader diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b1f2557038d4..fa058833d404 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -41,10 +41,11 @@ use crate::tenant::vectored_blob_io::{ }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, MaybeFatalIo, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use hex; use itertools::Itertools; @@ -547,10 +548,10 @@ impl ImageLayerInner { for read in plan.into_iter() { let buf_size = read.size(); - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await?; @@ -609,13 +610,12 @@ impl ImageLayerInner { } } - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await; match res { Ok(blobs_buf) => { - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await; @@ -1069,12 +1069,11 @@ impl<'a> ImageLayerIterator<'a> { let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file); let mut next_batch = std::collections::VecDeque::new(); let buf_size = plan.size(); - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity(buf_size); let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let 
view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await?; next_batch.push_back(( diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index e487bee1f2d9..7573ddb5ccc0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -14,7 +14,6 @@ use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache}; use anyhow::{anyhow, Context, Result}; -use bytes::Bytes; use camino::Utf8PathBuf; use pageserver_api::key::CompactKey; use pageserver_api::keyspace::KeySpace; @@ -809,9 +808,8 @@ impl InMemoryLayer { match l0_flush_global_state { l0_flush::Inner::Direct { .. } => { - let file_contents: Vec = inner.file.load_to_vec(ctx).await?; - - let file_contents = Bytes::from(file_contents); + let file_contents = inner.file.load_to_io_buf(ctx).await?; + let file_contents = file_contents.freeze(); for (key, vec_map) in inner.index.iter() { // Write all page versions @@ -825,7 +823,7 @@ impl InMemoryLayer { len, will_init, } = entry; - let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize); + let buf = file_contents.slice(pos as usize..(pos + len) as usize); let (_buf, res) = delta_layer_writer .put_value_bytes( Key::from_compact(*key), diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index 0683e15659dc..a4bb3a6bfc5d 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -9,6 +9,7 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; use crate::{ assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}, context::RequestContext, + virtual_file::{owned_buffers_io::io_buf_aligned::IoBufAlignedMut, IoBufferMut}, }; /// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`]. @@ -24,7 +25,7 @@ pub trait File: Send { /// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`. /// /// No guarantees are made about the remaining bytes in `dst` in case of a short read. 
- async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( &'b self, start: u64, dst: Slice, @@ -227,7 +228,7 @@ where // Execute physical reads and fill the logical read buffers // TODO: pipelined reads; prefetch; - let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE); + let get_io_buffer = |nchunks| IoBufferMut::with_capacity(nchunks * DIO_CHUNK_SIZE); for PhysicalRead { start_chunk_no, nchunks, @@ -459,7 +460,7 @@ mod tests { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let file = InMemoryFile::new_random(10); let test_read = |pos, len| { - let buf = vec![0; len]; + let buf = IoBufferMut::with_capacity_zeroed(len); let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx); use futures::FutureExt; let (slice, nread) = fut @@ -470,9 +471,9 @@ mod tests { buf.truncate(nread); buf }; - assert_eq!(test_read(0, 1), &file.content[0..1]); - assert_eq!(test_read(1, 2), &file.content[1..3]); - assert_eq!(test_read(9, 2), &file.content[9..]); + assert_eq!(&test_read(0, 1), &file.content[0..1]); + assert_eq!(&test_read(1, 2), &file.content[1..3]); + assert_eq!(&test_read(9, 2), &file.content[9..]); assert!(test_read(10, 2).is_empty()); assert!(test_read(11, 2).is_empty()); } @@ -609,7 +610,7 @@ mod tests { } impl<'x> File for RecorderFile<'x> { - async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>( &'b self, start: u64, dst: Slice, @@ -782,7 +783,7 @@ mod tests { 2048, 1024 => Err("foo".to_owned()), }; - let buf = Vec::with_capacity(512); + let buf = IoBufferMut::with_capacity(512); let (buf, nread) = mock_file .read_exact_at_eof_ok(0, buf.slice_full(), &ctx) .await @@ -790,7 +791,7 @@ mod tests { assert_eq!(nread, 512); assert_eq!(&buf.into_inner()[..nread], &[0; 512]); - let buf = Vec::with_capacity(512); + let buf = IoBufferMut::with_capacity(512); let (buf, nread) = mock_file .read_exact_at_eof_ok(512, buf.slice_full(), &ctx) .await @@ -798,7 +799,7 @@ mod tests { assert_eq!(nread, 512); assert_eq!(&buf.into_inner()[..nread], &[1; 512]); - let buf = Vec::with_capacity(512); + let buf = IoBufferMut::with_capacity(512); let (buf, nread) = mock_file .read_exact_at_eof_ok(1024, buf.slice_full(), &ctx) .await @@ -806,7 +807,7 @@ mod tests { assert_eq!(nread, 10); assert_eq!(&buf.into_inner()[..nread], &[2; 10]); - let buf = Vec::with_capacity(1024); + let buf = IoBufferMut::with_capacity(1024); let err = mock_file .read_exact_at_eof_ok(2048, buf.slice_full(), &ctx) .await diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 0c037910344c..dfe2352310bd 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use std::ops::Deref; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use pageserver_api::key::Key; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::BoundedBuf; @@ -27,6 +27,7 @@ use utils::vec_map::VecMap; use crate::context::RequestContext; use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK}; +use crate::virtual_file::IoBufferMut; use crate::virtual_file::{self, VirtualFile}; /// Metadata bundled with the start and end offset of a blob. 
@@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob { /// Return type of [`VectoredBlobReader::read_blobs`] pub struct VectoredBlobsBuf { /// Buffer for all blobs in this read - pub buf: BytesMut, + pub buf: IoBufferMut, /// Offsets into the buffer and metadata for all blobs in this read pub blobs: Vec, } @@ -441,7 +442,7 @@ impl<'a> VectoredBlobReader<'a> { pub async fn read_blobs( &self, read: &VectoredRead, - buf: BytesMut, + buf: IoBufferMut, ctx: &RequestContext, ) -> Result { assert!(read.size() > 0); @@ -916,7 +917,7 @@ mod tests { // Multiply by two (compressed data might need more space), and add a few bytes for the header let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16; - let mut buf = BytesMut::with_capacity(reserved_bytes); + let mut buf = IoBufferMut::with_capacity(reserved_bytes); let vectored_blob_reader = VectoredBlobReader::new(&file); let meta = BlobMeta { diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 5a364b7aaf03..daa8b99ab0f1 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -18,6 +18,9 @@ use crate::page_cache::{PageWriteGuard, PAGE_SZ}; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; +use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer; +use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign}; +use owned_buffers_io::io_buf_aligned::IoBufAlignedMut; use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use pageserver_api::shard::TenantShardId; @@ -55,6 +58,8 @@ pub(crate) mod owned_buffers_io { //! but for the time being we're proving out the primitives in the neon.git repo //! for faster iteration. + pub(crate) mod aligned_buffer; + pub(crate) mod io_buf_aligned; pub(crate) mod io_buf_ext; pub(crate) mod slice; pub(crate) mod write; @@ -196,7 +201,7 @@ impl VirtualFile { ctx: &RequestContext, ) -> Result, Error> where - Buf: IoBufMut + Send, + Buf: IoBufAlignedMut + Send, { self.inner.read_exact_at(slice, offset, ctx).await } @@ -771,7 +776,7 @@ impl VirtualFileInner { ctx: &RequestContext, ) -> Result, Error> where - Buf: IoBufMut + Send, + Buf: IoBufAlignedMut + Send, { let assert_we_return_original_bounds = if cfg!(debug_assertions) { Some((slice.stable_ptr() as usize, slice.bytes_total())) @@ -1222,12 +1227,14 @@ impl VirtualFileInner { ctx: &RequestContext, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; - let slice = Vec::with_capacity(PAGE_SZ).slice_full(); + let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full(); assert_eq!(slice.bytes_total(), PAGE_SZ); let slice = self .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx) .await?; - Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner())) + Ok(crate::tenant::block_io::BlockLease::IoBufferMut( + slice.into_inner(), + )) } async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { @@ -1325,10 +1332,11 @@ impl OpenFiles { /// server startup. 
/// #[cfg(not(test))] -pub fn init(num_slots: usize, engine: IoEngineKind) { +pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode) { if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { panic!("virtual_file::init called twice"); } + set_io_mode(mode); io_engine::init(engine); crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64); } @@ -1357,6 +1365,11 @@ pub(crate) const fn get_io_buffer_alignment() -> usize { DEFAULT_IO_BUFFER_ALIGNMENT } +pub(crate) type IoBufferMut = AlignedBufferMut>; +pub(crate) type IoBuffer = AlignedBuffer>; +pub(crate) type IoPageSlice<'a> = + AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>; + static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8); pub(crate) fn set_io_mode(mode: IoMode) { @@ -1395,10 +1408,10 @@ mod tests { impl MaybeVirtualFile { async fn read_exact_at( &self, - mut slice: tokio_epoll_uring::Slice>, + mut slice: tokio_epoll_uring::Slice, offset: u64, ctx: &RequestContext, - ) -> Result>, Error> { + ) -> Result, Error> { match self { MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await, MaybeVirtualFile::File(file) => { @@ -1466,12 +1479,13 @@ mod tests { len: usize, ctx: &RequestContext, ) -> Result { - let slice = Vec::with_capacity(len).slice_full(); + let slice = IoBufferMut::with_capacity(len).slice_full(); assert_eq!(slice.bytes_total(), len); let slice = self.read_exact_at(slice, pos, ctx).await?; - let vec = slice.into_inner(); - assert_eq!(vec.len(), len); - Ok(String::from_utf8(vec).unwrap()) + let buf = slice.into_inner(); + assert_eq!(buf.len(), len); + + Ok(String::from_utf8(buf.to_vec()).unwrap()) } } @@ -1695,7 +1709,7 @@ mod tests { let files = files.clone(); let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error); let hdl = rt.spawn(async move { - let mut buf = vec![0u8; SIZE]; + let mut buf = IoBufferMut::with_capacity_zeroed(SIZE); let mut rng = rand::rngs::OsRng; for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())]; @@ -1704,7 +1718,7 @@ mod tests { .await .unwrap() .into_inner(); - assert!(buf == SAMPLE); + assert!(buf[..] == SAMPLE); } }); hdls.push(hdl); diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer.rs new file mode 100644 index 000000000000..8ffc29b93d33 --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer.rs @@ -0,0 +1,9 @@ +pub mod alignment; +pub mod buffer; +pub mod buffer_mut; +pub mod raw; +pub mod slice; + +pub use alignment::*; +pub use buffer_mut::AlignedBufferMut; +pub use slice::AlignedSlice; diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs new file mode 100644 index 000000000000..933b78a13b70 --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs @@ -0,0 +1,26 @@ +pub trait Alignment: std::marker::Unpin + 'static { + /// Returns the required alignments. + fn align(&self) -> usize; +} + +/// Alignment at compile time. +#[derive(Debug)] +pub struct ConstAlign; + +impl Alignment for ConstAlign { + fn align(&self) -> usize { + A + } +} + +/// Alignment at run time. 
+#[derive(Debug)] +pub struct RuntimeAlign { + align: usize, +} + +impl Alignment for RuntimeAlign { + fn align(&self) -> usize { + self.align + } +} diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs new file mode 100644 index 000000000000..2fba6d699b28 --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs @@ -0,0 +1,124 @@ +use std::{ + ops::{Deref, Range, RangeBounds}, + sync::Arc, +}; + +use super::{alignment::Alignment, raw::RawAlignedBuffer}; + +/// An shared, immutable aligned buffer type. +pub struct AlignedBuffer { + /// Shared raw buffer. + raw: Arc>, + /// Range that specifies the current slice. + range: Range, +} + +impl AlignedBuffer { + /// Creates an immutable `IoBuffer` from the raw buffer + pub(super) fn from_raw(raw: RawAlignedBuffer, range: Range) -> Self { + AlignedBuffer { + raw: Arc::new(raw), + range, + } + } + + /// Returns the number of bytes in the buffer, also referred to as its 'length'. + #[inline] + pub fn len(&self) -> usize { + self.range.len() + } + + /// Returns the alignment of the buffer. + #[inline] + pub fn align(&self) -> usize { + self.raw.align() + } + + #[inline] + fn as_ptr(&self) -> *const u8 { + // SAFETY: `self.range.start` is guaranteed to be within [0, self.len()). + unsafe { self.raw.as_ptr().add(self.range.start) } + } + + /// Extracts a slice containing the entire buffer. + /// + /// Equivalent to `&s[..]`. + #[inline] + fn as_slice(&self) -> &[u8] { + &self.raw.as_slice()[self.range.start..self.range.end] + } + + /// Returns a slice of self for the index range `[begin..end)`. + pub fn slice(&self, range: impl RangeBounds) -> Self { + use core::ops::Bound; + let len = self.len(); + + let begin = match range.start_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n.checked_add(1).expect("out of range"), + Bound::Unbounded => 0, + }; + + let end = match range.end_bound() { + Bound::Included(&n) => n.checked_add(1).expect("out of range"), + Bound::Excluded(&n) => n, + Bound::Unbounded => len, + }; + + assert!( + begin <= end, + "range start must not be greater than end: {:?} <= {:?}", + begin, + end, + ); + assert!( + end <= len, + "range end out of bounds: {:?} <= {:?}", + end, + len, + ); + + let begin = self.range.start + begin; + let end = self.range.start + end; + + AlignedBuffer { + raw: Arc::clone(&self.raw), + range: begin..end, + } + } +} + +impl Deref for AlignedBuffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_slice() + } +} + +impl AsRef<[u8]> for AlignedBuffer { + fn as_ref(&self) -> &[u8] { + self.as_slice() + } +} + +impl PartialEq<[u8]> for AlignedBuffer { + fn eq(&self, other: &[u8]) -> bool { + self.as_slice().eq(other) + } +} + +/// SAFETY: the underlying buffer references a stable memory region. 
+unsafe impl tokio_epoll_uring::IoBuf for AlignedBuffer { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len() + } + + fn bytes_total(&self) -> usize { + self.len() + } +} diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs new file mode 100644 index 000000000000..b3675d1aeabb --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs @@ -0,0 +1,347 @@ +use std::ops::{Deref, DerefMut}; + +use super::{ + alignment::{Alignment, ConstAlign}, + buffer::AlignedBuffer, + raw::RawAlignedBuffer, +}; + +/// A mutable aligned buffer type. +#[derive(Debug)] +pub struct AlignedBufferMut { + raw: RawAlignedBuffer, +} + +impl AlignedBufferMut> { + /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment. + /// + /// The buffer will be able to hold at most `capacity` elements and will never resize. + /// + /// + /// # Panics + /// + /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met: + /// * `align` must not be zero, + /// + /// * `align` must be a power of two, + /// + /// * `capacity`, when rounded up to the nearest multiple of `align`, + /// must not overflow isize (i.e., the rounded value must be + /// less than or equal to `isize::MAX`). + pub fn with_capacity(capacity: usize) -> Self { + AlignedBufferMut { + raw: RawAlignedBuffer::with_capacity(capacity), + } + } + + /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros. + pub fn with_capacity_zeroed(capacity: usize) -> Self { + use bytes::BufMut; + let mut buf = Self::with_capacity(capacity); + buf.put_bytes(0, capacity); + // SAFETY: `put_bytes` filled the entire buffer. + unsafe { buf.set_len(capacity) }; + buf + } +} + +impl AlignedBufferMut { + /// Returns the total number of bytes the buffer can hold. + #[inline] + pub fn capacity(&self) -> usize { + self.raw.capacity() + } + + /// Returns the alignment of the buffer. + #[inline] + pub fn align(&self) -> usize { + self.raw.align() + } + + /// Returns the number of bytes in the buffer, also referred to as its 'length'. + #[inline] + pub fn len(&self) -> usize { + self.raw.len() + } + + /// Force the length of the buffer to `new_len`. + #[inline] + unsafe fn set_len(&mut self, new_len: usize) { + self.raw.set_len(new_len) + } + + #[inline] + fn as_ptr(&self) -> *const u8 { + self.raw.as_ptr() + } + + #[inline] + fn as_mut_ptr(&mut self) -> *mut u8 { + self.raw.as_mut_ptr() + } + + /// Extracts a slice containing the entire buffer. + /// + /// Equivalent to `&s[..]`. + #[inline] + fn as_slice(&self) -> &[u8] { + self.raw.as_slice() + } + + /// Extracts a mutable slice of the entire buffer. + /// + /// Equivalent to `&mut s[..]`. + fn as_mut_slice(&mut self) -> &mut [u8] { + self.raw.as_mut_slice() + } + + /// Drops the all the contents of the buffer, setting its length to `0`. + #[inline] + pub fn clear(&mut self) { + self.raw.clear() + } + + /// Reserves capacity for at least `additional` more bytes to be inserted + /// in the given `IoBufferMut`. The collection may reserve more space to + /// speculatively avoid frequent reallocations. After calling `reserve`, + /// capacity will be greater than or equal to `self.len() + additional`. + /// Does nothing if capacity is already sufficient. 
+ /// + /// # Panics + /// + /// Panics if the new capacity exceeds `isize::MAX` _bytes_. + pub fn reserve(&mut self, additional: usize) { + self.raw.reserve(additional); + } + + /// Shortens the buffer, keeping the first len bytes. + pub fn truncate(&mut self, len: usize) { + self.raw.truncate(len); + } + + /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8]. + pub fn leak<'a>(self) -> &'a mut [u8] { + self.raw.leak() + } + + pub fn freeze(self) -> AlignedBuffer { + let len = self.len(); + AlignedBuffer::from_raw(self.raw, 0..len) + } +} + +impl Deref for AlignedBufferMut { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_slice() + } +} + +impl DerefMut for AlignedBufferMut { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut_slice() + } +} + +impl AsRef<[u8]> for AlignedBufferMut { + fn as_ref(&self) -> &[u8] { + self.as_slice() + } +} + +impl AsMut<[u8]> for AlignedBufferMut { + fn as_mut(&mut self) -> &mut [u8] { + self.as_mut_slice() + } +} + +impl PartialEq<[u8]> for AlignedBufferMut { + fn eq(&self, other: &[u8]) -> bool { + self.as_slice().eq(other) + } +} + +/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized. +unsafe impl bytes::BufMut for AlignedBufferMut { + #[inline] + fn remaining_mut(&self) -> usize { + // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`. + // Thus, it can have at most `self.capacity` bytes. + self.capacity() - self.len() + } + + // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized. + #[inline] + unsafe fn advance_mut(&mut self, cnt: usize) { + let len = self.len(); + let remaining = self.remaining_mut(); + + if remaining < cnt { + panic_advance(cnt, remaining); + } + + // Addition will not overflow since the sum is at most the capacity. + self.set_len(len + cnt); + } + + #[inline] + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + let cap = self.capacity(); + let len = self.len(); + + // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be + // valid for `cap - len` bytes. The subtraction will not underflow since + // `len <= cap`. + unsafe { + bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len) + } + } +} + +/// Panic with a nice error message. +#[cold] +fn panic_advance(idx: usize, len: usize) -> ! { + panic!( + "advance out of bounds: the len is {} but advancing by {}", + len, idx + ); +} + +/// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer, +/// and the underlying pointer remains stable while io-uring is owning the buffer. +/// The tokio-epoll-uring crate itself will not resize the buffer and will respect +/// [`tokio_epoll_uring::IoBuf::bytes_total`]. +unsafe impl tokio_epoll_uring::IoBuf for AlignedBufferMut { + fn stable_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len() + } + + fn bytes_total(&self) -> usize { + self.capacity() + } +} + +// SAFETY: See above. 
+unsafe impl tokio_epoll_uring::IoBufMut for AlignedBufferMut { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.as_mut_ptr() + } + + unsafe fn set_init(&mut self, init_len: usize) { + if self.len() < init_len { + self.set_len(init_len); + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + const ALIGN: usize = 4 * 1024; + type TestIoBufferMut = AlignedBufferMut>; + + #[test] + fn test_with_capacity() { + let v = TestIoBufferMut::with_capacity(ALIGN * 4); + assert_eq!(v.len(), 0); + assert_eq!(v.capacity(), ALIGN * 4); + assert_eq!(v.align(), ALIGN); + assert_eq!(v.as_ptr().align_offset(ALIGN), 0); + + let v = TestIoBufferMut::with_capacity(ALIGN / 2); + assert_eq!(v.len(), 0); + assert_eq!(v.capacity(), ALIGN / 2); + assert_eq!(v.align(), ALIGN); + assert_eq!(v.as_ptr().align_offset(ALIGN), 0); + } + + #[test] + fn test_with_capacity_zeroed() { + let v = TestIoBufferMut::with_capacity_zeroed(ALIGN); + assert_eq!(v.len(), ALIGN); + assert_eq!(v.capacity(), ALIGN); + assert_eq!(v.align(), ALIGN); + assert_eq!(v.as_ptr().align_offset(ALIGN), 0); + assert_eq!(&v[..], &[0; ALIGN]) + } + + #[test] + fn test_reserve() { + use bytes::BufMut; + let mut v = TestIoBufferMut::with_capacity(ALIGN); + let capacity = v.capacity(); + v.reserve(capacity); + assert_eq!(v.capacity(), capacity); + let data = [b'a'; ALIGN]; + v.put(&data[..]); + v.reserve(capacity); + assert!(v.capacity() >= capacity * 2); + assert_eq!(&v[..], &data[..]); + let capacity = v.capacity(); + v.clear(); + v.reserve(capacity); + assert_eq!(capacity, v.capacity()); + } + + #[test] + fn test_bytes_put() { + use bytes::BufMut; + let mut v = TestIoBufferMut::with_capacity(ALIGN * 4); + let x = [b'a'; ALIGN]; + + for _ in 0..2 { + for _ in 0..4 { + v.put(&x[..]); + } + assert_eq!(v.len(), ALIGN * 4); + assert_eq!(v.capacity(), ALIGN * 4); + assert_eq!(v.align(), ALIGN); + assert_eq!(v.as_ptr().align_offset(ALIGN), 0); + v.clear() + } + assert_eq!(v.len(), 0); + assert_eq!(v.capacity(), ALIGN * 4); + assert_eq!(v.align(), ALIGN); + assert_eq!(v.as_ptr().align_offset(ALIGN), 0); + } + + #[test] + #[should_panic] + fn test_bytes_put_panic() { + use bytes::BufMut; + const ALIGN: usize = 4 * 1024; + let mut v = TestIoBufferMut::with_capacity(ALIGN * 4); + let x = [b'a'; ALIGN]; + for _ in 0..5 { + v.put_slice(&x[..]); + } + } + + #[test] + fn test_io_buf_put_slice() { + use tokio_epoll_uring::BoundedBufMut; + const ALIGN: usize = 4 * 1024; + let mut v = TestIoBufferMut::with_capacity(ALIGN); + let x = [b'a'; ALIGN]; + + for _ in 0..2 { + v.put_slice(&x[..]); + assert_eq!(v.len(), ALIGN); + assert_eq!(v.capacity(), ALIGN); + assert_eq!(v.align(), ALIGN); + assert_eq!(v.as_ptr().align_offset(ALIGN), 0); + v.clear() + } + assert_eq!(v.len(), 0); + assert_eq!(v.capacity(), ALIGN); + assert_eq!(v.align(), ALIGN); + assert_eq!(v.as_ptr().align_offset(ALIGN), 0); + } +} diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs new file mode 100644 index 000000000000..6c26dec0db22 --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs @@ -0,0 +1,216 @@ +use core::slice; +use std::{ + alloc::{self, Layout}, + cmp, + mem::ManuallyDrop, +}; + +use super::alignment::{Alignment, ConstAlign}; + +#[derive(Debug)] +struct AlignedBufferPtr(*mut u8); + +// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer. 
+unsafe impl Send for AlignedBufferPtr {} + +// SAFETY: We gurantees no one besides `IoBufferPtr` itself has the raw pointer. +unsafe impl Sync for AlignedBufferPtr {} + +/// An aligned buffer type. +#[derive(Debug)] +pub struct RawAlignedBuffer { + ptr: AlignedBufferPtr, + capacity: usize, + len: usize, + align: A, +} + +impl RawAlignedBuffer> { + /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment. + /// + /// The buffer will be able to hold at most `capacity` elements and will never resize. + /// + /// + /// # Panics + /// + /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met: + /// * `align` must not be zero, + /// + /// * `align` must be a power of two, + /// + /// * `capacity`, when rounded up to the nearest multiple of `align`, + /// must not overflow isize (i.e., the rounded value must be + /// less than or equal to `isize::MAX`). + pub fn with_capacity(capacity: usize) -> Self { + let align = ConstAlign::; + let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout"); + + // SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout. + let ptr = unsafe { + let ptr = alloc::alloc(layout); + if ptr.is_null() { + alloc::handle_alloc_error(layout); + } + AlignedBufferPtr(ptr) + }; + + RawAlignedBuffer { + ptr, + capacity, + len: 0, + align, + } + } +} + +impl RawAlignedBuffer { + /// Returns the total number of bytes the buffer can hold. + #[inline] + pub fn capacity(&self) -> usize { + self.capacity + } + + /// Returns the alignment of the buffer. + #[inline] + pub fn align(&self) -> usize { + self.align.align() + } + + /// Returns the number of bytes in the buffer, also referred to as its 'length'. + #[inline] + pub fn len(&self) -> usize { + self.len + } + + /// Force the length of the buffer to `new_len`. + #[inline] + pub unsafe fn set_len(&mut self, new_len: usize) { + debug_assert!(new_len <= self.capacity()); + self.len = new_len; + } + + #[inline] + pub fn as_ptr(&self) -> *const u8 { + self.ptr.0 + } + + #[inline] + pub fn as_mut_ptr(&mut self) -> *mut u8 { + self.ptr.0 + } + + /// Extracts a slice containing the entire buffer. + /// + /// Equivalent to `&s[..]`. + #[inline] + pub fn as_slice(&self) -> &[u8] { + // SAFETY: The pointer is valid and `len` bytes are initialized. + unsafe { slice::from_raw_parts(self.as_ptr(), self.len) } + } + + /// Extracts a mutable slice of the entire buffer. + /// + /// Equivalent to `&mut s[..]`. + pub fn as_mut_slice(&mut self) -> &mut [u8] { + // SAFETY: The pointer is valid and `len` bytes are initialized. + unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) } + } + + /// Drops the all the contents of the buffer, setting its length to `0`. + #[inline] + pub fn clear(&mut self) { + self.len = 0; + } + + /// Reserves capacity for at least `additional` more bytes to be inserted + /// in the given `IoBufferMut`. The collection may reserve more space to + /// speculatively avoid frequent reallocations. After calling `reserve`, + /// capacity will be greater than or equal to `self.len() + additional`. + /// Does nothing if capacity is already sufficient. + /// + /// # Panics + /// + /// Panics if the new capacity exceeds `isize::MAX` _bytes_. 
+ pub fn reserve(&mut self, additional: usize) { + if additional > self.capacity() - self.len() { + self.reserve_inner(additional); + } + } + + fn reserve_inner(&mut self, additional: usize) { + let Some(required_cap) = self.len().checked_add(additional) else { + capacity_overflow() + }; + + let old_capacity = self.capacity(); + let align = self.align(); + // This guarantees exponential growth. The doubling cannot overflow + // because `cap <= isize::MAX` and the type of `cap` is `usize`. + let cap = cmp::max(old_capacity * 2, required_cap); + + if !is_valid_alloc(cap) { + capacity_overflow() + } + let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout"); + + let old_ptr = self.as_mut_ptr(); + + // SAFETY: old allocation was allocated with std::alloc::alloc with the same layout, + // and we panics on null pointer. + let (ptr, cap) = unsafe { + let old_layout = Layout::from_size_align_unchecked(old_capacity, align); + let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size()); + if ptr.is_null() { + alloc::handle_alloc_error(new_layout); + } + (AlignedBufferPtr(ptr), cap) + }; + + self.ptr = ptr; + self.capacity = cap; + } + + /// Shortens the buffer, keeping the first len bytes. + pub fn truncate(&mut self, len: usize) { + if len > self.len { + return; + } + self.len = len; + } + + /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8]. + pub fn leak<'a>(self) -> &'a mut [u8] { + let mut buf = ManuallyDrop::new(self); + // SAFETY: leaking the buffer as intended. + unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) } + } +} + +fn capacity_overflow() -> ! { + panic!("capacity overflow") +} + +// We need to guarantee the following: +// * We don't ever allocate `> isize::MAX` byte-size objects. +// * We don't overflow `usize::MAX` and actually allocate too little. +// +// On 64-bit we just need to check for overflow since trying to allocate +// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add +// an extra guard for this in case we're running on a platform which can use +// all 4GB in user-space, e.g., PAE or x32. +#[inline] +fn is_valid_alloc(alloc_size: usize) -> bool { + !(usize::BITS < 64 && alloc_size > isize::MAX as usize) +} + +impl Drop for RawAlignedBuffer { + fn drop(&mut self) { + // SAFETY: memory was allocated with std::alloc::alloc with the same layout. + unsafe { + alloc::dealloc( + self.as_mut_ptr(), + Layout::from_size_align_unchecked(self.capacity, self.align.align()), + ) + } + } +} diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/slice.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/slice.rs new file mode 100644 index 000000000000..6cecf34c1cd5 --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/slice.rs @@ -0,0 +1,40 @@ +use std::ops::{Deref, DerefMut}; + +use super::alignment::{Alignment, ConstAlign}; + +/// Newtype for an aligned slice. +pub struct AlignedSlice<'a, const N: usize, A: Alignment> { + /// underlying byte slice + buf: &'a mut [u8; N], + /// alignment marker + _align: A, +} + +impl<'a, const N: usize, const A: usize> AlignedSlice<'a, N, ConstAlign> { + /// Create a new aligned slice from a mutable byte slice. The input must already satisify the alignment. 
+ pub unsafe fn new_unchecked(buf: &'a mut [u8; N]) -> Self { + let _align = ConstAlign::; + assert_eq!(buf.as_ptr().align_offset(_align.align()), 0); + AlignedSlice { buf, _align } + } +} + +impl<'a, const N: usize, A: Alignment> Deref for AlignedSlice<'a, N, A> { + type Target = [u8; N]; + + fn deref(&self) -> &Self::Target { + self.buf + } +} + +impl<'a, const N: usize, A: Alignment> DerefMut for AlignedSlice<'a, N, A> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.buf + } +} + +impl<'a, const N: usize, A: Alignment> AsRef<[u8; N]> for AlignedSlice<'a, N, A> { + fn as_ref(&self) -> &[u8; N] { + self.buf + } +} diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs new file mode 100644 index 000000000000..dba695196ebb --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs @@ -0,0 +1,9 @@ +use tokio_epoll_uring::IoBufMut; + +use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf}; + +pub trait IoBufAlignedMut: IoBufMut {} + +impl IoBufAlignedMut for IoBufferMut {} + +impl IoBufAlignedMut for PageWriteGuardBuf {} diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs index 7c773b6b2103..c3940cf6cea2 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -1,5 +1,6 @@ //! See [`FullSlice`]. +use crate::virtual_file::{IoBuffer, IoBufferMut}; use bytes::{Bytes, BytesMut}; use std::ops::{Deref, Range}; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; @@ -76,3 +77,5 @@ macro_rules! impl_io_buf_ext { impl_io_buf_ext!(Bytes); impl_io_buf_ext!(BytesMut); impl_io_buf_ext!(Vec); +impl_io_buf_ext!(IoBufferMut); +impl_io_buf_ext!(IoBuffer);
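As background (not part of the patch above), the alignment requirement exists because Linux O_DIRECT reads typically require the user buffer, file offset, and length to be aligned to the logical block size. The following Linux-only sketch, assuming the `libc` crate (which this patch does not add), illustrates the failure mode that the aligned buffer types prevent; the file path and block size are illustrative.

```rust
use std::alloc::{alloc, dealloc, Layout};
use std::ffi::CString;

fn main() {
    // Write a 4 KiB file through the regular (buffered) path first.
    std::fs::write("/tmp/direct_io_demo", vec![7u8; 4096]).unwrap();
    let path = CString::new("/tmp/direct_io_demo").unwrap();

    unsafe {
        let fd = libc::open(path.as_ptr(), libc::O_RDONLY | libc::O_DIRECT);
        assert!(fd >= 0, "open with O_DIRECT failed");

        // 512-byte-aligned buffer, aligned length and offset: expected to succeed.
        let layout = Layout::from_size_align(4096, 512).unwrap();
        let buf = alloc(layout);
        assert!(!buf.is_null());
        let n = libc::pread(fd, buf as *mut libc::c_void, 4096, 0);
        println!("aligned pread returned {n}");

        // Misaligned buffer (shifted by one byte): typically fails with EINVAL.
        let n = libc::pread(fd, buf.add(1) as *mut libc::c_void, 512, 0);
        println!(
            "misaligned pread returned {n} (errno {})",
            *libc::__errno_location()
        );

        libc::close(fd);
        dealloc(buf, layout);
    }
}
```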