diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs
index 821c8008a950..d98b23acced5 100644
--- a/pageserver/benches/bench_ingest.rs
+++ b/pageserver/benches/bench_ingest.rs
@@ -164,7 +164,11 @@ fn criterion_benchmark(c: &mut Criterion) {
     let conf: &'static PageServerConf = Box::leak(Box::new(
         pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
     ));
-    virtual_file::init(16384, virtual_file::io_engine_for_bench());
+    virtual_file::init(
+        16384,
+        virtual_file::io_engine_for_bench(),
+        conf.virtual_file_io_mode,
+    );
     page_cache::init(conf.page_cache_size);
 
     {
diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs
index 151b94cf62d3..7dd2a5d05cbf 100644
--- a/pageserver/ctl/src/layer_map_analyzer.rs
+++ b/pageserver/ctl/src/layer_map_analyzer.rs
@@ -7,6 +7,7 @@ use camino::{Utf8Path, Utf8PathBuf};
 use pageserver::context::{DownloadBehavior, RequestContext};
 use pageserver::task_mgr::TaskKind;
 use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
+use pageserver::virtual_file::api::IoMode;
 use std::cmp::Ordering;
 use std::collections::BinaryHeap;
 use std::ops::Range;
@@ -152,7 +153,11 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
 
     // Initialize virtual_file (file descriptor cache) and page cache which are needed to access layer persistent B-Tree.
-    pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
+    pageserver::virtual_file::init(
+        10,
+        virtual_file::api::IoEngineKind::StdFs,
+        IoMode::preferred(),
+    );
     pageserver::page_cache::init(100);
 
     let mut total_delta_layers = 0usize;
diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs
index fd948bf2efed..c0b2b6ae890a 100644
--- a/pageserver/ctl/src/layers.rs
+++ b/pageserver/ctl/src/layers.rs
@@ -11,6 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
 use pageserver::tenant::storage_layer::{delta_layer, image_layer};
 use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
 use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
+use pageserver::virtual_file::api::IoMode;
 use pageserver::{page_cache, virtual_file};
 use pageserver::{
     repository::{Key, KEY_SIZE},
@@ -59,7 +60,11 @@ pub(crate) enum LayerCmd {
 
 async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
     let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
-    virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
+    virtual_file::init(
+        10,
+        virtual_file::api::IoEngineKind::StdFs,
+        IoMode::preferred(),
+    );
     page_cache::init(100);
     let file = VirtualFile::open(path, ctx).await?;
     let file_id = page_cache::next_file_id();
@@ -190,7 +195,11 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
             new_tenant_id,
             new_timeline_id,
         } => {
-            pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
+            pageserver::virtual_file::init(
+                10,
+                virtual_file::api::IoEngineKind::StdFs,
+                IoMode::preferred(),
+            );
             pageserver::page_cache::init(100);
 
             let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs
index c96664d346cb..f506caec5b06 100644
--- a/pageserver/ctl/src/main.rs
+++ b/pageserver/ctl/src/main.rs
@@ -24,7 +24,7 @@ use pageserver::{
     page_cache,
     task_mgr::TaskKind,
     tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
-    virtual_file,
+    virtual_file::{self, api::IoMode},
 };
 use pageserver_api::shard::TenantShardId;
 use postgres_ffi::ControlFileData;
@@ -205,7 +205,11 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
 
 async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
     // Basic initialization of things that don't change after startup
-    virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
+    virtual_file::init(
+        10,
+        virtual_file::api::IoEngineKind::StdFs,
+        IoMode::preferred(),
+    );
     page_cache::init(100);
     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
     dump_layerfile_from_path(path, true, &ctx).await
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index f71a3d26531c..c6659345f94c 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -167,7 +167,11 @@ fn main() -> anyhow::Result<()> {
     let scenario = failpoint_support::init();
 
     // Basic initialization of things that don't change after startup
-    virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
+    virtual_file::init(
+        conf.max_file_descriptors,
+        conf.virtual_file_io_engine,
+        conf.virtual_file_io_mode,
+    );
     page_cache::init(conf.page_cache_size);
 
     start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index f386c825b848..45bf02362aa9 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -82,6 +82,7 @@ use once_cell::sync::OnceCell;
 use crate::{
     context::RequestContext,
     metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
+    virtual_file::{IoBufferMut, IoPageSlice},
 };
 
 static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -144,7 +145,7 @@ struct SlotInner {
     key: Option<CacheKey>,
     // for `coalesce_readers_permit`
     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
-    buf: &'static mut [u8; PAGE_SZ],
+    buf: IoPageSlice<'static>,
 }
 
 impl Slot {
@@ -234,13 +235,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
     type Target = [u8; PAGE_SZ];
 
     fn deref(&self) -> &Self::Target {
-        self.slot_guard.buf
+        self.slot_guard.buf.deref()
     }
 }
 
 impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     fn as_ref(&self) -> &[u8; PAGE_SZ] {
-        self.slot_guard.buf
+        self.slot_guard.buf.as_ref()
     }
 }
 
@@ -266,7 +267,7 @@ enum PageWriteGuardState<'i> {
 impl std::ops::DerefMut for PageWriteGuard<'_> {
     fn deref_mut(&mut self) -> &mut Self::Target {
         match &mut self.state {
-            PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
+            PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
             PageWriteGuardState::Downgraded => unreachable!(),
         }
     }
@@ -277,7 +278,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
 
     fn deref(&self) -> &Self::Target {
         match &self.state {
-            PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
+            PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
             PageWriteGuardState::Downgraded => unreachable!(),
         }
     }
@@ -643,7 +644,7 @@ impl PageCache {
         // We could use Vec::leak here, but that potentially also leaks
         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
         // this is avoided.
-        let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
+        let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
 
         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
         size_metrics.max_bytes.set_page_sz(num_pages);
@@ -652,7 +653,8 @@ impl PageCache {
         let slots = page_buffer
             .chunks_exact_mut(PAGE_SZ)
             .map(|chunk| {
-                let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
+                // SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
+                let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
 
                 Slot {
                     inner: tokio::sync::RwLock::new(SlotInner {
diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs
index 1c82e5454de9..2bd7f2d619aa 100644
--- a/pageserver/src/tenant/block_io.rs
+++ b/pageserver/src/tenant/block_io.rs
@@ -5,6 +5,8 @@
 use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
 use crate::context::RequestContext;
 use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
+#[cfg(test)]
+use crate::virtual_file::IoBufferMut;
 use crate::virtual_file::VirtualFile;
 use bytes::Bytes;
 use std::ops::Deref;
@@ -40,7 +42,7 @@ pub enum BlockLease<'a> {
     #[cfg(test)]
     Arc(std::sync::Arc<[u8; PAGE_SZ]>),
     #[cfg(test)]
-    Vec(Vec<u8>),
+    IoBufferMut(IoBufferMut),
 }
 
 impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -67,7 +69,7 @@ impl Deref for BlockLease<'_> {
             #[cfg(test)]
             BlockLease::Arc(v) => v.deref(),
             #[cfg(test)]
-            BlockLease::Vec(v) => {
+            BlockLease::IoBufferMut(v) => {
                 TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
             }
         }
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index a62a47f9a760..de0abab4c0c7 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -6,10 +6,11 @@ use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache;
 use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
+use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
 use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
 use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
 use crate::virtual_file::owned_buffers_io::write::Buffer;
-use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
+use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
 use bytes::BytesMut;
 use camino::Utf8PathBuf;
 use num_traits::Num;
@@ -107,15 +108,18 @@ impl EphemeralFile {
         self.page_cache_file_id
     }
 
-    pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
+    pub(crate) async fn load_to_io_buf(
+        &self,
+        ctx: &RequestContext,
+    ) -> Result<IoBufferMut, io::Error> {
         let size = self.len().into_usize();
-        let vec = Vec::with_capacity(size);
-        let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
+        let buf = IoBufferMut::with_capacity(size);
+        let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
         assert_eq!(nread, size);
-        let vec = slice.into_inner();
-        assert_eq!(vec.len(), nread);
-        assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
-        Ok(vec)
+        let buf = slice.into_inner();
+        assert_eq!(buf.len(), nread);
+        assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
+        Ok(buf)
     }
 
     /// Returns the offset at which the first byte of the input was written, for use
@@ -158,7 +162,7 @@ impl EphemeralFile {
     }
 }
 
 impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
-    async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
+    async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
         &'b self,
         start: u64,
         dst: tokio_epoll_uring::Slice<B>,
@@ -345,7 +349,7 @@ mod tests {
         assert!(file.len() as usize == write_nbytes);
         for i in 0..write_nbytes {
             assert_eq!(value_offsets[i], i.into_u64());
-            let buf = Vec::with_capacity(1);
+            let buf = IoBufferMut::with_capacity(1);
             let (buf_slice, nread) = file
                 .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
                 .await
@@ -385,7 +389,7 @@ mod tests {
 
         // assert the state is as this test expects it to be
         assert_eq!(
-            &file.load_to_vec(&ctx).await.unwrap(),
+            &file.load_to_io_buf(&ctx).await.unwrap(),
             &content[0..cap + cap / 2]
         );
         let md = file
@@ -440,7 +444,7 @@ mod tests {
             let (buf, nread) = file
                 .read_exact_at_eof_ok(
                     start.into_u64(),
-                    Vec::with_capacity(len).slice_full(),
+                    IoBufferMut::with_capacity(len).slice_full(),
                     ctx,
                 )
                 .await
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 6332d36dc343..ceae1d4b1a2a 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -44,11 +44,11 @@ use crate::tenant::vectored_blob_io::{
 };
 use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
+use crate::virtual_file::IoBufferMut;
 use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
 use crate::{walrecord, TEMP_FILE_SUFFIX};
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
 use anyhow::{anyhow, bail, ensure, Context, Result};
-use bytes::BytesMut;
 use camino::{Utf8Path, Utf8PathBuf};
 use futures::StreamExt;
 use itertools::Itertools;
@@ -1002,7 +1002,7 @@ impl DeltaLayerInner {
             .0
             .into();
         let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
-        let mut buf = Some(BytesMut::with_capacity(buf_size));
+        let mut buf = Some(IoBufferMut::with_capacity(buf_size));
 
         // Note that reads are processed in reverse order (from highest key+lsn).
         // This is the order that `ReconstructState` requires such that it can
@@ -1029,7 +1029,7 @@ impl DeltaLayerInner {
                     // We have "lost" the buffer since the lower level IO api
                     // doesn't return the buffer on error. Allocate a new one.
-                    buf = Some(BytesMut::with_capacity(buf_size));
+                    buf = Some(IoBufferMut::with_capacity(buf_size));
                     continue;
                 }
@@ -1203,7 +1203,7 @@ impl DeltaLayerInner {
             .map(|x| x.0.get())
             .unwrap_or(8192);
 
-        let mut buffer = Some(BytesMut::with_capacity(max_read_size));
+        let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
 
         // FIXME: buffering of DeltaLayerWriter
         let mut per_blob_copy = Vec::new();
@@ -1561,12 +1561,11 @@ impl<'a> DeltaLayerIterator<'a> {
             let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
             let mut next_batch = std::collections::VecDeque::new();
             let buf_size = plan.size();
-            let buf = BytesMut::with_capacity(buf_size);
+            let buf = IoBufferMut::with_capacity(buf_size);
             let blobs_buf = vectored_blob_reader
                 .read_blobs(&plan, buf, self.ctx)
                 .await?;
-            let frozen_buf = blobs_buf.buf.freeze();
-            let view = BufView::new_bytes(frozen_buf);
+            let view = BufView::new_slice(&blobs_buf.buf);
             for meta in blobs_buf.blobs.iter() {
                 let blob_read = meta.read(&view).await?;
                 let value = Value::des(&blob_read)?;
@@ -1941,7 +1940,7 @@ pub(crate) mod test {
             &vectored_reads,
             constants::MAX_VECTORED_READ_BYTES,
         );
-        let mut buf = Some(BytesMut::with_capacity(buf_size));
+        let mut buf = Some(IoBufferMut::with_capacity(buf_size));
 
         for read in vectored_reads {
             let blobs_buf = vectored_blob_reader
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index b1f2557038d4..fa058833d404 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -41,10 +41,11 @@ use crate::tenant::vectored_blob_io::{
 };
 use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
+use crate::virtual_file::IoBufferMut;
 use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
 use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
 use anyhow::{anyhow, bail, ensure, Context, Result};
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
 use camino::{Utf8Path, Utf8PathBuf};
 use hex;
 use itertools::Itertools;
@@ -547,10 +548,10 @@ impl ImageLayerInner {
 
         for read in plan.into_iter() {
             let buf_size = read.size();
-            let buf = BytesMut::with_capacity(buf_size);
+            let buf = IoBufferMut::with_capacity(buf_size);
             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
-            let frozen_buf = blobs_buf.buf.freeze();
-            let view = BufView::new_bytes(frozen_buf);
+
+            let view = BufView::new_slice(&blobs_buf.buf);
 
             for meta in blobs_buf.blobs.iter() {
                 let img_buf = meta.read(&view).await?;
@@ -609,13 +610,12 @@ impl ImageLayerInner {
             }
         }
 
-        let buf = BytesMut::with_capacity(buf_size);
+        let buf = IoBufferMut::with_capacity(buf_size);
 
         let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
 
         match res {
             Ok(blobs_buf) => {
-                let frozen_buf = blobs_buf.buf.freeze();
-                let view = BufView::new_bytes(frozen_buf);
+                let view = BufView::new_slice(&blobs_buf.buf);
 
                 for meta in blobs_buf.blobs.iter() {
                     let img_buf = meta.read(&view).await;
@@ -1069,12 +1069,11 @@ impl<'a> ImageLayerIterator<'a> {
             let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
             let mut next_batch = std::collections::VecDeque::new();
             let buf_size = plan.size();
-            let buf = BytesMut::with_capacity(buf_size);
+            let buf = IoBufferMut::with_capacity(buf_size);
             let blobs_buf = vectored_blob_reader
                 .read_blobs(&plan, buf, self.ctx)
                 .await?;
-            let frozen_buf = blobs_buf.buf.freeze();
-            let view = BufView::new_bytes(frozen_buf);
+            let view = BufView::new_slice(&blobs_buf.buf);
             for meta in blobs_buf.blobs.iter() {
                 let img_buf = meta.read(&view).await?;
                 next_batch.push_back((
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index e487bee1f2d9..7573ddb5ccc0 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -14,7 +14,6 @@ use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
 use crate::{l0_flush, page_cache};
 use anyhow::{anyhow, Context, Result};
-use bytes::Bytes;
 use camino::Utf8PathBuf;
 use pageserver_api::key::CompactKey;
 use pageserver_api::keyspace::KeySpace;
@@ -809,9 +808,8 @@ impl InMemoryLayer {
 
         match l0_flush_global_state {
             l0_flush::Inner::Direct { .. } => {
-                let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
-
-                let file_contents = Bytes::from(file_contents);
+                let file_contents = inner.file.load_to_io_buf(ctx).await?;
+                let file_contents = file_contents.freeze();
 
                 for (key, vec_map) in inner.index.iter() {
                     // Write all page versions
@@ -825,7 +823,7 @@ impl InMemoryLayer {
                             len,
                             will_init,
                         } = entry;
-                        let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize);
+                        let buf = file_contents.slice(pos as usize..(pos + len) as usize);
                         let (_buf, res) = delta_layer_writer
                             .put_value_bytes(
                                 Key::from_compact(*key),
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs
index 0683e15659dc..a4bb3a6bfc5d 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs
@@ -9,6 +9,7 @@ use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
 use crate::{
     assert_u64_eq_usize::{U64IsUsize, UsizeIsU64},
     context::RequestContext,
+    virtual_file::{owned_buffers_io::io_buf_aligned::IoBufAlignedMut, IoBufferMut},
 };
 
 /// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`].
@@ -24,7 +25,7 @@ pub trait File: Send {
     /// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`.
     ///
     /// No guarantees are made about the remaining bytes in `dst` in case of a short read.
-    async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
+    async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
         &'b self,
         start: u64,
         dst: Slice<B>,
@@ -227,7 +228,7 @@ where
 
     // Execute physical reads and fill the logical read buffers
     // TODO: pipelined reads; prefetch;
-    let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE);
+    let get_io_buffer = |nchunks| IoBufferMut::with_capacity(nchunks * DIO_CHUNK_SIZE);
     for PhysicalRead {
         start_chunk_no,
         nchunks,
@@ -459,7 +460,7 @@ mod tests {
         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
         let file = InMemoryFile::new_random(10);
         let test_read = |pos, len| {
-            let buf = vec![0; len];
+            let buf = IoBufferMut::with_capacity_zeroed(len);
             let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx);
             use futures::FutureExt;
             let (slice, nread) = fut
@@ -470,9 +471,9 @@ mod tests {
            buf.truncate(nread);
            buf
        };
-        assert_eq!(test_read(0, 1), &file.content[0..1]);
-        assert_eq!(test_read(1, 2), &file.content[1..3]);
-        assert_eq!(test_read(9, 2), &file.content[9..]);
+        assert_eq!(&test_read(0, 1), &file.content[0..1]);
+        assert_eq!(&test_read(1, 2), &file.content[1..3]);
+        assert_eq!(&test_read(9, 2), &file.content[9..]);
         assert!(test_read(10, 2).is_empty());
         assert!(test_read(11, 2).is_empty());
     }
@@ -609,7 +610,7 @@ mod tests {
     }
 
     impl<'x> File for RecorderFile<'x> {
-        async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>(
+        async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
             &'b self,
             start: u64,
             dst: Slice<B>,
@@ -782,7 +783,7 @@ mod tests {
             2048, 1024 => Err("foo".to_owned()),
         };
 
-        let buf = Vec::with_capacity(512);
+        let buf = IoBufferMut::with_capacity(512);
         let (buf, nread) = mock_file
             .read_exact_at_eof_ok(0, buf.slice_full(), &ctx)
             .await
@@ -790,7 +791,7 @@ mod tests {
         assert_eq!(nread, 512);
         assert_eq!(&buf.into_inner()[..nread], &[0; 512]);
 
-        let buf = Vec::with_capacity(512);
+        let buf = IoBufferMut::with_capacity(512);
         let (buf, nread) = mock_file
             .read_exact_at_eof_ok(512, buf.slice_full(), &ctx)
             .await
@@ -798,7 +799,7 @@ mod tests {
         assert_eq!(nread, 512);
         assert_eq!(&buf.into_inner()[..nread], &[1; 512]);
 
-        let buf = Vec::with_capacity(512);
+        let buf = IoBufferMut::with_capacity(512);
         let (buf, nread) = mock_file
             .read_exact_at_eof_ok(1024, buf.slice_full(), &ctx)
             .await
@@ -806,7 +807,7 @@ mod tests {
         assert_eq!(nread, 10);
         assert_eq!(&buf.into_inner()[..nread], &[2; 10]);
 
-        let buf = Vec::with_capacity(1024);
+        let buf = IoBufferMut::with_capacity(1024);
         let err = mock_file
             .read_exact_at_eof_ok(2048, buf.slice_full(), &ctx)
             .await
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 0c037910344c..dfe2352310bd 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -18,7 +18,7 @@
 use std::collections::BTreeMap;
 use std::ops::Deref;
 
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
 use pageserver_api::key::Key;
 use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::BoundedBuf;
@@ -27,6 +27,7 @@
 use utils::vec_map::VecMap;
 
 use crate::context::RequestContext;
 use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
+use crate::virtual_file::IoBufferMut;
 use crate::virtual_file::{self, VirtualFile};
 
 /// Metadata bundled with the start and end offset of a blob.
@@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob {
 /// Return type of [`VectoredBlobReader::read_blobs`]
 pub struct VectoredBlobsBuf {
     /// Buffer for all blobs in this read
-    pub buf: BytesMut,
+    pub buf: IoBufferMut,
     /// Offsets into the buffer and metadata for all blobs in this read
     pub blobs: Vec<VectoredBlob>,
 }
@@ -441,7 +442,7 @@ impl<'a> VectoredBlobReader<'a> {
     pub async fn read_blobs(
         &self,
         read: &VectoredRead,
-        buf: BytesMut,
+        buf: IoBufferMut,
         ctx: &RequestContext,
     ) -> Result<VectoredBlobsBuf, std::io::Error> {
         assert!(read.size() > 0);
@@ -916,7 +917,7 @@ mod tests {
         // Multiply by two (compressed data might need more space), and add a few bytes for the header
         let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
-        let mut buf = BytesMut::with_capacity(reserved_bytes);
+        let mut buf = IoBufferMut::with_capacity(reserved_bytes);
 
         let vectored_blob_reader = VectoredBlobReader::new(&file);
         let meta = BlobMeta {
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index 5a364b7aaf03..daa8b99ab0f1 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -18,6 +18,9 @@
 use crate::page_cache::{PageWriteGuard, PAGE_SZ};
 use crate::tenant::TENANTS_SEGMENT_NAME;
 use camino::{Utf8Path, Utf8PathBuf};
 use once_cell::sync::OnceCell;
+use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
+use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
+use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
 use owned_buffers_io::io_buf_ext::FullSlice;
 use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
 use pageserver_api::shard::TenantShardId;
@@ -55,6 +58,8 @@ pub(crate) mod owned_buffers_io {
     //! but for the time being we're proving out the primitives in the neon.git repo
     //! for faster iteration.
 
+    pub(crate) mod aligned_buffer;
+    pub(crate) mod io_buf_aligned;
     pub(crate) mod io_buf_ext;
     pub(crate) mod slice;
     pub(crate) mod write;
@@ -196,7 +201,7 @@ impl VirtualFile {
         ctx: &RequestContext,
     ) -> Result<Slice<Buf>, Error>
     where
-        Buf: IoBufMut + Send,
+        Buf: IoBufAlignedMut + Send,
     {
         self.inner.read_exact_at(slice, offset, ctx).await
     }
@@ -771,7 +776,7 @@ impl VirtualFileInner {
         ctx: &RequestContext,
     ) -> Result<Slice<Buf>, Error>
     where
-        Buf: IoBufMut + Send,
+        Buf: IoBufAlignedMut + Send,
     {
         let assert_we_return_original_bounds = if cfg!(debug_assertions) {
             Some((slice.stable_ptr() as usize, slice.bytes_total()))
@@ -1222,12 +1227,14 @@ impl VirtualFileInner {
         ctx: &RequestContext,
     ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
         use crate::page_cache::PAGE_SZ;
-        let slice = Vec::with_capacity(PAGE_SZ).slice_full();
+        let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
         assert_eq!(slice.bytes_total(), PAGE_SZ);
         let slice = self
             .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
             .await?;
-        Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
+        Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
+            slice.into_inner(),
+        ))
     }
 
     async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
@@ -1325,10 +1332,11 @@ impl OpenFiles {
 /// server startup.
 ///
 #[cfg(not(test))]
-pub fn init(num_slots: usize, engine: IoEngineKind) {
+pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode) {
     if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
         panic!("virtual_file::init called twice");
     }
+    set_io_mode(mode);
     io_engine::init(engine);
     crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
 }
@@ -1357,6 +1365,11 @@ pub(crate) const fn get_io_buffer_alignment() -> usize {
     DEFAULT_IO_BUFFER_ALIGNMENT
 }
 
+pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
+pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
+pub(crate) type IoPageSlice<'a> =
+    AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
+
 static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
 
 pub(crate) fn set_io_mode(mode: IoMode) {
@@ -1395,10 +1408,10 @@ mod tests {
     impl MaybeVirtualFile {
         async fn read_exact_at(
             &self,
-            mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
+            mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
             offset: u64,
             ctx: &RequestContext,
-        ) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
+        ) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
             match self {
                 MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
                 MaybeVirtualFile::File(file) => {
@@ -1466,12 +1479,13 @@ mod tests {
             len: usize,
             ctx: &RequestContext,
         ) -> Result<String, Error> {
-            let slice = Vec::with_capacity(len).slice_full();
+            let slice = IoBufferMut::with_capacity(len).slice_full();
             assert_eq!(slice.bytes_total(), len);
             let slice = self.read_exact_at(slice, pos, ctx).await?;
-            let vec = slice.into_inner();
-            assert_eq!(vec.len(), len);
-            Ok(String::from_utf8(vec).unwrap())
+            let buf = slice.into_inner();
+            assert_eq!(buf.len(), len);
+
+            Ok(String::from_utf8(buf.to_vec()).unwrap())
         }
     }
@@ -1695,7 +1709,7 @@ mod tests {
             let files = files.clone();
             let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
             let hdl = rt.spawn(async move {
-                let mut buf = vec![0u8; SIZE];
+                let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
                 let mut rng = rand::rngs::OsRng;
                 for _ in 1..1000 {
                     let f = &files[rng.gen_range(0..files.len())];
@@ -1704,7 +1718,7 @@ mod tests {
                         .await
                         .unwrap()
                         .into_inner();
-                    assert!(buf == SAMPLE);
+                    assert!(buf[..] == SAMPLE);
                 }
             });
             hdls.push(hdl);
diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer.rs
new file mode 100644
index 000000000000..8ffc29b93d33
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer.rs
@@ -0,0 +1,9 @@
+pub mod alignment;
+pub mod buffer;
+pub mod buffer_mut;
+pub mod raw;
+pub mod slice;
+
+pub use alignment::*;
+pub use buffer_mut::AlignedBufferMut;
+pub use slice::AlignedSlice;
diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs
new file mode 100644
index 000000000000..933b78a13b70
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/alignment.rs
@@ -0,0 +1,26 @@
+pub trait Alignment: std::marker::Unpin + 'static {
+    /// Returns the required alignment.
+    fn align(&self) -> usize;
+}
+
+/// Alignment at compile time.
+#[derive(Debug)]
+pub struct ConstAlign<const A: usize>;
+
+impl<const A: usize> Alignment for ConstAlign<A> {
+    fn align(&self) -> usize {
+        A
+    }
+}
+
+/// Alignment at run time.
+#[derive(Debug)]
+pub struct RuntimeAlign {
+    align: usize,
+}
+
+impl Alignment for RuntimeAlign {
+    fn align(&self) -> usize {
+        self.align
+    }
+}
diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs
new file mode 100644
index 000000000000..2fba6d699b28
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer.rs
@@ -0,0 +1,124 @@
+use std::{
+    ops::{Deref, Range, RangeBounds},
+    sync::Arc,
+};
+
+use super::{alignment::Alignment, raw::RawAlignedBuffer};
+
+/// A shared, immutable aligned buffer type.
+pub struct AlignedBuffer<A: Alignment> {
+    /// Shared raw buffer.
+    raw: Arc<RawAlignedBuffer<A>>,
+    /// Range that specifies the current slice.
+    range: Range<usize>,
+}
+
+impl<A: Alignment> AlignedBuffer<A> {
+    /// Creates an immutable `IoBuffer` from the raw buffer
+    pub(super) fn from_raw(raw: RawAlignedBuffer<A>, range: Range<usize>) -> Self {
+        AlignedBuffer {
+            raw: Arc::new(raw),
+            range,
+        }
+    }
+
+    /// Returns the number of bytes in the buffer, also referred to as its 'length'.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.range.len()
+    }
+
+    /// Returns the alignment of the buffer.
+    #[inline]
+    pub fn align(&self) -> usize {
+        self.raw.align()
+    }
+
+    #[inline]
+    fn as_ptr(&self) -> *const u8 {
+        // SAFETY: `self.range.start` is guaranteed to be within [0, self.len()).
+        unsafe { self.raw.as_ptr().add(self.range.start) }
+    }
+
+    /// Extracts a slice containing the entire buffer.
+    ///
+    /// Equivalent to `&s[..]`.
+    #[inline]
+    fn as_slice(&self) -> &[u8] {
+        &self.raw.as_slice()[self.range.start..self.range.end]
+    }
+
+    /// Returns a slice of self for the index range `[begin..end)`.
+    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
+        use core::ops::Bound;
+        let len = self.len();
+
+        let begin = match range.start_bound() {
+            Bound::Included(&n) => n,
+            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
+            Bound::Unbounded => 0,
+        };
+
+        let end = match range.end_bound() {
+            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
+            Bound::Excluded(&n) => n,
+            Bound::Unbounded => len,
+        };
+
+        assert!(
+            begin <= end,
+            "range start must not be greater than end: {:?} <= {:?}",
+            begin,
+            end,
+        );
+        assert!(
+            end <= len,
+            "range end out of bounds: {:?} <= {:?}",
+            end,
+            len,
+        );
+
+        let begin = self.range.start + begin;
+        let end = self.range.start + end;
+
+        AlignedBuffer {
+            raw: Arc::clone(&self.raw),
+            range: begin..end,
+        }
+    }
+}
+
+impl<A: Alignment> Deref for AlignedBuffer<A> {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        self.as_slice()
+    }
+}
+
+impl<A: Alignment> AsRef<[u8]> for AlignedBuffer<A> {
+    fn as_ref(&self) -> &[u8] {
+        self.as_slice()
+    }
+}
+
+impl<A: Alignment> PartialEq<[u8]> for AlignedBuffer<A> {
+    fn eq(&self, other: &[u8]) -> bool {
+        self.as_slice().eq(other)
+    }
+}
+
+/// SAFETY: the underlying buffer references a stable memory region.
+unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBuffer<A> {
+    fn stable_ptr(&self) -> *const u8 {
+        self.as_ptr()
+    }
+
+    fn bytes_init(&self) -> usize {
+        self.len()
+    }
+
+    fn bytes_total(&self) -> usize {
+        self.len()
+    }
+}
diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs
new file mode 100644
index 000000000000..b3675d1aeabb
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/buffer_mut.rs
@@ -0,0 +1,347 @@
+use std::ops::{Deref, DerefMut};
+
+use super::{
+    alignment::{Alignment, ConstAlign},
+    buffer::AlignedBuffer,
+    raw::RawAlignedBuffer,
+};
+
+/// A mutable aligned buffer type.
+#[derive(Debug)]
+pub struct AlignedBufferMut<A: Alignment> {
+    raw: RawAlignedBuffer<A>,
+}
+
+impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
+    /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
+    ///
+    /// The buffer will be able to hold at most `capacity` elements and will never resize.
+    ///
+    ///
+    /// # Panics
+    ///
+    /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
+    /// * `align` must not be zero,
+    ///
+    /// * `align` must be a power of two,
+    ///
+    /// * `capacity`, when rounded up to the nearest multiple of `align`,
+    ///   must not overflow isize (i.e., the rounded value must be
+    ///   less than or equal to `isize::MAX`).
+    pub fn with_capacity(capacity: usize) -> Self {
+        AlignedBufferMut {
+            raw: RawAlignedBuffer::with_capacity(capacity),
+        }
+    }
+
+    /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
+    pub fn with_capacity_zeroed(capacity: usize) -> Self {
+        use bytes::BufMut;
+        let mut buf = Self::with_capacity(capacity);
+        buf.put_bytes(0, capacity);
+        // SAFETY: `put_bytes` filled the entire buffer.
+        unsafe { buf.set_len(capacity) };
+        buf
+    }
+}
+
+impl<A: Alignment> AlignedBufferMut<A> {
+    /// Returns the total number of bytes the buffer can hold.
+    #[inline]
+    pub fn capacity(&self) -> usize {
+        self.raw.capacity()
+    }
+
+    /// Returns the alignment of the buffer.
+    #[inline]
+    pub fn align(&self) -> usize {
+        self.raw.align()
+    }
+
+    /// Returns the number of bytes in the buffer, also referred to as its 'length'.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.raw.len()
+    }
+
+    /// Force the length of the buffer to `new_len`.
+    #[inline]
+    unsafe fn set_len(&mut self, new_len: usize) {
+        self.raw.set_len(new_len)
+    }
+
+    #[inline]
+    fn as_ptr(&self) -> *const u8 {
+        self.raw.as_ptr()
+    }
+
+    #[inline]
+    fn as_mut_ptr(&mut self) -> *mut u8 {
+        self.raw.as_mut_ptr()
+    }
+
+    /// Extracts a slice containing the entire buffer.
+    ///
+    /// Equivalent to `&s[..]`.
+    #[inline]
+    fn as_slice(&self) -> &[u8] {
+        self.raw.as_slice()
+    }
+
+    /// Extracts a mutable slice of the entire buffer.
+    ///
+    /// Equivalent to `&mut s[..]`.
+    fn as_mut_slice(&mut self) -> &mut [u8] {
+        self.raw.as_mut_slice()
+    }
+
+    /// Drops all the contents of the buffer, setting its length to `0`.
+    #[inline]
+    pub fn clear(&mut self) {
+        self.raw.clear()
+    }
+
+    /// Reserves capacity for at least `additional` more bytes to be inserted
+    /// in the given `IoBufferMut`. The collection may reserve more space to
+    /// speculatively avoid frequent reallocations. After calling `reserve`,
+    /// capacity will be greater than or equal to `self.len() + additional`.
+    /// Does nothing if capacity is already sufficient.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
+    pub fn reserve(&mut self, additional: usize) {
+        self.raw.reserve(additional);
+    }
+
+    /// Shortens the buffer, keeping the first `len` bytes.
+    pub fn truncate(&mut self, len: usize) {
+        self.raw.truncate(len);
+    }
+
+    /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, `&'a mut [u8]`.
+    pub fn leak<'a>(self) -> &'a mut [u8] {
+        self.raw.leak()
+    }
+
+    pub fn freeze(self) -> AlignedBuffer<A> {
+        let len = self.len();
+        AlignedBuffer::from_raw(self.raw, 0..len)
+    }
+}
+
+impl<A: Alignment> Deref for AlignedBufferMut<A> {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        self.as_slice()
+    }
+}
+
+impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.as_mut_slice()
+    }
+}
+
+impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
+    fn as_ref(&self) -> &[u8] {
+        self.as_slice()
+    }
+}
+
+impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
+    fn as_mut(&mut self) -> &mut [u8] {
+        self.as_mut_slice()
+    }
+}
+
+impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
+    fn eq(&self, other: &[u8]) -> bool {
+        self.as_slice().eq(other)
+    }
+}
+
+/// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advanced past have been initialized.
+unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
+    #[inline]
+    fn remaining_mut(&self) -> usize {
+        // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
+        // Thus, it can have at most `self.capacity` bytes.
+        self.capacity() - self.len()
+    }
+
+    // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
+    #[inline]
+    unsafe fn advance_mut(&mut self, cnt: usize) {
+        let len = self.len();
+        let remaining = self.remaining_mut();
+
+        if remaining < cnt {
+            panic_advance(cnt, remaining);
+        }
+
+        // Addition will not overflow since the sum is at most the capacity.
+        self.set_len(len + cnt);
+    }
+
+    #[inline]
+    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
+        let cap = self.capacity();
+        let len = self.len();
+
+        // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
+        // valid for `cap - len` bytes. The subtraction will not underflow since
+        // `len <= cap`.
+        unsafe {
+            bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
+        }
+    }
+}
+
+/// Panic with a nice error message.
+#[cold]
+fn panic_advance(idx: usize, len: usize) -> ! {
+    panic!(
+        "advance out of bounds: the len is {} but advancing by {}",
+        len, idx
+    );
+}
+
+/// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
+/// and the underlying pointer remains stable while io-uring is owning the buffer.
+/// The tokio-epoll-uring crate itself will not resize the buffer and will respect
+/// [`tokio_epoll_uring::IoBuf::bytes_total`].
+unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
+    fn stable_ptr(&self) -> *const u8 {
+        self.as_ptr()
+    }
+
+    fn bytes_init(&self) -> usize {
+        self.len()
+    }
+
+    fn bytes_total(&self) -> usize {
+        self.capacity()
+    }
+}
+
+// SAFETY: See above.
+unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
+    fn stable_mut_ptr(&mut self) -> *mut u8 {
+        self.as_mut_ptr()
+    }
+
+    unsafe fn set_init(&mut self, init_len: usize) {
+        if self.len() < init_len {
+            self.set_len(init_len);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+
+    const ALIGN: usize = 4 * 1024;
+    type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
+
+    #[test]
+    fn test_with_capacity() {
+        let v = TestIoBufferMut::with_capacity(ALIGN * 4);
+        assert_eq!(v.len(), 0);
+        assert_eq!(v.capacity(), ALIGN * 4);
+        assert_eq!(v.align(), ALIGN);
+        assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
+
+        let v = TestIoBufferMut::with_capacity(ALIGN / 2);
+        assert_eq!(v.len(), 0);
+        assert_eq!(v.capacity(), ALIGN / 2);
+        assert_eq!(v.align(), ALIGN);
+        assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
+    }
+
+    #[test]
+    fn test_with_capacity_zeroed() {
+        let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
+        assert_eq!(v.len(), ALIGN);
+        assert_eq!(v.capacity(), ALIGN);
+        assert_eq!(v.align(), ALIGN);
+        assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
+        assert_eq!(&v[..], &[0; ALIGN])
+    }
+
+    #[test]
+    fn test_reserve() {
+        use bytes::BufMut;
+        let mut v = TestIoBufferMut::with_capacity(ALIGN);
+        let capacity = v.capacity();
+        v.reserve(capacity);
+        assert_eq!(v.capacity(), capacity);
+        let data = [b'a'; ALIGN];
+        v.put(&data[..]);
+        v.reserve(capacity);
+        assert!(v.capacity() >= capacity * 2);
+        assert_eq!(&v[..], &data[..]);
+        let capacity = v.capacity();
+        v.clear();
+        v.reserve(capacity);
+        assert_eq!(capacity, v.capacity());
+    }
+
+    #[test]
+    fn test_bytes_put() {
+        use bytes::BufMut;
+        let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
+        let x = [b'a'; ALIGN];
+
+        for _ in 0..2 {
+            for _ in 0..4 {
+                v.put(&x[..]);
+            }
+            assert_eq!(v.len(), ALIGN * 4);
+            assert_eq!(v.capacity(), ALIGN * 4);
+            assert_eq!(v.align(), ALIGN);
+            assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
+            v.clear()
+        }
+        assert_eq!(v.len(), 0);
+        assert_eq!(v.capacity(), ALIGN * 4);
+        assert_eq!(v.align(), ALIGN);
+        assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_bytes_put_panic() {
+        use bytes::BufMut;
+        const ALIGN: usize = 4 * 1024;
+        let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
+        let x = [b'a'; ALIGN];
+        for _ in 0..5 {
+            v.put_slice(&x[..]);
+        }
+    }
+
+    #[test]
+    fn test_io_buf_put_slice() {
+        use tokio_epoll_uring::BoundedBufMut;
+        const ALIGN: usize = 4 * 1024;
+        let mut v = TestIoBufferMut::with_capacity(ALIGN);
+        let x = [b'a'; ALIGN];
+
+        for _ in 0..2 {
+            v.put_slice(&x[..]);
+            assert_eq!(v.len(), ALIGN);
+            assert_eq!(v.capacity(), ALIGN);
+            assert_eq!(v.align(), ALIGN);
+            assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
+            v.clear()
+        }
+        assert_eq!(v.len(), 0);
+        assert_eq!(v.capacity(), ALIGN);
+        assert_eq!(v.align(), ALIGN);
+        assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
+    }
+}
diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs
new file mode 100644
index 000000000000..6c26dec0db22
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/raw.rs
@@ -0,0 +1,216 @@
+use core::slice;
+use std::{
+    alloc::{self, Layout},
+    cmp,
+    mem::ManuallyDrop,
+};
+
+use super::alignment::{Alignment, ConstAlign};
+
+#[derive(Debug)]
+struct AlignedBufferPtr(*mut u8);
+
+// SAFETY: We guarantee no one besides `AlignedBufferPtr` itself has the raw pointer.
+unsafe impl Send for AlignedBufferPtr {}
+
+// SAFETY: We guarantee no one besides `AlignedBufferPtr` itself has the raw pointer.
+unsafe impl Sync for AlignedBufferPtr {}
+
+/// An aligned buffer type.
+#[derive(Debug)]
+pub struct RawAlignedBuffer<A: Alignment> {
+    ptr: AlignedBufferPtr,
+    capacity: usize,
+    len: usize,
+    align: A,
+}
+
+impl<const A: usize> RawAlignedBuffer<ConstAlign<A>> {
+    /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
+    ///
+    /// The buffer will be able to hold at most `capacity` elements and will never resize.
+    ///
+    ///
+    /// # Panics
+    ///
+    /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
+    /// * `align` must not be zero,
+    ///
+    /// * `align` must be a power of two,
+    ///
+    /// * `capacity`, when rounded up to the nearest multiple of `align`,
+    ///   must not overflow isize (i.e., the rounded value must be
+    ///   less than or equal to `isize::MAX`).
+    pub fn with_capacity(capacity: usize) -> Self {
+        let align = ConstAlign::<A>;
+        let layout = Layout::from_size_align(capacity, align.align()).expect("Invalid layout");
+
+        // SAFETY: Making an allocation with a sized and aligned layout. The memory is manually freed with the same layout.
+        let ptr = unsafe {
+            let ptr = alloc::alloc(layout);
+            if ptr.is_null() {
+                alloc::handle_alloc_error(layout);
+            }
+            AlignedBufferPtr(ptr)
+        };
+
+        RawAlignedBuffer {
+            ptr,
+            capacity,
+            len: 0,
+            align,
+        }
+    }
+}
+
+impl<A: Alignment> RawAlignedBuffer<A> {
+    /// Returns the total number of bytes the buffer can hold.
+    #[inline]
+    pub fn capacity(&self) -> usize {
+        self.capacity
+    }
+
+    /// Returns the alignment of the buffer.
+    #[inline]
+    pub fn align(&self) -> usize {
+        self.align.align()
+    }
+
+    /// Returns the number of bytes in the buffer, also referred to as its 'length'.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Force the length of the buffer to `new_len`.
+    #[inline]
+    pub unsafe fn set_len(&mut self, new_len: usize) {
+        debug_assert!(new_len <= self.capacity());
+        self.len = new_len;
+    }
+
+    #[inline]
+    pub fn as_ptr(&self) -> *const u8 {
+        self.ptr.0
+    }
+
+    #[inline]
+    pub fn as_mut_ptr(&mut self) -> *mut u8 {
+        self.ptr.0
+    }
+
+    /// Extracts a slice containing the entire buffer.
+    ///
+    /// Equivalent to `&s[..]`.
+    #[inline]
+    pub fn as_slice(&self) -> &[u8] {
+        // SAFETY: The pointer is valid and `len` bytes are initialized.
+        unsafe { slice::from_raw_parts(self.as_ptr(), self.len) }
+    }
+
+    /// Extracts a mutable slice of the entire buffer.
+    ///
+    /// Equivalent to `&mut s[..]`.
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        // SAFETY: The pointer is valid and `len` bytes are initialized.
+        unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
+    }
+
+    /// Drops all the contents of the buffer, setting its length to `0`.
+    #[inline]
+    pub fn clear(&mut self) {
+        self.len = 0;
+    }
+
+    /// Reserves capacity for at least `additional` more bytes to be inserted
+    /// in the given `IoBufferMut`. The collection may reserve more space to
+    /// speculatively avoid frequent reallocations. After calling `reserve`,
+    /// capacity will be greater than or equal to `self.len() + additional`.
+    /// Does nothing if capacity is already sufficient.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
+    pub fn reserve(&mut self, additional: usize) {
+        if additional > self.capacity() - self.len() {
+            self.reserve_inner(additional);
+        }
+    }
+
+    fn reserve_inner(&mut self, additional: usize) {
+        let Some(required_cap) = self.len().checked_add(additional) else {
+            capacity_overflow()
+        };
+
+        let old_capacity = self.capacity();
+        let align = self.align();
+        // This guarantees exponential growth. The doubling cannot overflow
+        // because `cap <= isize::MAX` and the type of `cap` is `usize`.
+        let cap = cmp::max(old_capacity * 2, required_cap);
+
+        if !is_valid_alloc(cap) {
+            capacity_overflow()
+        }
+        let new_layout = Layout::from_size_align(cap, self.align()).expect("Invalid layout");
+
+        let old_ptr = self.as_mut_ptr();
+
+        // SAFETY: the old allocation was allocated with std::alloc::alloc with the same layout,
+        // and we panic on a null pointer.
+        let (ptr, cap) = unsafe {
+            let old_layout = Layout::from_size_align_unchecked(old_capacity, align);
+            let ptr = alloc::realloc(old_ptr, old_layout, new_layout.size());
+            if ptr.is_null() {
+                alloc::handle_alloc_error(new_layout);
+            }
+            (AlignedBufferPtr(ptr), cap)
+        };
+
+        self.ptr = ptr;
+        self.capacity = cap;
+    }
+
+    /// Shortens the buffer, keeping the first `len` bytes.
+    pub fn truncate(&mut self, len: usize) {
+        if len > self.len {
+            return;
+        }
+        self.len = len;
+    }
+
+    /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, `&'a mut [u8]`.
+    pub fn leak<'a>(self) -> &'a mut [u8] {
+        let mut buf = ManuallyDrop::new(self);
+        // SAFETY: leaking the buffer as intended.
+        unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len) }
+    }
+}
+
+fn capacity_overflow() -> ! {
+    panic!("capacity overflow")
+}
+
+// We need to guarantee the following:
+// * We don't ever allocate `> isize::MAX` byte-size objects.
+// * We don't overflow `usize::MAX` and actually allocate too little.
+//
+// On 64-bit we just need to check for overflow since trying to allocate
+// `> isize::MAX` bytes will surely fail. On 32-bit and 16-bit we need to add
+// an extra guard for this in case we're running on a platform which can use
+// all 4GB in user-space, e.g., PAE or x32.
+#[inline]
+fn is_valid_alloc(alloc_size: usize) -> bool {
+    !(usize::BITS < 64 && alloc_size > isize::MAX as usize)
+}
+
+impl<A: Alignment> Drop for RawAlignedBuffer<A> {
+    fn drop(&mut self) {
+        // SAFETY: memory was allocated with std::alloc::alloc with the same layout.
+        unsafe {
+            alloc::dealloc(
+                self.as_mut_ptr(),
+                Layout::from_size_align_unchecked(self.capacity, self.align.align()),
+            )
+        }
+    }
+}
diff --git a/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/slice.rs b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/slice.rs
new file mode 100644
index 000000000000..6cecf34c1cd5
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/aligned_buffer/slice.rs
@@ -0,0 +1,40 @@
+use std::ops::{Deref, DerefMut};
+
+use super::alignment::{Alignment, ConstAlign};
+
+/// Newtype for an aligned slice.
+pub struct AlignedSlice<'a, const N: usize, A: Alignment> {
+    /// underlying byte slice
+    buf: &'a mut [u8; N],
+    /// alignment marker
+    _align: A,
+}
+
+impl<'a, const N: usize, const A: usize> AlignedSlice<'a, N, ConstAlign<A>> {
+    /// Create a new aligned slice from a mutable byte slice. The input must already satisfy the alignment.
+    pub unsafe fn new_unchecked(buf: &'a mut [u8; N]) -> Self {
+        let _align = ConstAlign::<A>;
+        assert_eq!(buf.as_ptr().align_offset(_align.align()), 0);
+        AlignedSlice { buf, _align }
+    }
+}
+
+impl<'a, const N: usize, A: Alignment> Deref for AlignedSlice<'a, N, A> {
+    type Target = [u8; N];
+
+    fn deref(&self) -> &Self::Target {
+        self.buf
+    }
+}
+
+impl<'a, const N: usize, A: Alignment> DerefMut for AlignedSlice<'a, N, A> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.buf
+    }
+}
+
+impl<'a, const N: usize, A: Alignment> AsRef<[u8; N]> for AlignedSlice<'a, N, A> {
+    fn as_ref(&self) -> &[u8; N] {
+        self.buf
+    }
+}
diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs
new file mode 100644
index 000000000000..dba695196ebb
--- /dev/null
+++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_aligned.rs
@@ -0,0 +1,9 @@
+use tokio_epoll_uring::IoBufMut;
+
+use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf};
+
+pub trait IoBufAlignedMut: IoBufMut {}
+
+impl IoBufAlignedMut for IoBufferMut {}
+
+impl IoBufAlignedMut for PageWriteGuardBuf {}
diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs
index 7c773b6b2103..c3940cf6cea2 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs
@@ -1,5 +1,6 @@
 //! See [`FullSlice`].
 
+use crate::virtual_file::{IoBuffer, IoBufferMut};
 use bytes::{Bytes, BytesMut};
 use std::ops::{Deref, Range};
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
@@ -76,3 +77,5 @@ macro_rules! impl_io_buf_ext {
 impl_io_buf_ext!(Bytes);
 impl_io_buf_ext!(BytesMut);
 impl_io_buf_ext!(Vec<u8>);
+impl_io_buf_ext!(IoBufferMut);
+impl_io_buf_ext!(IoBuffer);
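
Appended usage notes (not part of the diff). First, the lifecycle of the new buffer pair: a unique `AlignedBufferMut` is filled, `freeze()` turns it into the shared, immutable `AlignedBuffer` without copying, and `slice()` hands out zero-copy views — the same pattern the `InMemoryLayer` hunk above uses via `load_to_io_buf` + `freeze` + `slice`. A minimal sketch, assuming crate-internal code where the `pub(crate)` alias `IoBufferMut` is visible; `freeze_and_slice_demo` is a hypothetical name:

```rust
use bytes::BufMut;

use crate::virtual_file::IoBufferMut;

fn freeze_and_slice_demo() {
    // Unique, mutable, aligned allocation; it never grows unless `reserve` is called.
    let mut buf = IoBufferMut::with_capacity(8192);
    buf.put_bytes(0xAB, 1024);

    // `freeze` moves the raw allocation into an `Arc`; no bytes are copied.
    let frozen = buf.freeze();

    // `slice` clones the `Arc` and narrows the visible range; still zero-copy.
    let header = frozen.slice(0..16);
    assert_eq!(header.len(), 16);
    assert_eq!(&header[..], &[0xAB; 16][..]);
}
```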
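The `Alignment` trait lets a buffer carry its alignment either in the type (`ConstAlign`) or in a field (`RuntimeAlign`). This patch only wires `ConstAlign` through the `IoBufferMut`/`IoBuffer`/`IoPageSlice` aliases; `RuntimeAlign` is declared but has no constructor yet. A sketch of the const side (the `layout_align` helper is hypothetical):

```rust
use crate::virtual_file::owned_buffers_io::aligned_buffer::{Alignment, ConstAlign};

// Generic code asks the marker value for its alignment at run time;
// with `ConstAlign` the answer is a compile-time constant.
fn layout_align<A: Alignment>(a: &A) -> usize {
    a.align()
}

fn const_align_demo() {
    let align = ConstAlign::<512>;
    assert_eq!(layout_align(&align), 512);
}
```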
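The `page_cache.rs` hunk allocates one aligned region for all slots and carves it into `PAGE_SZ` pieces; each piece stays aligned because `PAGE_SZ` (8192) is a multiple of the buffer alignment. A sketch of the same pattern, again assuming crate-internal code (`NUM_PAGES` is hypothetical; the page cache sizes this from configuration):

```rust
use crate::page_cache::PAGE_SZ;
use crate::virtual_file::{IoBufferMut, IoPageSlice};

fn carve_page_slices() -> Vec<IoPageSlice<'static>> {
    const NUM_PAGES: usize = 16; // hypothetical value for illustration

    // Leak the zeroed allocation: page cache slots live for the process lifetime.
    let page_buffer = IoBufferMut::with_capacity_zeroed(NUM_PAGES * PAGE_SZ).leak();

    page_buffer
        .chunks_exact_mut(PAGE_SZ)
        .map(|chunk| {
            // SAFETY: the backing allocation is aligned and PAGE_SZ is a multiple
            // of that alignment, so each PAGE_SZ chunk starts on an aligned address.
            unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) }
        })
        .collect()
}
```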
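Finally, why `IoBufAlignedMut` is a marker trait with exactly two impls: `VirtualFile::read_exact_at` and the dio `File` trait now reject unaligned owned buffers at compile time rather than failing at runtime under direct I/O. A sketch of a caller after this change, assuming crate-internal code since `read_exact_at` is `pub(crate)` (`read_one_page` is a hypothetical helper):

```rust
use tokio_epoll_uring::BoundedBuf;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::virtual_file::{IoBufferMut, VirtualFile};

async fn read_one_page(
    file: &VirtualFile,
    blknum: u32,
    ctx: &RequestContext,
) -> Result<IoBufferMut, std::io::Error> {
    // `IoBufferMut: IoBufAlignedMut` satisfies the tightened bound;
    // `Vec::<u8>::with_capacity(PAGE_SZ).slice_full()` no longer type-checks here.
    let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
    let slice = file
        .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
        .await?;
    Ok(slice.into_inner())
}
```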