diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index 5e9fff0e6a18..a8526fc6b1f3 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -217,7 +217,9 @@ jobs: ${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_FEATURES for io_engine in std-fs tokio-epoll-uring ; do - NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES + for io_buffer_alignment in 0 1 512 ; do + NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT=$io_buffer_alignment ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES + done done # Run separate tests for real S3 diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index bd99f5289dac..f450f46efac5 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use camino::Utf8PathBuf; use criterion::{criterion_group, criterion_main, Criterion}; use pageserver::{ - config::PageServerConf, + config::{defaults::DEFAULT_IO_BUFFER_ALIGNMENT, PageServerConf}, context::{DownloadBehavior, RequestContext}, l0_flush::{L0FlushConfig, L0FlushGlobalState}, page_cache, @@ -164,7 +164,11 @@ fn criterion_benchmark(c: &mut Criterion) { let conf: &'static PageServerConf = Box::leak(Box::new( pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()), )); - virtual_file::init(16384, virtual_file::io_engine_for_bench()); + virtual_file::init( + 16384, + virtual_file::io_engine_for_bench(), + DEFAULT_IO_BUFFER_ALIGNMENT, + ); page_cache::init(conf.page_cache_size); { diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index ac3ff1bb896a..71d36f31131a 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -506,6 +506,16 @@ impl Client { .map_err(Error::ReceiveBody) } + /// Configs io buffer alignment at runtime. + pub async fn put_io_alignment(&self, align: usize) -> Result<()> { + let uri = format!("{}/v1/io_alignment", self.mgmt_api_endpoint); + self.request(Method::PUT, uri, align) + .await? + .json() + .await + .map_err(Error::ReceiveBody) + } + pub async fn get_utilization(&self) -> Result { let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint); self.get(uri) diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index b4bb239f4469..8092c203c320 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -4,6 +4,7 @@ use anyhow::Result; use camino::{Utf8Path, Utf8PathBuf}; +use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use pageserver::context::{DownloadBehavior, RequestContext}; use pageserver::task_mgr::TaskKind; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; @@ -144,7 +145,11 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. - pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + pageserver::virtual_file::init( + 10, + virtual_file::api::IoEngineKind::StdFs, + DEFAULT_IO_BUFFER_ALIGNMENT, + ); pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 3611b0baab2c..a183a3968d90 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use camino::{Utf8Path, Utf8PathBuf}; use clap::Subcommand; +use pageserver::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use pageserver::context::{DownloadBehavior, RequestContext}; use pageserver::task_mgr::TaskKind; use pageserver::tenant::block_io::BlockCursor; @@ -59,7 +60,7 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); - virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs, 1); page_cache::init(100); let file = VirtualFile::open(path, ctx).await?; let file_id = page_cache::next_file_id(); @@ -189,7 +190,11 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { new_tenant_id, new_timeline_id, } => { - pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + pageserver::virtual_file::init( + 10, + virtual_file::api::IoEngineKind::StdFs, + DEFAULT_IO_BUFFER_ALIGNMENT, + ); pageserver::page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 3fabf629875e..7a6c7675bbf6 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -20,6 +20,7 @@ use clap::{Parser, Subcommand}; use index_part::IndexPartCmd; use layers::LayerCmd; use pageserver::{ + config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT, context::{DownloadBehavior, RequestContext}, page_cache, task_mgr::TaskKind, @@ -205,7 +206,11 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup - virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + virtual_file::init( + 10, + virtual_file::api::IoEngineKind::StdFs, + DEFAULT_IO_BUFFER_ALIGNMENT, + ); page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 4992f37465c7..ac4a732377b1 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -58,6 +58,11 @@ pub(crate) struct Args { /// [`pageserver_api::models::virtual_file::IoEngineKind`]. #[clap(long)] set_io_engine: Option, + + /// Before starting the benchmark, live-reconfigure the pageserver to use specified alignment for io buffers. + #[clap(long)] + set_io_alignment: Option, + targets: Option>, } @@ -124,6 +129,10 @@ async fn main_impl( mgmt_api_client.put_io_engine(engine_str).await?; } + if let Some(align) = args.set_io_alignment { + mgmt_api_client.put_io_alignment(align).await?; + } + // discover targets let timelines: Vec = crate::util::cli::targets::discover( &mgmt_api_client, diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 7d404e50a543..850bd87b9587 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -125,6 +125,7 @@ fn main() -> anyhow::Result<()> { info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine"); info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings"); info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access"); + info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment"); // The tenants directory contains all the pageserver local disk state. // Create if not exists and make sure all the contents are durable before proceeding. @@ -182,7 +183,11 @@ fn main() -> anyhow::Result<()> { let scenario = failpoint_support::init(); // Basic initialization of things that don't change after startup - virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine); + virtual_file::init( + conf.max_file_descriptors, + conf.virtual_file_io_engine, + conf.io_buffer_alignment, + ); page_cache::init(conf.page_cache_size); start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 0ebaf788400c..ae473bcc5fcd 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -95,6 +95,8 @@ pub mod defaults { pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; + pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 0; + /// /// Default built-in configuration file. /// @@ -289,6 +291,8 @@ pub struct PageServerConf { /// Direct IO settings pub virtual_file_direct_io: virtual_file::DirectIoMode, + + pub io_buffer_alignment: usize, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -393,6 +397,8 @@ struct PageServerConfigBuilder { compact_level0_phase1_value_access: BuilderValue, virtual_file_direct_io: BuilderValue, + + io_buffer_alignment: BuilderValue, } impl PageServerConfigBuilder { @@ -481,6 +487,7 @@ impl PageServerConfigBuilder { l0_flush: Set(L0FlushConfig::default()), compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()), virtual_file_direct_io: Set(virtual_file::DirectIoMode::default()), + io_buffer_alignment: Set(DEFAULT_IO_BUFFER_ALIGNMENT), } } } @@ -660,6 +667,10 @@ impl PageServerConfigBuilder { self.virtual_file_direct_io = BuilderValue::Set(value); } + pub fn io_buffer_alignment(&mut self, value: usize) { + self.io_buffer_alignment = BuilderValue::Set(value); + } + pub fn build(self, id: NodeId) -> anyhow::Result { let default = Self::default_values(); @@ -716,6 +727,7 @@ impl PageServerConfigBuilder { l0_flush, compact_level0_phase1_value_access, virtual_file_direct_io, + io_buffer_alignment, } CUSTOM LOGIC { @@ -985,6 +997,9 @@ impl PageServerConf { "virtual_file_direct_io" => { builder.virtual_file_direct_io(utils::toml_edit_ext::deserialize_item(item).context("virtual_file_direct_io")?) } + "io_buffer_alignment" => { + builder.io_buffer_alignment(parse_toml_u64("io_buffer_alignment", item)? as usize) + } _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -1068,6 +1083,7 @@ impl PageServerConf { l0_flush: L0FlushConfig::default(), compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(), virtual_file_direct_io: virtual_file::DirectIoMode::default(), + io_buffer_alignment: defaults::DEFAULT_IO_BUFFER_ALIGNMENT, } } } @@ -1308,6 +1324,7 @@ background_task_maximum_delay = '334 s' l0_flush: L0FlushConfig::default(), compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(), virtual_file_direct_io: virtual_file::DirectIoMode::default(), + io_buffer_alignment: defaults::DEFAULT_IO_BUFFER_ALIGNMENT, }, "Correct defaults should be used when no config values are provided" ); @@ -1381,6 +1398,7 @@ background_task_maximum_delay = '334 s' l0_flush: L0FlushConfig::default(), compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(), virtual_file_direct_io: virtual_file::DirectIoMode::default(), + io_buffer_alignment: defaults::DEFAULT_IO_BUFFER_ALIGNMENT, }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index cbcc162b325f..a126136d20c5 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2344,6 +2344,20 @@ async fn put_io_engine_handler( json_response(StatusCode::OK, ()) } +async fn put_io_alignment_handler( + mut r: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + check_permission(&r, None)?; + let align: usize = json_request(&mut r).await?; + crate::virtual_file::set_io_buffer_alignment(align).map_err(|align| { + ApiError::PreconditionFailed( + format!("Requested io alignment ({align}) is not a power of two").into(), + ) + })?; + json_response(StatusCode::OK, ()) +} + /// Polled by control plane. /// /// See [`crate::utilization`]. @@ -3031,6 +3045,9 @@ pub fn make_router( |r| api_handler(r, timeline_collect_keyspace), ) .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler)) + .put("/v1/io_alignment", |r| { + api_handler(r, put_io_alignment_handler) + }) .put( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch", |r| api_handler(r, force_aux_policy_switch_handler), diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index f4a29579723d..c0508e13c05b 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -40,7 +40,7 @@ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, - VectoredReadPlanner, + VectoredReadCoalesceMode, VectoredReadPlanner, }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; @@ -1205,6 +1205,7 @@ impl DeltaLayerInner { let mut prev: Option<(Key, Lsn, BlobRef)> = None; let mut read_builder: Option = None; + let read_mode = VectoredReadCoalesceMode::get(); let max_read_size = self .max_vectored_read_bytes @@ -1253,6 +1254,7 @@ impl DeltaLayerInner { offsets.end.pos(), meta, max_read_size, + read_mode, )) } } else { @@ -2281,7 +2283,7 @@ pub(crate) mod test { .await .unwrap(); let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap(); - for max_read_size in [1, 1024] { + for max_read_size in [1, 2048] { for batch_size in [1, 2, 4, 8, 3, 7, 13] { println!("running with batch_size={batch_size} max_read_size={max_read_size}"); // Test if the batch size is correctly determined diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 3cb2b1c83a6a..38411e9d9e46 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -1367,7 +1367,7 @@ mod test { .await .unwrap(); let img_layer = resident_layer.get_as_image(&ctx).await.unwrap(); - for max_read_size in [1, 1024] { + for max_read_size in [1, 2048] { for batch_size in [1, 2, 4, 8, 3, 7, 13] { println!("running with batch_size={batch_size} max_read_size={max_read_size}"); // Test if the batch size is correctly determined diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 54a3ad789b9f..80bc56092dbf 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -25,9 +25,10 @@ use tokio_epoll_uring::BoundedBuf; use utils::lsn::Lsn; use utils::vec_map::VecMap; +use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use crate::context::RequestContext; use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK}; -use crate::virtual_file::VirtualFile; +use crate::virtual_file::{self, VirtualFile}; #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct MaxVectoredReadBytes(pub NonZeroUsize); @@ -60,7 +61,7 @@ pub struct VectoredBlobsBuf { pub struct VectoredRead { pub start: u64, pub end: u64, - /// Starting offsets and metadata for each blob in this read + /// Start offset and metadata for each blob in this read pub blobs_at: VecMap, } @@ -76,14 +77,109 @@ pub(crate) enum VectoredReadExtended { No, } -pub(crate) struct VectoredReadBuilder { +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum VectoredReadCoalesceMode { + /// Only coalesce exactly adjacent reads. + AdjacentOnly, + /// In addition to adjacent reads, also consider reads whose corresponding + /// `end` and `start` offsets reside at the same chunk. + Chunked(usize), +} + +impl VectoredReadCoalesceMode { + /// [`AdjacentVectoredReadBuilder`] is used if alignment requirement is 0, + /// whereas [`ChunkedVectoredReadBuilder`] is used for alignment requirement 1 and higher. + pub(crate) fn get() -> Self { + let align = virtual_file::get_io_buffer_alignment_raw(); + if align == DEFAULT_IO_BUFFER_ALIGNMENT { + VectoredReadCoalesceMode::AdjacentOnly + } else { + VectoredReadCoalesceMode::Chunked(align) + } + } +} + +pub(crate) enum VectoredReadBuilder { + Adjacent(AdjacentVectoredReadBuilder), + Chunked(ChunkedVectoredReadBuilder), +} + +impl VectoredReadBuilder { + fn new_impl( + start_offset: u64, + end_offset: u64, + meta: BlobMeta, + max_read_size: Option, + mode: VectoredReadCoalesceMode, + ) -> Self { + match mode { + VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent( + AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size), + ), + VectoredReadCoalesceMode::Chunked(chunk_size) => { + Self::Chunked(ChunkedVectoredReadBuilder::new( + start_offset, + end_offset, + meta, + max_read_size, + chunk_size, + )) + } + } + } + + pub(crate) fn new( + start_offset: u64, + end_offset: u64, + meta: BlobMeta, + max_read_size: usize, + mode: VectoredReadCoalesceMode, + ) -> Self { + Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode) + } + + pub(crate) fn new_streaming( + start_offset: u64, + end_offset: u64, + meta: BlobMeta, + mode: VectoredReadCoalesceMode, + ) -> Self { + Self::new_impl(start_offset, end_offset, meta, None, mode) + } + + pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended { + match self { + VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta), + VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta), + } + } + + pub(crate) fn build(self) -> VectoredRead { + match self { + VectoredReadBuilder::Adjacent(builder) => builder.build(), + VectoredReadBuilder::Chunked(builder) => builder.build(), + } + } + + pub(crate) fn size(&self) -> usize { + match self { + VectoredReadBuilder::Adjacent(builder) => builder.size(), + VectoredReadBuilder::Chunked(builder) => builder.size(), + } + } +} + +pub(crate) struct AdjacentVectoredReadBuilder { + /// Start offset of the read. start: u64, + // End offset of the read. end: u64, + /// Start offset and metadata for each blob in this read blobs_at: VecMap, max_read_size: Option, } -impl VectoredReadBuilder { +impl AdjacentVectoredReadBuilder { /// Start building a new vectored read. /// /// Note that by design, this does not check against reading more than `max_read_size` to @@ -93,7 +189,7 @@ impl VectoredReadBuilder { start_offset: u64, end_offset: u64, meta: BlobMeta, - max_read_size: usize, + max_read_size: Option, ) -> Self { let mut blobs_at = VecMap::default(); blobs_at @@ -104,7 +200,7 @@ impl VectoredReadBuilder { start: start_offset, end: end_offset, blobs_at, - max_read_size: Some(max_read_size), + max_read_size, } } /// Attempt to extend the current read with a new blob if the start @@ -113,13 +209,15 @@ impl VectoredReadBuilder { pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended { tracing::trace!(start, end, "trying to extend"); let size = (end - start) as usize; - if self.end == start && { + let not_limited_by_max_read_size = { if let Some(max_read_size) = self.max_read_size { self.size() + size <= max_read_size } else { true } - } { + }; + + if self.end == start && not_limited_by_max_read_size { self.end = end; self.blobs_at .append(start, meta) @@ -144,6 +242,107 @@ impl VectoredReadBuilder { } } +pub(crate) struct ChunkedVectoredReadBuilder { + /// Start block number + start_blk_no: usize, + /// End block number (exclusive). + end_blk_no: usize, + /// Start offset and metadata for each blob in this read + blobs_at: VecMap, + max_read_size: Option, + /// Chunk size reads are coalesced into. + chunk_size: usize, +} + +/// Computes x / d rounded up. +fn div_round_up(x: usize, d: usize) -> usize { + (x + (d - 1)) / d +} + +impl ChunkedVectoredReadBuilder { + /// Start building a new vectored read. + /// + /// Note that by design, this does not check against reading more than `max_read_size` to + /// support reading larger blobs than the configuration value. The builder will be single use + /// however after that. + pub(crate) fn new( + start_offset: u64, + end_offset: u64, + meta: BlobMeta, + max_read_size: Option, + chunk_size: usize, + ) -> Self { + let mut blobs_at = VecMap::default(); + blobs_at + .append(start_offset, meta) + .expect("First insertion always succeeds"); + + let start_blk_no = start_offset as usize / chunk_size; + let end_blk_no = div_round_up(end_offset as usize, chunk_size); + Self { + start_blk_no, + end_blk_no, + blobs_at, + max_read_size, + chunk_size, + } + } + + /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk. + /// + /// The resulting size also must be below the max read size. + pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended { + tracing::trace!(start, end, "trying to extend"); + let start_blk_no = start as usize / self.chunk_size; + let end_blk_no = div_round_up(end as usize, self.chunk_size); + + let not_limited_by_max_read_size = { + if let Some(max_read_size) = self.max_read_size { + let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size; + coalesced_size <= max_read_size + } else { + true + } + }; + + // True if the second block starts in the same block or the immediate next block where the first block ended. + // + // Note: This automatically handles the case where two blocks are adjacent to each other, + // whether they starts on chunk size boundary or not. + let is_adjacent_chunk_read = { + // 1. first.end & second.start are in the same block + self.end_blk_no == start_blk_no + 1 || + // 2. first.end ends one block before second.start + self.end_blk_no == start_blk_no + }; + + if is_adjacent_chunk_read && not_limited_by_max_read_size { + self.end_blk_no = end_blk_no; + self.blobs_at + .append(start, meta) + .expect("LSNs are ordered within vectored reads"); + + return VectoredReadExtended::Yes; + } + + VectoredReadExtended::No + } + + pub(crate) fn size(&self) -> usize { + (self.end_blk_no - self.start_blk_no) * self.chunk_size + } + + pub(crate) fn build(self) -> VectoredRead { + let start = (self.start_blk_no * self.chunk_size) as u64; + let end = (self.end_blk_no * self.chunk_size) as u64; + VectoredRead { + start, + end, + blobs_at: self.blobs_at, + } + } +} + #[derive(Copy, Clone, Debug)] pub enum BlobFlag { None, @@ -166,14 +365,18 @@ pub struct VectoredReadPlanner { prev: Option<(Key, Lsn, u64, BlobFlag)>, max_read_size: usize, + + mode: VectoredReadCoalesceMode, } impl VectoredReadPlanner { pub fn new(max_read_size: usize) -> Self { + let mode = VectoredReadCoalesceMode::get(); Self { blobs: BTreeMap::new(), prev: None, max_read_size, + mode, } } @@ -252,6 +455,7 @@ impl VectoredReadPlanner { end_offset, BlobMeta { key, lsn }, self.max_read_size, + self.mode, ); let prev_read_builder = current_read_builder.replace(next_read_builder); @@ -303,6 +507,18 @@ impl<'a> VectoredBlobReader<'a> { read.size(), buf.capacity() ); + + if cfg!(debug_assertions) { + let align = virtual_file::get_io_buffer_alignment() as u64; + debug_assert_eq!( + read.start % align, + 0, + "Read start at {} does not satisfy the required io buffer alignment ({} bytes)", + read.start, + align + ); + } + let mut buf = self .file .read_exact_at(buf.slice(0..read.size()), read.start, ctx) @@ -310,27 +526,20 @@ impl<'a> VectoredBlobReader<'a> { .into_inner(); let blobs_at = read.blobs_at.as_slice(); - let start_offset = blobs_at.first().expect("VectoredRead is never empty").0; - let mut metas = Vec::with_capacity(blobs_at.len()); + let start_offset = read.start; + let mut metas = Vec::with_capacity(blobs_at.len()); // Blobs in `read` only provide their starting offset. The end offset // of a blob is implicit: the start of the next blob if one exists // or the end of the read. - let pairs = blobs_at.iter().zip( - blobs_at - .iter() - .map(Some) - .skip(1) - .chain(std::iter::once(None)), - ); // Some scratch space, put here for reusing the allocation let mut decompressed_vec = Vec::new(); - for ((offset, meta), next) in pairs { - let offset_in_buf = offset - start_offset; - let first_len_byte = buf[offset_in_buf as usize]; + for (blob_start, meta) in blobs_at { + let blob_start_in_buf = blob_start - start_offset; + let first_len_byte = buf[blob_start_in_buf as usize]; // Each blob is prefixed by a header containing its size and compression information. // Extract the size and skip that header to find the start of the data. @@ -340,7 +549,7 @@ impl<'a> VectoredBlobReader<'a> { (1, first_len_byte as u64, BYTE_UNCOMPRESSED) } else { let mut blob_size_buf = [0u8; 4]; - let offset_in_buf = offset_in_buf as usize; + let offset_in_buf = blob_start_in_buf as usize; blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]); blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK; @@ -353,12 +562,8 @@ impl<'a> VectoredBlobReader<'a> { ) }; - let start_raw = offset_in_buf + size_length; - let end_raw = match next { - Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset, - None => start_raw + blob_size, - }; - assert_eq!(end_raw - start_raw, blob_size); + let start_raw = blob_start_in_buf + size_length; + let end_raw = start_raw + blob_size; let (start, end); if compression_bits == BYTE_UNCOMPRESSED { start = start_raw as usize; @@ -407,18 +612,22 @@ pub struct StreamingVectoredReadPlanner { max_cnt: usize, /// Size of the current batch cnt: usize, + + mode: VectoredReadCoalesceMode, } impl StreamingVectoredReadPlanner { pub fn new(max_read_size: u64, max_cnt: usize) -> Self { assert!(max_cnt > 0); assert!(max_read_size > 0); + let mode = VectoredReadCoalesceMode::get(); Self { read_builder: None, prev: None, max_cnt, max_read_size, cnt: 0, + mode, } } @@ -467,17 +676,12 @@ impl StreamingVectoredReadPlanner { } None => { self.read_builder = { - let mut blobs_at = VecMap::default(); - blobs_at - .append(start_offset, BlobMeta { key, lsn }) - .expect("First insertion always succeeds"); - - Some(VectoredReadBuilder { - start: start_offset, - end: end_offset, - blobs_at, - max_read_size: None, - }) + Some(VectoredReadBuilder::new_streaming( + start_offset, + end_offset, + BlobMeta { key, lsn }, + self.mode, + )) }; } } @@ -511,7 +715,9 @@ mod tests { use super::*; fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) { - assert_eq!(read.start, offset_range.first().unwrap().2); + let align = virtual_file::get_io_buffer_alignment() as u64; + assert_eq!(read.start % align, 0); + assert_eq!(read.start / align, offset_range.first().unwrap().2 / align); let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect(); @@ -525,6 +731,63 @@ mod tests { assert_eq!(expected_offsets_in_read, offsets_in_read); } + #[test] + fn planner_chunked_coalesce_all_test() { + use crate::virtual_file; + + const CHUNK_SIZE: u64 = 512; + virtual_file::set_io_buffer_alignment(CHUNK_SIZE as usize).unwrap(); + let max_read_size = CHUNK_SIZE as usize * 8; + let key = Key::MIN; + let lsn = Lsn(0); + + let blob_descriptions = [ + (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN + (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap + (key, lsn, CHUNK_SIZE / 2, BlobFlag::None), + (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap + (key, lsn, CHUNK_SIZE, BlobFlag::None), + (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None), + (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap + (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None), + (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None), + (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce. + (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None), + (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size) + (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk + (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce) + ]; + + let ranges = [ + &[ + blob_descriptions[0], + blob_descriptions[2], + blob_descriptions[4], + blob_descriptions[5], + blob_descriptions[7], + blob_descriptions[8], + blob_descriptions[10], + ], + &blob_descriptions[11..12], + &blob_descriptions[13..], + ]; + + let mut planner = VectoredReadPlanner::new(max_read_size); + for (key, lsn, offset, flag) in blob_descriptions { + planner.handle(key, lsn, offset, flag); + } + + planner.handle_range_end(652 * 1024); + + let reads = planner.finish(); + + assert_eq!(reads.len(), ranges.len()); + + for (idx, read) in reads.iter().enumerate() { + validate_read(read, ranges[idx]); + } + } + #[test] fn planner_max_read_size_test() { let max_read_size = 128 * 1024; @@ -737,6 +1000,7 @@ mod tests { let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16; let mut buf = BytesMut::with_capacity(reserved_bytes); + let mode = VectoredReadCoalesceMode::get(); let vectored_blob_reader = VectoredBlobReader::new(&file); let meta = BlobMeta { key: Key::MIN, @@ -748,7 +1012,7 @@ mod tests { if idx + 1 == offsets.len() { continue; } - let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096); + let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode); let read = read_builder.build(); let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?; assert_eq!(result.blobs.len(), 1); @@ -784,4 +1048,12 @@ mod tests { round_trip_test_compressed(&blobs, true).await?; Ok(()) } + + #[test] + fn test_div_round_up() { + const CHUNK_SIZE: usize = 512; + assert_eq!(1, div_round_up(200, CHUNK_SIZE)); + assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE)); + assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE)); + } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index c0017280fdef..4b11dc1a947e 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -10,6 +10,7 @@ //! This is similar to PostgreSQL's virtual file descriptor facility in //! src/backend/storage/file/fd.c //! +use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; use crate::context::RequestContext; use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; @@ -1140,10 +1141,13 @@ impl OpenFiles { /// server startup. /// #[cfg(not(test))] -pub fn init(num_slots: usize, engine: IoEngineKind) { +pub fn init(num_slots: usize, engine: IoEngineKind, io_buffer_alignment: usize) { if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { panic!("virtual_file::init called twice"); } + if set_io_buffer_alignment(io_buffer_alignment).is_err() { + panic!("IO buffer alignment ({io_buffer_alignment}) is not a power of two"); + } io_engine::init(engine); crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64); } @@ -1167,6 +1171,61 @@ fn get_open_files() -> &'static OpenFiles { } } +static IO_BUFFER_ALIGNMENT: AtomicUsize = AtomicUsize::new(DEFAULT_IO_BUFFER_ALIGNMENT); + +/// Returns true if `x` is zero or a power of two. +fn is_zero_or_power_of_two(x: usize) -> bool { + (x == 0) || ((x & (x - 1)) == 0) +} + +#[allow(unused)] +pub(crate) fn set_io_buffer_alignment(align: usize) -> Result<(), usize> { + if is_zero_or_power_of_two(align) { + IO_BUFFER_ALIGNMENT.store(align, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } else { + Err(align) + } +} + +/// Gets the io buffer alignment requirement. Returns 0 if there is no requirement specified. +/// +/// This function should be used to check the raw config value. +pub(crate) fn get_io_buffer_alignment_raw() -> usize { + let align = IO_BUFFER_ALIGNMENT.load(std::sync::atomic::Ordering::Relaxed); + + if cfg!(test) { + let env_var_name = "NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT"; + if align == DEFAULT_IO_BUFFER_ALIGNMENT { + if let Some(test_align) = utils::env::var(env_var_name) { + if is_zero_or_power_of_two(test_align) { + test_align + } else { + panic!("IO buffer alignment ({test_align}) is not a power of two"); + } + } else { + crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT + } + } else { + align + } + } else { + align + } +} + +/// Gets the io buffer alignment requirement. Returns 1 if the alignment config is set to zero. +/// +/// This function should be used for getting the actual alignment value to use. +pub(crate) fn get_io_buffer_alignment() -> usize { + let align = get_io_buffer_alignment_raw(); + if align == DEFAULT_IO_BUFFER_ALIGNMENT { + 1 + } else { + align + } +} + #[cfg(test)] mod tests { use crate::context::DownloadBehavior; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 92febfec9b4e..69a42346178d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -496,6 +496,7 @@ def __init__( pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None, safekeeper_extra_opts: Optional[list[str]] = None, storage_controller_port_override: Optional[int] = None, + pageserver_io_buffer_alignment: Optional[int] = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -550,6 +551,8 @@ def __init__( self.storage_controller_port_override = storage_controller_port_override + self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -1123,6 +1126,7 @@ def __init__(self, config: NeonEnvBuilder): self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine self.pageserver_aux_file_policy = config.pageserver_aux_file_policy + self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment # Create the neon_local's `NeonLocalInitConf` cfg: Dict[str, Any] = { @@ -1184,6 +1188,8 @@ def __init__(self, config: NeonEnvBuilder): for key, value in override.items(): ps_cfg[key] = value + ps_cfg["io_buffer_alignment"] = self.pageserver_io_buffer_alignment + # Create a corresponding NeonPageserver object self.pageservers.append( NeonPageserver( @@ -1425,6 +1431,7 @@ def _shared_simple_env( pageserver_virtual_file_io_engine: str, pageserver_aux_file_policy: Optional[AuxFileStore], pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]], + pageserver_io_buffer_alignment: Optional[int], ) -> Iterator[NeonEnv]: """ # Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES @@ -1457,6 +1464,7 @@ def _shared_simple_env( pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, pageserver_aux_file_policy=pageserver_aux_file_policy, pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm, + pageserver_io_buffer_alignment=pageserver_io_buffer_alignment, ) as builder: env = builder.init_start() @@ -1499,6 +1507,7 @@ def neon_env_builder( pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]], pageserver_aux_file_policy: Optional[AuxFileStore], record_property: Callable[[str, object], None], + pageserver_io_buffer_alignment: Optional[int], ) -> Iterator[NeonEnvBuilder]: """ Fixture to create a Neon environment for test. @@ -1534,6 +1543,7 @@ def neon_env_builder( test_overlay_dir=test_overlay_dir, pageserver_aux_file_policy=pageserver_aux_file_policy, pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm, + pageserver_io_buffer_alignment=pageserver_io_buffer_alignment, ) as builder: yield builder # Propogate `preserve_database_files` to make it possible to use in other fixtures, diff --git a/test_runner/fixtures/parametrize.py b/test_runner/fixtures/parametrize.py index 92c98763e30a..e2dd51802ce7 100644 --- a/test_runner/fixtures/parametrize.py +++ b/test_runner/fixtures/parametrize.py @@ -34,6 +34,11 @@ def pageserver_virtual_file_io_engine() -> Optional[str]: return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE") +@pytest.fixture(scope="function", autouse=True) +def pageserver_io_buffer_alignment() -> Optional[int]: + return None + + @pytest.fixture(scope="function", autouse=True) def pageserver_aux_file_policy() -> Optional[AuxFileStore]: return None