From 9627747d35dd9a5b7ceec099eb8f9604a95408dc Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 28 Aug 2024 20:31:41 +0200
Subject: [PATCH] bypass `PageCache` for `InMemoryLayer` + avoid `Value::deser`
 on L0 flush (#8537)

Part of [Epic: Bypass PageCache for user data blocks](https://github.com/neondatabase/neon/issues/7386).

# Problem

`InMemoryLayer` still uses the `PageCache` for all data stored in the
`VirtualFile` that underlies the `EphemeralFile`.

# Background

Before this PR, `EphemeralFile` was a fancy (and code-bloated) buffered writer
around a `VirtualFile` that supported `blob_io`.

The `InMemoryLayerInner::index` stores offsets into the `EphemeralFile`. At
those offsets, we find a varint length followed by the serialized `Value`.

Vectored reads (`get_values_reconstruct_data`) are not in fact vectored: each
`Value` that needs to be read is read sequentially.

The `will_init` bit of information, which we use to early-exit
`get_values_reconstruct_data` for a given key, is stored in the serialized
`Value`, meaning we have to read & deserialize the `Value` from the
`EphemeralFile`.

The L0 flushing **also** needs to re-determine the `will_init` bit of
information, by deserializing each value during L0 flush.

# Changes

1. Store the value length and `will_init` information in the
   `InMemoryLayer::index`. The `EphemeralFile` thus only needs to store the
   values.
2. For `get_values_reconstruct_data`:
   - Use the in-memory `index` to figure out which values need to be read.
     Having the `will_init` stored in the index enables us to do that.
   - View the `EphemeralFile` as a byte array of "DIO chunks", each 512 bytes
     in size (adjustable constant). A "DIO chunk" is the minimal unit that we
     can read under direct IO.
   - Figure out which chunks need to be read to retrieve the serialized bytes
     for the values we need to read.
   - Coalesce chunk reads such that each DIO chunk is only read once to serve
     all value reads that need data from that chunk.
   - Merge adjacent chunk reads into larger
     `EphemeralFile::read_exact_at_eof_ok` calls of up to 128k (adjustable
     constant).
3. The new `EphemeralFile::read_exact_at_eof_ok` fills the IO buffer from the
   underlying `VirtualFile` and/or its in-memory buffer.
4. The L0 flush code is changed to use the `index` directly, instead of going
   through `blob_io`.
5. We can remove the `ephemeral_file::page_caching` construct now.

The `get_values_reconstruct_data` changes may seem like overkill, but they are
necessary so that we issue an equivalent number of read system calls compared
to before this PR, where it was highly likely that, even if the first
`PageCache` access was a miss, the remaining reads within the same
`get_values_reconstruct_data` call from the same `EphemeralFile` page were a
hit.

The "DIO chunk" stuff is not strictly necessary for the page cache bypass,
but, since we're working on [direct IO](https://github.com/neondatabase/neon/issues/8130)
and https://github.com/neondatabase/neon/issues/8719 specifically, we need to
do _something_ like this anyway in the near future.
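To make the read-path changes concrete, here are two minimal sketches. They are
not the code added by this PR (that lives in `vectored_dio_read.rs` and
`EphemeralFile::read_exact_at_eof_ok` in the diff below); the names
`LogicalRead`, `PhysicalRead`, `plan_reads`, and `split_read` are made up for
illustration, and the constants mirror the adjustable 512-byte chunk size and
128k IO cap mentioned above.

The first sketch shows the chunk coalescing idea: map each value read to the
DIO chunks it touches, deduplicate chunks so each chunk is read at most once,
and merge runs of adjacent chunks into larger IOs up to the size cap.

```rust
/// Illustrative constants; the PR uses adjustable constants of the same magnitude.
const DIO_CHUNK_SIZE: u64 = 512;
const MAX_IO_SIZE: u64 = 128 * 1024;

/// A value read requested by `get_values_reconstruct_data`: `len` bytes at `pos`.
#[derive(Debug, Clone, Copy)]
struct LogicalRead {
    pos: u64,
    len: u64,
}

/// A coalesced read against the file: a run of adjacent DIO chunks, issued as one IO.
#[derive(Debug, PartialEq)]
struct PhysicalRead {
    start: u64, // chunk-aligned, inclusive
    end: u64,   // chunk-aligned, exclusive
}

/// Plan the IOs for a set of logical reads.
fn plan_reads(reads: &[LogicalRead]) -> Vec<PhysicalRead> {
    use std::collections::BTreeSet;

    // Collect the set of distinct chunk numbers needed by any read.
    let mut chunks = BTreeSet::new();
    for r in reads {
        let first = r.pos / DIO_CHUNK_SIZE;
        let last = (r.pos + r.len).saturating_sub(1) / DIO_CHUNK_SIZE;
        chunks.extend(first..=last);
    }

    // Merge runs of adjacent chunks, starting a new IO on a gap or when the
    // current IO would exceed MAX_IO_SIZE.
    let mut ios: Vec<PhysicalRead> = Vec::new();
    for chunk in chunks {
        let start = chunk * DIO_CHUNK_SIZE;
        let end = start + DIO_CHUNK_SIZE;
        let can_merge = matches!(
            ios.last(),
            Some(cur) if cur.end == start && end - cur.start <= MAX_IO_SIZE
        );
        if can_merge {
            ios.last_mut().unwrap().end = end;
        } else {
            ios.push(PhysicalRead { start, end });
        }
    }
    ios
}

fn main() {
    // Two values in the same 512-byte chunk plus one value further away:
    // the first two share a single chunk read, the third spans two adjacent
    // chunks that get merged into one IO.
    let reads = [
        LogicalRead { pos: 10, len: 100 },
        LogicalRead { pos: 300, len: 50 },
        LogicalRead { pos: 4096, len: 700 },
    ];
    assert_eq!(
        plan_reads(&reads),
        vec![
            PhysicalRead { start: 0, end: 512 },
            PhysicalRead { start: 4096, end: 5120 },
        ]
    );
}
```

The second sketch shows the range arithmetic behind filling an IO buffer "from
the underlying `VirtualFile` and/or its in-memory buffer": a logical read is
split at the flushed offset into a part served from the file and a part copied
from the buffered writer's in-memory tail.

```rust
use std::ops::Range;

/// For a logical read of byte range `start..end` of an ephemeral file, compute
/// which part is served from the flushed prefix (the `VirtualFile`) and which
/// part is copied from the buffered writer's in-memory tail.
/// Assumes `start <= end <= logical_len` and `flushed_offset <= logical_len`.
fn split_read(start: u64, end: u64, flushed_offset: u64) -> (Range<u64>, Range<u64>) {
    let from_file = start.min(flushed_offset)..end.min(flushed_offset);
    let from_tail = start.max(flushed_offset)..end.max(flushed_offset);
    (from_file, from_tail)
}

fn main() {
    // 150 bytes flushed to disk; a read of 100..200 is split into
    // 100..150 (from the file) and 150..200 (from the in-memory tail).
    assert_eq!(split_read(100, 200, 150), (100..150, 150..200));
    // A read entirely within the flushed part leaves the tail range empty.
    assert_eq!(split_read(0, 100, 150), (0..100, 150..150));
}
```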
# Alternative Design

The original plan was to use the `vectored_blob_io` code, but it relies on the
invariant of Delta & Image layers that `index order == values order`.

Further, the `vectored_blob_io` code's strategy for merging IOs is limited to
adjacent reads. However, with direct IO, there is another level of merging that
should be done: specifically, if multiple reads map to the same "DIO chunk"
(= alignment-requirement-sized and -aligned region of the file), then it's
"free" to read the chunk into an IO buffer once and serve both reads from that
buffer.

=> https://github.com/neondatabase/neon/issues/8719

# Testing / Performance

Correctness of the IO merging code is ensured by unit tests. Additionally,
minimal tests are added for the `EphemeralFile` implementation and the
bit-packed `InMemoryLayerIndexValue`.

Performance testing results are presented below. All perf testing was done on
my M2 MacBook Pro, running a Linux VM. It's a release build without
`--features testing`.

We see a definitive improvement in the ingest performance microbenchmark and in
an ad-hoc microbenchmark for getpage against `InMemoryLayer`.

```
baseline: commit 7c74112b2a6e23c07bfd9cc62c240cd6bbdd3bd9 origin/main
HEAD: ef1c55c52e0c313be4d302794d29534591f9cdc5
```
```
cargo bench --bench bench_ingest -- 'ingest 128MB/100b seq, no delta'

baseline

ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [483.50 ms 498.73 ms 522.53 ms]
                        thrpt:  [244.96 MiB/s 256.65 MiB/s 264.73 MiB/s]

HEAD

ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [479.22 ms 482.92 ms 487.35 ms]
                        thrpt:  [262.64 MiB/s 265.06 MiB/s 267.10 MiB/s]
```
We don't have a micro-benchmark for `InMemoryLayer` getpage, and it's quite cumbersome to add one, so I did manual testing in `neon_local`.
```
./target/release/neon_local stop
rm -rf .neon
./target/release/neon_local init
./target/release/neon_local start
./target/release/neon_local tenant create --set-default
./target/release/neon_local endpoint create foo
./target/release/neon_local endpoint start foo

psql 'postgresql://cloud_admin@127.0.0.1:55432/postgres'
psql (13.16 (Debian 13.16-0+deb11u1), server 15.7)

CREATE TABLE wal_test (
    id SERIAL PRIMARY KEY,
    data TEXT
);

DO $$
DECLARE
    i INTEGER := 1;
BEGIN
    WHILE i <= 500000 LOOP
        INSERT INTO wal_test (data) VALUES ('data');
        i := i + 1;
    END LOOP;
END $$;

-- => result is one L0 from initdb and one 137M-sized ephemeral-2

DO $$
DECLARE
    i INTEGER := 1;
    random_id INTEGER;
    random_record wal_test%ROWTYPE;
    start_time TIMESTAMP := clock_timestamp();
    selects_completed INTEGER := 0;
    min_id INTEGER := 1; -- Minimum ID value
    max_id INTEGER := 100000; -- Maximum ID value, based on your insert range
    iters INTEGER := 100000000; -- Number of iterations to run
BEGIN
    WHILE i <= iters LOOP
        -- Generate a random ID within the known range
        random_id := min_id + floor(random() * (max_id - min_id + 1))::int;

        -- Select the row with the generated random ID
        SELECT * INTO random_record FROM wal_test WHERE id = random_id;

        -- Increment the select counter
        selects_completed := selects_completed + 1;

        -- Check if a second has passed
        IF EXTRACT(EPOCH FROM clock_timestamp() - start_time) >= 1 THEN
            -- Print the number of selects completed in the last second
            RAISE NOTICE 'Selects completed in last second: %', selects_completed;

            -- Reset counters for the next second
            selects_completed := 0;
            start_time := clock_timestamp();
        END IF;

        -- Increment the loop counter
        i := i + 1;
    END LOOP;
END $$;

./target/release/neon_local stop

baseline: commit 7c74112b2a6e23c07bfd9cc62c240cd6bbdd3bd9 origin/main

NOTICE: Selects completed in last second: 1864
NOTICE: Selects completed in last second: 1850
NOTICE: Selects completed in last second: 1851
NOTICE: Selects completed in last second: 1918
NOTICE: Selects completed in last second: 1911
NOTICE: Selects completed in last second: 1879
NOTICE: Selects completed in last second: 1858
NOTICE: Selects completed in last second: 1827
NOTICE: Selects completed in last second: 1933

ours

NOTICE: Selects completed in last second: 1915
NOTICE: Selects completed in last second: 1928
NOTICE: Selects completed in last second: 1913
NOTICE: Selects completed in last second: 1932
NOTICE: Selects completed in last second: 1846
NOTICE: Selects completed in last second: 1955
NOTICE: Selects completed in last second: 1991
NOTICE: Selects completed in last second: 1973
```

NB: the ephemeral file sizes differ by ca 1MiB, ours being 1MiB smaller.
# Rollout This PR changes the code in-place and is not gated by a feature flag. --- Cargo.lock | 14 + Cargo.toml | 2 + pageserver/Cargo.toml | 2 + pageserver/benches/bench_ingest.rs | 4 +- pageserver/src/assert_u64_eq_usize.rs | 39 + pageserver/src/config.rs | 10 + pageserver/src/lib.rs | 1 + pageserver/src/tenant.rs | 6 + pageserver/src/tenant/blob_io.rs | 4 +- pageserver/src/tenant/block_io.rs | 23 - pageserver/src/tenant/ephemeral_file.rs | 428 +++++--- .../src/tenant/ephemeral_file/page_caching.rs | 153 --- .../ephemeral_file/zero_padded_read_write.rs | 145 --- .../zero_padded_read_write/zero_padded.rs | 110 -- .../src/tenant/storage_layer/delta_layer.rs | 6 +- .../tenant/storage_layer/inmemory_layer.rs | 511 ++++++++-- .../inmemory_layer/vectored_dio_read.rs | 937 ++++++++++++++++++ pageserver/src/tenant/timeline.rs | 6 +- .../virtual_file/owned_buffers_io/write.rs | 1 + .../regress/test_pageserver_layer_rolling.py | 9 +- 20 files changed, 1757 insertions(+), 654 deletions(-) create mode 100644 pageserver/src/assert_u64_eq_usize.rs delete mode 100644 pageserver/src/tenant/ephemeral_file/page_caching.rs delete mode 100644 pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs delete mode 100644 pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs create mode 100644 pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs diff --git a/Cargo.lock b/Cargo.lock index 441ca1ff868d..c514625518f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -936,6 +936,12 @@ dependencies = [ "which", ] +[[package]] +name = "bit_field" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61" + [[package]] name = "bitflags" version = "1.3.2" @@ -3683,6 +3689,7 @@ dependencies = [ "async-compression", "async-stream", "async-trait", + "bit_field", "byteorder", "bytes", "camino", @@ -3732,6 +3739,7 @@ dependencies = [ "reqwest 0.12.4", "rpds", "scopeguard", + "send-future", "serde", "serde_json", "serde_path_to_error", @@ -5455,6 +5463,12 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +[[package]] +name = "send-future" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87" + [[package]] name = "sentry" version = "0.32.3" diff --git a/Cargo.toml b/Cargo.toml index e038c0b4ffc3..7bd9a26394bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ axum = { version = "0.6.20", features = ["ws"] } base64 = "0.13.0" bincode = "1.3" bindgen = "0.65" +bit_field = "0.10.2" bstr = "1.0" byteorder = "1.4" bytes = "1.0" @@ -145,6 +146,7 @@ rustls-split = "0.3" scopeguard = "1.1" sysinfo = "0.29.2" sd-notify = "0.4.1" +send-future = "0.1.0" sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0e748ee3db7c..85c5e24afc09 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -16,6 +16,7 @@ arc-swap.workspace = true async-compression.workspace = true async-stream.workspace = true async-trait.workspace = true +bit_field.workspace = true byteorder.workspace = true bytes.workspace = true camino.workspace = true @@ -52,6 +53,7 @@ rand.workspace = 
true range-set-blaze = { version = "0.1.16", features = ["alloc"] } regex.workspace = true scopeguard.workspace = true +send-future.workspace = true serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } serde_path_to_error.workspace = true diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index f450f46efac5..1be4391d819a 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -103,13 +103,13 @@ async fn ingest( batch.push((key.to_compact(), lsn, data_ser_size, data.clone())); if batch.len() >= BATCH_SIZE { let this_batch = std::mem::take(&mut batch); - let serialized = SerializedBatch::from_values(this_batch); + let serialized = SerializedBatch::from_values(this_batch).unwrap(); layer.put_batch(serialized, &ctx).await?; } } if !batch.is_empty() { let this_batch = std::mem::take(&mut batch); - let serialized = SerializedBatch::from_values(this_batch); + let serialized = SerializedBatch::from_values(this_batch).unwrap(); layer.put_batch(serialized, &ctx).await?; } layer.freeze(lsn + 1).await; diff --git a/pageserver/src/assert_u64_eq_usize.rs b/pageserver/src/assert_u64_eq_usize.rs new file mode 100644 index 000000000000..66ca7fd0575a --- /dev/null +++ b/pageserver/src/assert_u64_eq_usize.rs @@ -0,0 +1,39 @@ +//! `u64`` and `usize`` aren't guaranteed to be identical in Rust, but life is much simpler if that's the case. + +pub(crate) const _ASSERT_U64_EQ_USIZE: () = { + if std::mem::size_of::() != std::mem::size_of::() { + panic!("the traits defined in this module assume that usize and u64 can be converted to each other without loss of information"); + } +}; + +pub(crate) trait U64IsUsize { + fn into_usize(self) -> usize; +} + +impl U64IsUsize for u64 { + #[inline(always)] + fn into_usize(self) -> usize { + #[allow(clippy::let_unit_value)] + let _ = _ASSERT_U64_EQ_USIZE; + self as usize + } +} + +pub(crate) trait UsizeIsU64 { + fn into_u64(self) -> u64; +} + +impl UsizeIsU64 for usize { + #[inline(always)] + fn into_u64(self) -> u64 { + #[allow(clippy::let_unit_value)] + let _ = _ASSERT_U64_EQ_USIZE; + self as u64 + } +} + +pub const fn u64_to_usize(x: u64) -> usize { + #[allow(clippy::let_unit_value)] + let _ = _ASSERT_U64_EQ_USIZE; + x as usize +} diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index ae473bcc5fcd..994075bef669 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -31,6 +31,7 @@ use utils::{ use crate::l0_flush::L0FlushConfig; use crate::tenant::config::TenantConfOpt; +use crate::tenant::storage_layer::inmemory_layer::IndexEntry; use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess; use crate::tenant::vectored_blob_io::MaxVectoredReadBytes; use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; @@ -1020,6 +1021,15 @@ impl PageServerConf { conf.default_tenant_conf = t_conf.merge(TenantConf::default()); + IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance) + .map_err(|msg| anyhow::anyhow!("{msg}")) + .with_context(|| { + format!( + "effective checkpoint distance is unsupported: {}", + conf.default_tenant_conf.checkpoint_distance + ) + })?; + Ok(conf) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index dbfc9f35442b..7a9cf495c7dd 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -16,6 +16,7 @@ pub mod l0_flush; use futures::{stream::FuturesUnordered, StreamExt}; pub use pageserver_api::keyspace; use tokio_util::sync::CancellationToken; +mod 
assert_u64_eq_usize; pub mod aux_file; pub mod metrics; pub mod page_cache; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0364d521b697..60ab242ffc7c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -877,6 +877,12 @@ impl Tenant { }); }; + // TODO: should also be rejecting tenant conf changes that violate this check. + if let Err(e) = crate::tenant::storage_layer::inmemory_layer::IndexEntry::validate_checkpoint_distance(tenant_clone.get_checkpoint_distance()) { + make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error); + return Ok(()); + } + let mut init_order = init_order; // take the completion because initial tenant loading will complete when all of // these tasks complete. diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index a245c99a88fa..dd70f6bbff8c 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -148,7 +148,7 @@ pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0; /// The maximum size of blobs we support. The highest few bits /// are reserved for compression and other further uses. -const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff; +pub(crate) const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff; pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80; pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; @@ -326,7 +326,7 @@ impl BlobWriter { (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) } else { // Write a 4-byte length header - if len > MAX_SUPPORTED_LEN { + if len > MAX_SUPPORTED_BLOB_LEN { return ( ( io_buf.slice_len(), diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 601b09515519..3afa3a86b948 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -2,7 +2,6 @@ //! Low-level Block-oriented I/O functions //! -use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; @@ -81,9 +80,7 @@ impl<'a> Deref for BlockLease<'a> { /// Unlike traits, we also support the read function to be async though. 
pub(crate) enum BlockReaderRef<'a> { FileBlockReader(&'a FileBlockReader<'a>), - EphemeralFile(&'a EphemeralFile), Adapter(Adapter<&'a DeltaLayerInner>), - Slice(&'a [u8]), #[cfg(test)] TestDisk(&'a super::disk_btree::tests::TestDisk), #[cfg(test)] @@ -100,9 +97,7 @@ impl<'a> BlockReaderRef<'a> { use BlockReaderRef::*; match self { FileBlockReader(r) => r.read_blk(blknum, ctx).await, - EphemeralFile(r) => r.read_blk(blknum, ctx).await, Adapter(r) => r.read_blk(blknum, ctx).await, - Slice(s) => Self::read_blk_slice(s, blknum), #[cfg(test)] TestDisk(r) => r.read_blk(blknum), #[cfg(test)] @@ -111,24 +106,6 @@ impl<'a> BlockReaderRef<'a> { } } -impl<'a> BlockReaderRef<'a> { - fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result { - let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap(); - let end = start.checked_add(PAGE_SZ).unwrap(); - if end > slice.len() { - return Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - format!("slice too short, len={} end={}", slice.len(), end), - )); - } - let slice = &slice[start..end]; - let page_sized: &[u8; PAGE_SZ] = slice - .try_into() - .expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ"); - Ok(BlockLease::Slice(page_sized)) - } -} - /// /// A "cursor" for efficiently reading multiple pages from a BlockReader /// diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 44f0fc7ab1d9..5324e1807d9c 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -1,13 +1,21 @@ //! Implementation of append-only file data structure //! used to keep in-memory layers spilled on disk. +use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache; -use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; -use crate::virtual_file::{self, VirtualFile}; +use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File; +use crate::virtual_file::owned_buffers_io::slice::SliceMutExt; +use crate::virtual_file::owned_buffers_io::util::size_tracking_writer; +use crate::virtual_file::owned_buffers_io::write::Buffer; +use crate::virtual_file::{self, owned_buffers_io, VirtualFile}; +use bytes::BytesMut; use camino::Utf8PathBuf; +use num_traits::Num; use pageserver_api::shard::TenantShardId; +use tokio_epoll_uring::{BoundedBuf, Slice}; +use tracing::error; use std::io; use std::sync::atomic::AtomicU64; @@ -16,12 +24,17 @@ use utils::id::TimelineId; pub struct EphemeralFile { _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, - - rw: page_caching::RW, + page_cache_file_id: page_cache::FileId, + bytes_written: u64, + buffered_writer: owned_buffers_io::write::BufferedWriter< + BytesMut, + size_tracking_writer::Writer, + >, + /// Gate guard is held on as long as we need to do operations in the path (delete on drop) + _gate_guard: utils::sync::gate::GateGuard, } -mod page_caching; -mod zero_padded_read_write; +const TAIL_SZ: usize = 64 * 1024; impl EphemeralFile { pub async fn create( @@ -51,72 +64,175 @@ impl EphemeralFile { ) .await?; + let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore + Ok(EphemeralFile { _tenant_shard_id: tenant_shard_id, _timeline_id: timeline_id, - rw: page_caching::RW::new(file, gate_guard), + page_cache_file_id, + bytes_written: 0, + buffered_writer: owned_buffers_io::write::BufferedWriter::new( + size_tracking_writer::Writer::new(file), + 
BytesMut::with_capacity(TAIL_SZ), + ), + _gate_guard: gate_guard, }) } +} + +impl Drop for EphemeralFile { + fn drop(&mut self) { + // unlink the file + // we are clear to do this, because we have entered a gate + let path = &self.buffered_writer.as_inner().as_inner().path; + let res = std::fs::remove_file(path); + if let Err(e) = res { + if e.kind() != std::io::ErrorKind::NotFound { + // just never log the not found errors, we cannot do anything for them; on detach + // the tenant directory is already gone. + // + // not found files might also be related to https://github.com/neondatabase/neon/issues/2442 + error!("could not remove ephemeral file '{path}': {e}"); + } + } + } +} +impl EphemeralFile { pub(crate) fn len(&self) -> u64 { - self.rw.bytes_written() + self.bytes_written } pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId { - self.rw.page_cache_file_id() + self.page_cache_file_id } - /// See [`self::page_caching::RW::load_to_vec`]. pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result, io::Error> { - self.rw.load_to_vec(ctx).await - } - - pub(crate) async fn read_blk( - &self, - blknum: u32, - ctx: &RequestContext, - ) -> Result { - self.rw.read_blk(blknum, ctx).await + let size = self.len().into_usize(); + let vec = Vec::with_capacity(size); + let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?; + assert_eq!(nread, size); + let vec = slice.into_inner(); + assert_eq!(vec.len(), nread); + assert_eq!(vec.capacity(), size, "we shouldn't be reallocating"); + Ok(vec) } - #[cfg(test)] - // This is a test helper: outside of tests, we are always written to via a pre-serialized batch. - pub(crate) async fn write_blob( + /// Returns the offset at which the first byte of the input was written, for use + /// in constructing indices over the written value. + /// + /// Panics if the write is short because there's no way we can recover from that. + /// TODO: make upstack handle this as an error. + pub(crate) async fn write_raw( &mut self, srcbuf: &[u8], ctx: &RequestContext, - ) -> Result { - let pos = self.rw.bytes_written(); + ) -> std::io::Result { + let pos = self.bytes_written; + + let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}", + srcbuf_len = srcbuf.len(), + ), + ) + })?; - let mut len_bytes = std::io::Cursor::new(Vec::new()); - crate::tenant::storage_layer::inmemory_layer::SerializedBatch::write_blob_length( + // Write the payload + let nwritten = self + .buffered_writer + .write_buffered_borrowed(srcbuf, ctx) + .await?; + assert_eq!( + nwritten, srcbuf.len(), - &mut len_bytes, + "buffered writer has no short writes" ); - let len_bytes = len_bytes.into_inner(); - - // Write the length field - self.rw.write_all_borrowed(&len_bytes, ctx).await?; - // Write the payload - self.rw.write_all_borrowed(srcbuf, ctx).await?; + self.bytes_written = new_bytes_written; Ok(pos) } +} - /// Returns the offset at which the first byte of the input was written, for use - /// in constructing indices over the written value. 
- pub(crate) async fn write_raw( - &mut self, - srcbuf: &[u8], - ctx: &RequestContext, - ) -> Result { - let pos = self.rw.bytes_written(); - - // Write the payload - self.rw.write_all_borrowed(srcbuf, ctx).await?; - - Ok(pos) +impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile { + async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>( + &'b self, + start: u64, + dst: tokio_epoll_uring::Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { + let file_size_tracking_writer = self.buffered_writer.as_inner(); + let flushed_offset = file_size_tracking_writer.bytes_written(); + + let buffer = self.buffered_writer.inspect_buffer(); + let buffered = &buffer[0..buffer.pending()]; + + let dst_cap = dst.bytes_total().into_u64(); + let end = { + // saturating_add is correct here because the max file size is u64::MAX, so, + // if start + dst.len() > u64::MAX, then we know it will be a short read + let mut end: u64 = start.saturating_add(dst_cap); + if end > self.bytes_written { + end = self.bytes_written; + } + end + }; + + // inclusive, exclusive + #[derive(Debug)] + struct Range(N, N); + impl Range { + fn len(&self) -> N { + if self.0 > self.1 { + N::zero() + } else { + self.1 - self.0 + } + } + } + let written_range = Range(start, std::cmp::min(end, flushed_offset)); + let buffered_range = Range(std::cmp::max(start, flushed_offset), end); + + let dst = if written_range.len() > 0 { + let file: &VirtualFile = file_size_tracking_writer.as_inner(); + let bounds = dst.bounds(); + let slice = file + .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx) + .await?; + Slice::from_buf_bounds(Slice::into_inner(slice), bounds) + } else { + dst + }; + + let dst = if buffered_range.len() > 0 { + let offset_in_buffer = buffered_range + .0 + .checked_sub(flushed_offset) + .unwrap() + .into_usize(); + let to_copy = + &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())]; + let bounds = dst.bounds(); + let mut view = dst.slice({ + let start = written_range.len().into_usize(); + let end = start + .checked_add(buffered_range.len().into_usize()) + .unwrap(); + start..end + }); + view.as_mut_rust_slice_full_zeroed() + .copy_from_slice(to_copy); + Slice::from_buf_bounds(Slice::into_inner(view), bounds) + } else { + dst + }; + + // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs + + Ok((dst, (end - start).into_usize())) } } @@ -129,19 +245,13 @@ pub fn is_ephemeral_file(filename: &str) -> bool { } } -impl BlockReader for EphemeralFile { - fn block_cursor(&self) -> super::block_io::BlockCursor<'_> { - BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self)) - } -} - #[cfg(test)] mod tests { + use rand::Rng; + use super::*; use crate::context::DownloadBehavior; use crate::task_mgr::TaskKind; - use crate::tenant::block_io::BlockReaderRef; - use rand::{thread_rng, RngCore}; use std::fs; use std::str::FromStr; @@ -172,69 +282,6 @@ mod tests { Ok((conf, tenant_shard_id, timeline_id, ctx)) } - #[tokio::test] - async fn test_ephemeral_blobs() -> Result<(), io::Error> { - let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?; - - let gate = utils::sync::gate::Gate::default(); - - let entered = gate.enter().unwrap(); - - let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?; - - let pos_foo = file.write_blob(b"foo", &ctx).await?; - assert_eq!( - b"foo", - file.block_cursor() - .read_blob(pos_foo, 
&ctx) - .await? - .as_slice() - ); - let pos_bar = file.write_blob(b"bar", &ctx).await?; - assert_eq!( - b"foo", - file.block_cursor() - .read_blob(pos_foo, &ctx) - .await? - .as_slice() - ); - assert_eq!( - b"bar", - file.block_cursor() - .read_blob(pos_bar, &ctx) - .await? - .as_slice() - ); - - let mut blobs = Vec::new(); - for i in 0..10000 { - let data = Vec::from(format!("blob{}", i).as_bytes()); - let pos = file.write_blob(&data, &ctx).await?; - blobs.push((pos, data)); - } - // also test with a large blobs - for i in 0..100 { - let data = format!("blob{}", i).as_bytes().repeat(100); - let pos = file.write_blob(&data, &ctx).await?; - blobs.push((pos, data)); - } - - let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file)); - for (pos, expected) in blobs { - let actual = cursor.read_blob(pos, &ctx).await?; - assert_eq!(actual, expected); - } - - // Test a large blob that spans multiple pages - let mut large_data = vec![0; 20000]; - thread_rng().fill_bytes(&mut large_data); - let pos_large = file.write_blob(&large_data, &ctx).await?; - let result = file.block_cursor().read_blob(pos_large, &ctx).await?; - assert_eq!(result, large_data); - - Ok(()) - } - #[tokio::test] async fn ephemeral_file_holds_gate_open() { const FOREVER: std::time::Duration = std::time::Duration::from_secs(5); @@ -268,4 +315,151 @@ mod tests { .expect("closing completes right away") .expect("closing does not panic"); } + + #[tokio::test] + async fn test_ephemeral_file_basics() { + let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap(); + + let gate = utils::sync::gate::Gate::default(); + + let mut file = + EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) + .await + .unwrap(); + + let cap = file.buffered_writer.inspect_buffer().capacity(); + + let write_nbytes = cap + cap / 2; + + let content: Vec = rand::thread_rng() + .sample_iter(rand::distributions::Standard) + .take(write_nbytes) + .collect(); + + let mut value_offsets = Vec::new(); + for i in 0..write_nbytes { + let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap(); + value_offsets.push(off); + } + + assert!(file.len() as usize == write_nbytes); + for i in 0..write_nbytes { + assert_eq!(value_offsets[i], i.into_u64()); + let buf = Vec::with_capacity(1); + let (buf_slice, nread) = file + .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx) + .await + .unwrap(); + let buf = buf_slice.into_inner(); + assert_eq!(nread, 1); + assert_eq!(&buf, &content[i..i + 1]); + } + + let file_contents = + std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap(); + assert_eq!(file_contents, &content[0..cap]); + + let buffer_contents = file.buffered_writer.inspect_buffer(); + assert_eq!(buffer_contents, &content[cap..write_nbytes]); + } + + #[tokio::test] + async fn test_flushes_do_happen() { + let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap(); + + let gate = utils::sync::gate::Gate::default(); + + let mut file = + EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) + .await + .unwrap(); + + let cap = file.buffered_writer.inspect_buffer().capacity(); + + let content: Vec = rand::thread_rng() + .sample_iter(rand::distributions::Standard) + .take(cap + cap / 2) + .collect(); + + file.write_raw(&content, &ctx).await.unwrap(); + + // assert the state is as this test expects it to be + assert_eq!( + &file.load_to_vec(&ctx).await.unwrap(), + &content[0..cap + cap / 2] + ); + let md = file + .buffered_writer + 
.as_inner() + .as_inner() + .path + .metadata() + .unwrap(); + assert_eq!( + md.len(), + cap.into_u64(), + "buffered writer does one write if we write 1.5x buffer capacity" + ); + assert_eq!( + &file.buffered_writer.inspect_buffer()[0..cap / 2], + &content[cap..cap + cap / 2] + ); + } + + #[tokio::test] + async fn test_read_split_across_file_and_buffer() { + // This test exercises the logic on the read path that splits the logical read + // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer. + // + // This test build on the assertions in test_flushes_do_happen + + let (conf, tenant_id, timeline_id, ctx) = + harness("test_read_split_across_file_and_buffer").unwrap(); + + let gate = utils::sync::gate::Gate::default(); + + let mut file = + EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx) + .await + .unwrap(); + + let cap = file.buffered_writer.inspect_buffer().capacity(); + + let content: Vec = rand::thread_rng() + .sample_iter(rand::distributions::Standard) + .take(cap + cap / 2) + .collect(); + + file.write_raw(&content, &ctx).await.unwrap(); + + let test_read = |start: usize, len: usize| { + let file = &file; + let ctx = &ctx; + let content = &content; + async move { + let (buf, nread) = file + .read_exact_at_eof_ok( + start.into_u64(), + Vec::with_capacity(len).slice_full(), + ctx, + ) + .await + .unwrap(); + assert_eq!(nread, len); + assert_eq!(&buf.into_inner(), &content[start..(start + len)]); + } + }; + + // completely within the file range + assert!(20 < cap, "test assumption"); + test_read(10, 10).await; + // border onto edge of file + test_read(cap - 10, 10).await; + // read across file and buffer + test_read(cap - 10, 20).await; + // stay from start of buffer + test_read(cap, 10).await; + // completely within buffer + test_read(cap + 10, 10).await; + } } diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs deleted file mode 100644 index 48926354f1c4..000000000000 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ /dev/null @@ -1,153 +0,0 @@ -//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the -//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`]. -//! -//! Subject to removal in - -use crate::context::RequestContext; -use crate::page_cache::{self, PAGE_SZ}; -use crate::tenant::block_io::BlockLease; -use crate::virtual_file::owned_buffers_io::util::size_tracking_writer; -use crate::virtual_file::VirtualFile; - -use std::io::{self}; -use tokio_epoll_uring::BoundedBuf; -use tracing::*; - -use super::zero_padded_read_write; - -/// See module-level comment. -pub struct RW { - page_cache_file_id: page_cache::FileId, - rw: super::zero_padded_read_write::RW>, - /// Gate guard is held on as long as we need to do operations in the path (delete on drop). 
- _gate_guard: utils::sync::gate::GateGuard, -} - -impl RW { - pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self { - let page_cache_file_id = page_cache::next_file_id(); - Self { - page_cache_file_id, - rw: super::zero_padded_read_write::RW::new(size_tracking_writer::Writer::new(file)), - _gate_guard, - } - } - - pub fn page_cache_file_id(&self) -> page_cache::FileId { - self.page_cache_file_id - } - - pub(crate) async fn write_all_borrowed( - &mut self, - srcbuf: &[u8], - ctx: &RequestContext, - ) -> Result { - // It doesn't make sense to proactively fill the page cache on the Pageserver write path - // because Compute is unlikely to access recently written data. - self.rw.write_all_borrowed(srcbuf, ctx).await - } - - pub(crate) fn bytes_written(&self) -> u64 { - self.rw.bytes_written() - } - - /// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer. - /// - /// This includes the blocks that aren't yet flushed to disk by the internal buffered writer. - /// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`]. - pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result, io::Error> { - // round up to the next PAGE_SZ multiple, required by blob_io - let size = { - let s = usize::try_from(self.bytes_written()).unwrap(); - if s % PAGE_SZ == 0 { - s - } else { - s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap() - } - }; - let vec = Vec::with_capacity(size); - - // read from disk what we've already flushed - let file_size_tracking_writer = self.rw.as_writer(); - let flushed_range = 0..usize::try_from(file_size_tracking_writer.bytes_written()).unwrap(); - let mut vec = file_size_tracking_writer - .as_inner() - .read_exact_at( - vec.slice(0..(flushed_range.end - flushed_range.start)), - u64::try_from(flushed_range.start).unwrap(), - ctx, - ) - .await? - .into_inner(); - - // copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk - let buffered = self.rw.get_tail_zero_padded(); - vec.extend_from_slice(buffered); - assert_eq!(vec.len(), size); - assert_eq!(vec.len() % PAGE_SZ, 0); - Ok(vec) - } - - pub(crate) async fn read_blk( - &self, - blknum: u32, - ctx: &RequestContext, - ) -> Result { - match self.rw.read_blk(blknum).await? { - zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => { - let cache = page_cache::get(); - match cache - .read_immutable_buf(self.page_cache_file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - // order path before error because error is anyhow::Error => might have many contexts - format!( - "ephemeral file: read immutable page #{}: {}: {:#}", - blknum, - self.rw.as_writer().as_inner().path, - e, - ), - ) - })? { - page_cache::ReadBufResult::Found(guard) => { - return Ok(BlockLease::PageReadGuard(guard)) - } - page_cache::ReadBufResult::NotFound(write_guard) => { - let write_guard = writer - .as_inner() - .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx) - .await?; - let read_guard = write_guard.mark_valid(); - return Ok(BlockLease::PageReadGuard(read_guard)); - } - } - } - zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => { - Ok(BlockLease::EphemeralFileMutableTail(buffer)) - } - } - } -} - -impl Drop for RW { - fn drop(&mut self) { - // There might still be pages in the [`crate::page_cache`] for this file. 
- // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed. - - // unlink the file - // we are clear to do this, because we have entered a gate - let path = &self.rw.as_writer().as_inner().path; - let res = std::fs::remove_file(path); - if let Err(e) = res { - if e.kind() != std::io::ErrorKind::NotFound { - // just never log the not found errors, we cannot do anything for them; on detach - // the tenant directory is already gone. - // - // not found files might also be related to https://github.com/neondatabase/neon/issues/2442 - error!("could not remove ephemeral file '{path}': {e}"); - } - } - } -} diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs deleted file mode 100644 index fe310acab888..000000000000 --- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs +++ /dev/null @@ -1,145 +0,0 @@ -//! The heart of how [`super::EphemeralFile`] does its reads and writes. -//! -//! # Writes -//! -//! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`]. -//! The [`RW`] batches these into [`TAIL_SZ`] bigger writes, using [`owned_buffers_io::write::BufferedWriter`]. -//! -//! # Reads -//! -//! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`]. -//! -//! The [`RW`] serves these reads either from the buffered writer's in-memory buffer -//! or redirects the caller to read from the underlying [`OwnedAsyncWriter`] -//! if the read is for the prefix that has already been flushed. -//! -//! # Current Usage -//! -//! The current user of this module is [`super::page_caching::RW`]. - -mod zero_padded; - -use crate::{ - context::RequestContext, - page_cache::PAGE_SZ, - virtual_file::owned_buffers_io::{ - self, - write::{Buffer, OwnedAsyncWriter}, - }, -}; - -const TAIL_SZ: usize = 64 * 1024; - -/// See module-level comment. -pub struct RW { - buffered_writer: owned_buffers_io::write::BufferedWriter< - zero_padded::Buffer, - owned_buffers_io::util::size_tracking_writer::Writer, - >, -} - -pub enum ReadResult<'a, W> { - NeedsReadFromWriter { writer: &'a W }, - ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] }, -} - -impl RW -where - W: OwnedAsyncWriter, -{ - pub fn new(writer: W) -> Self { - let bytes_flushed_tracker = - owned_buffers_io::util::size_tracking_writer::Writer::new(writer); - let buffered_writer = owned_buffers_io::write::BufferedWriter::new( - bytes_flushed_tracker, - zero_padded::Buffer::default(), - ); - Self { buffered_writer } - } - - pub(crate) fn as_writer(&self) -> &W { - self.buffered_writer.as_inner().as_inner() - } - - pub async fn write_all_borrowed( - &mut self, - buf: &[u8], - ctx: &RequestContext, - ) -> std::io::Result { - self.buffered_writer.write_buffered_borrowed(buf, ctx).await - } - - pub fn bytes_written(&self) -> u64 { - let flushed_offset = self.buffered_writer.as_inner().bytes_written(); - let buffer: &zero_padded::Buffer = self.buffered_writer.inspect_buffer(); - flushed_offset + u64::try_from(buffer.pending()).unwrap() - } - - /// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`]. 
- pub fn get_tail_zero_padded(&self) -> &[u8] { - let buffer: &zero_padded::Buffer = self.buffered_writer.inspect_buffer(); - let buffer_written_up_to = buffer.pending(); - // pad to next page boundary - let read_up_to = if buffer_written_up_to % PAGE_SZ == 0 { - buffer_written_up_to - } else { - buffer_written_up_to - .checked_add(PAGE_SZ - (buffer_written_up_to % PAGE_SZ)) - .unwrap() - }; - &buffer.as_zero_padded_slice()[0..read_up_to] - } - - pub(crate) async fn read_blk(&self, blknum: u32) -> Result, std::io::Error> { - let flushed_offset = self.buffered_writer.as_inner().bytes_written(); - let buffer: &zero_padded::Buffer = self.buffered_writer.inspect_buffer(); - let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap(); - let read_offset = (blknum as u64) * (PAGE_SZ as u64); - - // The trailing page ("block") might only be partially filled, - // yet the blob_io code relies on us to return a full PAGE_SZed slice anyway. - // Moreover, it has to be zero-padded, because when we still had - // a write-back page cache, it provided pre-zeroed pages, and blob_io came to rely on it. - // DeltaLayer probably has the same issue, not sure why it needs no special treatment. - // => check here that the read doesn't go beyond this potentially trailing - // => the zero-padding is done in the `else` branch below - let blocks_written = if buffered_offset % (PAGE_SZ as u64) == 0 { - buffered_offset / (PAGE_SZ as u64) - } else { - (buffered_offset / (PAGE_SZ as u64)) + 1 - }; - if (blknum as u64) >= blocks_written { - return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}"))); - } - - // assertions for the `if-else` below - assert_eq!( - flushed_offset % (TAIL_SZ as u64), 0, - "we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks" - ); - assert_eq!( - flushed_offset % (PAGE_SZ as u64), - 0, - "the logic below can't handle if the page is spread across the flushed part and the buffer" - ); - - if read_offset < flushed_offset { - assert!(read_offset + (PAGE_SZ as u64) <= flushed_offset); - Ok(ReadResult::NeedsReadFromWriter { - writer: self.as_writer(), - }) - } else { - let read_offset_in_buffer = read_offset - .checked_sub(flushed_offset) - .expect("would have taken `if` branch instead of this one"); - let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap(); - let zero_padded_slice = buffer.as_zero_padded_slice(); - let page = &zero_padded_slice[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)]; - Ok(ReadResult::ServedFromZeroPaddedMutableTail { - buffer: page - .try_into() - .expect("the slice above got it as page-size slice"), - }) - } - } -} diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs deleted file mode 100644 index 2dc0277638e7..000000000000 --- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs +++ /dev/null @@ -1,110 +0,0 @@ -//! A [`crate::virtual_file::owned_buffers_io::write::Buffer`] whose -//! unwritten range is guaranteed to be zero-initialized. -//! This is used by [`crate::tenant::ephemeral_file::zero_padded_read_write::RW::read_blk`] -//! to serve page-sized reads of the trailing page when the trailing page has only been partially filled. 
- -use std::mem::MaybeUninit; - -use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; - -/// See module-level comment. -pub struct Buffer { - allocation: Box<[u8; N]>, - written: usize, -} - -impl Default for Buffer { - fn default() -> Self { - Self { - allocation: Box::new( - // SAFETY: zeroed memory is a valid [u8; N] - unsafe { MaybeUninit::zeroed().assume_init() }, - ), - written: 0, - } - } -} - -impl Buffer { - #[inline(always)] - fn invariants(&self) { - // don't check by default, unoptimized is too expensive even for debug mode - if false { - debug_assert!(self.written <= N, "{}", self.written); - debug_assert!(self.allocation[self.written..N].iter().all(|v| *v == 0)); - } - } - - pub fn as_zero_padded_slice(&self) -> &[u8; N] { - &self.allocation - } -} - -impl crate::virtual_file::owned_buffers_io::write::Buffer for Buffer { - type IoBuf = Self; - - fn cap(&self) -> usize { - self.allocation.len() - } - - fn extend_from_slice(&mut self, other: &[u8]) { - self.invariants(); - let remaining = self.allocation.len() - self.written; - if other.len() > remaining { - panic!("calling extend_from_slice() with insufficient remaining capacity"); - } - self.allocation[self.written..(self.written + other.len())].copy_from_slice(other); - self.written += other.len(); - self.invariants(); - } - - fn pending(&self) -> usize { - self.written - } - - fn flush(self) -> FullSlice { - self.invariants(); - let written = self.written; - FullSlice::must_new(tokio_epoll_uring::BoundedBuf::slice(self, 0..written)) - } - - fn reuse_after_flush(iobuf: Self::IoBuf) -> Self { - let Self { - mut allocation, - written, - } = iobuf; - allocation[0..written].fill(0); - let new = Self { - allocation, - written: 0, - }; - new.invariants(); - new - } -} - -/// We have this trait impl so that the `flush` method in the `Buffer` impl above can produce a -/// [`tokio_epoll_uring::BoundedBuf::slice`] of the [`Self::written`] range of the data. -/// -/// Remember that bytes_init is generally _not_ a tracker of the amount -/// of valid data in the io buffer; we use `Slice` for that. -/// The `IoBuf` is _only_ for keeping track of uninitialized memory, a bit like MaybeUninit. -/// -/// SAFETY: -/// -/// The [`Self::allocation`] is stable becauses boxes are stable. -/// The memory is zero-initialized, so, bytes_init is always N. -unsafe impl tokio_epoll_uring::IoBuf for Buffer { - fn stable_ptr(&self) -> *const u8 { - self.allocation.as_ptr() - } - - fn bytes_init(&self) -> usize { - // Yes, N, not self.written; Read the full comment of this impl block! 
- N - } - - fn bytes_total(&self) -> usize { - N - } -} diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index c0508e13c05b..00ef5b0afd11 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -65,7 +65,7 @@ use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; use tokio::sync::OnceCell; -use tokio_epoll_uring::IoBufMut; +use tokio_epoll_uring::IoBuf; use tracing::*; use utils::{ @@ -471,7 +471,7 @@ impl DeltaLayerWriterInner { ctx: &RequestContext, ) -> (FullSlice, anyhow::Result<()>) where - Buf: IoBufMut + Send, + Buf: IoBuf + Send, { assert!( self.lsn_range.start <= lsn, @@ -678,7 +678,7 @@ impl DeltaLayerWriter { ctx: &RequestContext, ) -> (FullSlice, anyhow::Result<()>) where - Buf: IoBufMut + Send, + Buf: IoBuf + Send, { self.inner .as_mut() diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index a71b4dd83beb..f31ab4b1e8c3 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -4,23 +4,23 @@ //! held in an ephemeral file, not in memory. The metadata for each page version, i.e. //! its position in the file, is kept in memory, though. //! +use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64}; use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; -use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value}; -use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef}; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::timeline::GetVectoredError; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; +use bytes::Bytes; use camino::Utf8PathBuf; use pageserver_api::key::CompactKey; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, OnceLock}; use std::time::Instant; use tracing::*; @@ -39,6 +39,8 @@ use super::{ DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState, }; +pub(crate) mod vectored_dio_read; + #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub(crate) struct InMemoryLayerFileId(page_cache::FileId); @@ -78,9 +80,9 @@ impl std::fmt::Debug for InMemoryLayer { pub struct InMemoryLayerInner { /// All versions of all pages in the layer are kept here. Indexed - /// by block number and LSN. The value is an offset into the + /// by block number and LSN. The [`IndexEntry`] is an offset into the /// ephemeral file where the page version is stored. - index: BTreeMap>, + index: BTreeMap>, /// The values are stored in a serialized format in this file. /// Each serialized Value is preceded by a 'u32' length field. @@ -90,6 +92,154 @@ pub struct InMemoryLayerInner { resource_units: GlobalResourceUnits, } +/// Support the same max blob length as blob_io, because ultimately +/// all the InMemoryLayer contents end up being written into a delta layer, +/// using the [`crate::tenant::blob_io`]. 
+const MAX_SUPPORTED_BLOB_LEN: usize = crate::tenant::blob_io::MAX_SUPPORTED_BLOB_LEN; +const MAX_SUPPORTED_BLOB_LEN_BITS: usize = { + let trailing_ones = MAX_SUPPORTED_BLOB_LEN.trailing_ones() as usize; + let leading_zeroes = MAX_SUPPORTED_BLOB_LEN.leading_zeros() as usize; + assert!(trailing_ones + leading_zeroes == std::mem::size_of::() * 8); + trailing_ones +}; + +/// See [`InMemoryLayerInner::index`]. +/// +/// For memory efficiency, the data is packed into a u64. +/// +/// Layout: +/// - 1 bit: `will_init` +/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len` +/// - [`MAX_SUPPORTED_POS_BITS`]: `pos` +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct IndexEntry(u64); + +impl IndexEntry { + /// See [`Self::MAX_SUPPORTED_POS`]. + const MAX_SUPPORTED_POS_BITS: usize = { + let remainder = 64 - 1 - MAX_SUPPORTED_BLOB_LEN_BITS; + if remainder < 32 { + panic!("pos can be u32 as per type system, support that"); + } + remainder + }; + /// The maximum supported blob offset that can be represented by [`Self`]. + /// See also [`Self::validate_checkpoint_distance`]. + const MAX_SUPPORTED_POS: usize = (1 << Self::MAX_SUPPORTED_POS_BITS) - 1; + + // Layout + const WILL_INIT_RANGE: Range = 0..1; + const LEN_RANGE: Range = + Self::WILL_INIT_RANGE.end..Self::WILL_INIT_RANGE.end + MAX_SUPPORTED_BLOB_LEN_BITS; + const POS_RANGE: Range = + Self::LEN_RANGE.end..Self::LEN_RANGE.end + Self::MAX_SUPPORTED_POS_BITS; + const _ASSERT: () = { + if Self::POS_RANGE.end != 64 { + panic!("we don't want undefined bits for our own sanity") + } + }; + + /// Fails if and only if the offset or length encoded in `arg` is too large to be represented by [`Self`]. + /// + /// The only reason why that can happen in the system is if the [`InMemoryLayer`] grows too long. + /// The [`InMemoryLayer`] size is determined by the checkpoint distance, enforced by [`crate::tenant::Timeline::should_roll`]. + /// + /// Thus, to avoid failure of this function, whenever we start up and/or change checkpoint distance, + /// call [`Self::validate_checkpoint_distance`] with the new checkpoint distance value. + /// + /// TODO: this check should happen ideally at config parsing time (and in the request handler when a change to checkpoint distance is requested) + /// When cleaning this up, also look into the s3 max file size check that is performed in delta layer writer. 
+ #[inline(always)] + fn new(arg: IndexEntryNewArgs) -> anyhow::Result { + let IndexEntryNewArgs { + base_offset, + batch_offset, + len, + will_init, + } = arg; + + let pos = base_offset + .checked_add(batch_offset) + .ok_or_else(|| anyhow::anyhow!("base_offset + batch_offset overflows u64: base_offset={base_offset} batch_offset={batch_offset}"))?; + + if pos.into_usize() > Self::MAX_SUPPORTED_POS { + anyhow::bail!( + "base_offset+batch_offset exceeds the maximum supported value: base_offset={base_offset} batch_offset={batch_offset} (+)={pos} max={max}", + max = Self::MAX_SUPPORTED_POS + ); + } + + if len > MAX_SUPPORTED_BLOB_LEN { + anyhow::bail!( + "len exceeds the maximum supported length: len={len} max={MAX_SUPPORTED_BLOB_LEN}", + ); + } + + let mut data: u64 = 0; + use bit_field::BitField; + data.set_bits(Self::WILL_INIT_RANGE, if will_init { 1 } else { 0 }); + data.set_bits(Self::LEN_RANGE, len.into_u64()); + data.set_bits(Self::POS_RANGE, pos); + + Ok(Self(data)) + } + + #[inline(always)] + fn unpack(&self) -> IndexEntryUnpacked { + use bit_field::BitField; + IndexEntryUnpacked { + will_init: self.0.get_bits(Self::WILL_INIT_RANGE) != 0, + len: self.0.get_bits(Self::LEN_RANGE), + pos: self.0.get_bits(Self::POS_RANGE), + } + } + + /// See [`Self::new`]. + pub(crate) const fn validate_checkpoint_distance( + checkpoint_distance: u64, + ) -> Result<(), &'static str> { + if checkpoint_distance > Self::MAX_SUPPORTED_POS as u64 { + return Err("exceeds the maximum supported value"); + } + let res = u64_to_usize(checkpoint_distance).checked_add(MAX_SUPPORTED_BLOB_LEN); + if res.is_none() { + return Err( + "checkpoint distance + max supported blob len overflows in-memory addition", + ); + } + + // NB: it is ok for the result of the addition to be larger than MAX_SUPPORTED_POS + + Ok(()) + } + + const _ASSERT_DEFAULT_CHECKPOINT_DISTANCE_IS_VALID: () = { + let res = Self::validate_checkpoint_distance( + crate::tenant::config::defaults::DEFAULT_CHECKPOINT_DISTANCE, + ); + if res.is_err() { + panic!("default checkpoint distance is valid") + } + }; +} + +/// Args to [`IndexEntry::new`]. +#[derive(Clone, Copy)] +struct IndexEntryNewArgs { + base_offset: u64, + batch_offset: u64, + len: usize, + will_init: bool, +} + +/// Unpacked representation of the bitfielded [`IndexEntry`]. 
+#[derive(Clone, Copy, PartialEq, Eq, Debug)] +struct IndexEntryUnpacked { + will_init: bool, + len: u64, + pos: u64, +} + impl std::fmt::Debug for InMemoryLayerInner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("InMemoryLayerInner").finish() @@ -276,7 +426,12 @@ impl InMemoryLayer { .build(); let inner = self.inner.read().await; - let reader = inner.file.block_cursor(); + + struct ValueRead { + entry_lsn: Lsn, + read: vectored_dio_read::LogicalRead>, + } + let mut reads: HashMap> = HashMap::new(); for range in keyspace.ranges.iter() { for (key, vec_map) in inner @@ -291,24 +446,62 @@ impl InMemoryLayer { let slice = vec_map.slice_range(lsn_range); - for (entry_lsn, pos) in slice.iter().rev() { - // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183 - let buf = reader.read_blob(*pos, &ctx).await; - if let Err(e) = buf { - reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e))); + for (entry_lsn, index_entry) in slice.iter().rev() { + let IndexEntryUnpacked { + pos, + len, + will_init, + } = index_entry.unpack(); + reads.entry(key).or_default().push(ValueRead { + entry_lsn: *entry_lsn, + read: vectored_dio_read::LogicalRead::new( + pos, + Vec::with_capacity(len as usize), + ), + }); + if will_init { break; } + } + } + } + + // Execute the reads. - let value = Value::des(&buf.unwrap()); - if let Err(e) = value { + let f = vectored_dio_read::execute( + &inner.file, + reads + .iter() + .flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)), + &ctx, + ); + send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865 + .await; + + // Process results into the reconstruct state + 'next_key: for (key, value_reads) in reads { + for ValueRead { entry_lsn, read } in value_reads { + match read.into_result().expect("we run execute() above") { + Err(e) => { reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e))); - break; + continue 'next_key; } - - let key_situation = - reconstruct_state.update_key(&key, *entry_lsn, value.unwrap()); - if key_situation == ValueReconstructSituation::Complete { - break; + Ok(value_buf) => { + let value = Value::des(&value_buf); + if let Err(e) = value { + reconstruct_state + .on_key_error(key, PageReconstructError::from(anyhow!(e))); + continue 'next_key; + } + + let key_situation = + reconstruct_state.update_key(&key, entry_lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + // TODO: metric to see if we fetched more values than necessary + continue 'next_key; + } + + // process the next value in the next iteration of the loop } } } @@ -324,8 +517,9 @@ impl InMemoryLayer { struct SerializedBatchOffset { key: CompactKey, lsn: Lsn, - /// offset in bytes from the start of the batch's buffer to the Value's serialized size header. - offset: u64, + // TODO: separate type when we start serde-serializing this value, to avoid coupling + // in-memory representation to serialization format. 
+ index_entry: IndexEntry, } pub struct SerializedBatch { @@ -340,30 +534,10 @@ pub struct SerializedBatch { } impl SerializedBatch { - /// Write a blob length in the internal format of the EphemeralFile - pub(crate) fn write_blob_length(len: usize, cursor: &mut std::io::Cursor>) { - use std::io::Write; - - if len < 0x80 { - // short one-byte length header - let len_buf = [len as u8]; - - cursor - .write_all(&len_buf) - .expect("Writing to Vec is infallible"); - } else { - let mut len_buf = u32::to_be_bytes(len as u32); - len_buf[0] |= 0x80; - cursor - .write_all(&len_buf) - .expect("Writing to Vec is infallible"); - } - } - - pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self { + pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> anyhow::Result { // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`] - let buffer_size = batch.iter().map(|i| i.2).sum::() + 4 * batch.len(); + let buffer_size = batch.iter().map(|i| i.2).sum::(); let mut cursor = std::io::Cursor::new(Vec::::with_capacity(buffer_size)); let mut offsets: Vec = Vec::with_capacity(batch.len()); @@ -371,14 +545,19 @@ impl SerializedBatch { for (key, lsn, val_ser_size, val) in batch { let relative_off = cursor.position(); - Self::write_blob_length(val_ser_size, &mut cursor); val.ser_into(&mut cursor) .expect("Writing into in-memory buffer is infallible"); offsets.push(SerializedBatchOffset { key, lsn, - offset: relative_off, + index_entry: IndexEntry::new(IndexEntryNewArgs { + base_offset: 0, + batch_offset: relative_off, + len: val_ser_size, + will_init: val.will_init(), + }) + .context("higher-level code ensures that values are within supported ranges")?, }); max_lsn = std::cmp::max(max_lsn, lsn); } @@ -388,11 +567,11 @@ impl SerializedBatch { // Assert that we didn't do any extra allocations while building buffer. debug_assert!(buffer.len() <= buffer_size); - Self { + Ok(Self { raw: buffer, offsets, max_lsn, - } + }) } } @@ -456,44 +635,69 @@ impl InMemoryLayer { }) } - // Write path. + /// Write path. + /// + /// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from. + /// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable. + /// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors. pub async fn put_batch( &self, serialized_batch: SerializedBatch, ctx: &RequestContext, - ) -> Result<()> { + ) -> anyhow::Result<()> { let mut inner = self.inner.write().await; self.assert_writable(); - let base_off = { - inner - .file - .write_raw( - &serialized_batch.raw, - &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::InMemoryLayer) - .build(), - ) - .await? - }; + let base_offset = inner.file.len(); + + let SerializedBatch { + raw, + mut offsets, + max_lsn: _, + } = serialized_batch; + + // Add the base_offset to the batch's index entries which are relative to the batch start. 
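+        // NB: going through `IndexEntry::new` re-validates that `base_offset + batch_offset`
+        // still fits into the packed `pos` bit range, which is why this write path is fallible.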
+ for offset in &mut offsets { + let IndexEntryUnpacked { + will_init, + len, + pos, + } = offset.index_entry.unpack(); + offset.index_entry = IndexEntry::new(IndexEntryNewArgs { + base_offset, + batch_offset: pos, + len: len.into_usize(), + will_init, + })?; + } + // Write the batch to the file + inner.file.write_raw(&raw, ctx).await?; + let new_size = inner.file.len(); + let expected_new_len = base_offset + .checked_add(raw.len().into_u64()) + // write_raw would error if we were to overflow u64. + // also IndexEntry and higher levels in + //the code don't allow the file to grow that large + .unwrap(); + assert_eq!(new_size, expected_new_len); + + // Update the index with the new entries for SerializedBatchOffset { key, lsn, - offset: relative_off, - } in serialized_batch.offsets + index_entry, + } in offsets { - let off = base_off + relative_off; let vec_map = inner.index.entry(key).or_default(); - let old = vec_map.append_or_update_last(lsn, off).unwrap().0; + let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. warn!("Key {} at {} already exists", key, lsn); } } - let size = inner.file.len(); - inner.resource_units.maybe_publish_size(size); + inner.resource_units.maybe_publish_size(new_size); Ok(()) } @@ -537,7 +741,7 @@ impl InMemoryLayer { { let inner = self.inner.write().await; for vec_map in inner.index.values() { - for (lsn, _pos) in vec_map.as_slice() { + for (lsn, _) in vec_map.as_slice() { assert!(*lsn < end_lsn); } } @@ -601,36 +805,23 @@ impl InMemoryLayer { match l0_flush_global_state { l0_flush::Inner::Direct { .. } => { let file_contents: Vec = inner.file.load_to_vec(ctx).await?; - assert_eq!( - file_contents.len() % PAGE_SZ, - 0, - "needed by BlockReaderRef::Slice" - ); - assert_eq!(file_contents.len(), { - let written = usize::try_from(inner.file.len()).unwrap(); - if written % PAGE_SZ == 0 { - written - } else { - written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap() - } - }); - - let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents)); - let mut buf = Vec::new(); + let file_contents = Bytes::from(file_contents); for (key, vec_map) in inner.index.iter() { // Write all page versions - for (lsn, pos) in vec_map.as_slice() { - // TODO: once we have blob lengths in the in-memory index, we can - // 1. get rid of the blob_io / BlockReaderRef::Slice business and - // 2. load the file contents into a Bytes and - // 3. the use `Bytes::slice` to get the `buf` that is our blob - // 4. 
pass that `buf` into `put_value_bytes` - // => https://github.com/neondatabase/neon/issues/8183 - cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; - let will_init = Value::des(&buf)?.will_init(); - let (tmp, res) = delta_layer_writer + for (lsn, entry) in vec_map + .as_slice() + .iter() + .map(|(lsn, entry)| (lsn, entry.unpack())) + { + let IndexEntryUnpacked { + pos, + len, + will_init, + } = entry; + let buf = Bytes::slice(&file_contents, pos as usize..(pos + len) as usize); + let (_buf, res) = delta_layer_writer .put_value_bytes( Key::from_compact(*key), *lsn, @@ -640,7 +831,6 @@ impl InMemoryLayer { ) .await; res?; - buf = tmp.into_raw_slice().into_inner(); } } } @@ -662,3 +852,134 @@ impl InMemoryLayer { Ok(Some((desc, path))) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_index_entry() { + const MAX_SUPPORTED_POS: usize = IndexEntry::MAX_SUPPORTED_POS; + use IndexEntryNewArgs as Args; + use IndexEntryUnpacked as Unpacked; + + let roundtrip = |args, expect: Unpacked| { + let res = IndexEntry::new(args).expect("this tests expects no errors"); + let IndexEntryUnpacked { + will_init, + len, + pos, + } = res.unpack(); + assert_eq!(will_init, expect.will_init); + assert_eq!(len, expect.len); + assert_eq!(pos, expect.pos); + }; + + // basic roundtrip + for pos in [0, MAX_SUPPORTED_POS] { + for len in [0, MAX_SUPPORTED_BLOB_LEN] { + for will_init in [true, false] { + let expect = Unpacked { + will_init, + len: len.into_u64(), + pos: pos.into_u64(), + }; + roundtrip( + Args { + will_init, + base_offset: pos.into_u64(), + batch_offset: 0, + len, + }, + expect, + ); + roundtrip( + Args { + will_init, + base_offset: 0, + batch_offset: pos.into_u64(), + len, + }, + expect, + ); + } + } + } + + // too-large len + let too_large = Args { + will_init: false, + len: MAX_SUPPORTED_BLOB_LEN + 1, + base_offset: 0, + batch_offset: 0, + }; + assert!(IndexEntry::new(too_large).is_err()); + + // too-large pos + { + let too_large = Args { + will_init: false, + len: 0, + base_offset: MAX_SUPPORTED_POS.into_u64() + 1, + batch_offset: 0, + }; + assert!(IndexEntry::new(too_large).is_err()); + let too_large = Args { + will_init: false, + len: 0, + base_offset: 0, + batch_offset: MAX_SUPPORTED_POS.into_u64() + 1, + }; + assert!(IndexEntry::new(too_large).is_err()); + } + + // too large (base_offset + batch_offset) + { + let too_large = Args { + will_init: false, + len: 0, + base_offset: MAX_SUPPORTED_POS.into_u64(), + batch_offset: 1, + }; + assert!(IndexEntry::new(too_large).is_err()); + let too_large = Args { + will_init: false, + len: 0, + base_offset: MAX_SUPPORTED_POS.into_u64() - 1, + batch_offset: MAX_SUPPORTED_POS.into_u64() - 1, + }; + assert!(IndexEntry::new(too_large).is_err()); + } + + // valid special cases + // - area past the max supported pos that is accessible by len + for len in [1, MAX_SUPPORTED_BLOB_LEN] { + roundtrip( + Args { + will_init: false, + len, + base_offset: MAX_SUPPORTED_POS.into_u64(), + batch_offset: 0, + }, + Unpacked { + will_init: false, + len: len as u64, + pos: MAX_SUPPORTED_POS.into_u64(), + }, + ); + roundtrip( + Args { + will_init: false, + len, + base_offset: 0, + batch_offset: MAX_SUPPORTED_POS.into_u64(), + }, + Unpacked { + will_init: false, + len: len as u64, + pos: MAX_SUPPORTED_POS.into_u64(), + }, + ); + } + } +} diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs new file mode 100644 index 000000000000..0683e15659dc --- /dev/null 
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -0,0 +1,937 @@ +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; + +use itertools::Itertools; +use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; + +use crate::{ + assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}, + context::RequestContext, +}; + +/// The file interface we require. At runtime, this is a [`crate::tenant::ephemeral_file::EphemeralFile`]. +pub trait File: Send { + /// Attempt to read the bytes in `self` in range `[start,start+dst.bytes_total())` + /// and return the number of bytes read (let's call it `nread`). + /// The bytes read are placed in `dst`, i.e., `&dst[..nread]` will contain the read bytes. + /// + /// The only reason why the read may be short (i.e., `nread != dst.bytes_total()`) + /// is if the file is shorter than `start+dst.len()`. + /// + /// This is unlike [`std::os::unix::fs::FileExt::read_exact_at`] which returns an + /// [`std::io::ErrorKind::UnexpectedEof`] error if the file is shorter than `start+dst.len()`. + /// + /// No guarantees are made about the remaining bytes in `dst` in case of a short read. + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + &'b self, + start: u64, + dst: Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(Slice, usize)>; +} + +/// A logical read from [`File`]. See [`Self::new`]. +pub struct LogicalRead { + pos: u64, + state: RwLockRefCell>, +} + +enum LogicalReadState { + NotStarted(B), + Ongoing(B), + Ok(B), + Error(Arc), + Undefined, +} + +impl LogicalRead { + /// Create a new [`LogicalRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`. + pub fn new(pos: u64, buf: B) -> Self { + Self { + pos, + state: RwLockRefCell::new(LogicalReadState::NotStarted(buf)), + } + } + pub fn into_result(self) -> Option>> { + match self.state.into_inner() { + LogicalReadState::Ok(buf) => Some(Ok(buf)), + LogicalReadState::Error(e) => Some(Err(e)), + LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => None, + LogicalReadState::Undefined => unreachable!(), + } + } +} + +/// The buffer into which a [`LogicalRead`] result is placed. +pub trait Buffer: std::ops::Deref { + /// Immutable. + fn cap(&self) -> usize; + /// Changes only through [`Self::extend_from_slice`]. + fn len(&self) -> usize; + /// Panics if the total length would exceed the initialized capacity. + fn extend_from_slice(&mut self, src: &[u8]); +} + +/// The minimum alignment and size requirement for disk offsets and memory buffer size for direct IO. +const DIO_CHUNK_SIZE: usize = 512; + +/// If multiple chunks need to be read, merge adjacent chunk reads into batches of max size `MAX_CHUNK_BATCH_SIZE`. +/// (The unit is the number of chunks.) +const MAX_CHUNK_BATCH_SIZE: usize = { + let desired = 128 * 1024; // 128k + if desired % DIO_CHUNK_SIZE != 0 { + panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE") + // compile-time error + } + desired / DIO_CHUNK_SIZE +}; + +/// Execute the given logical `reads` against `file`. +/// The results are placed in the buffers of the [`LogicalRead`]s. +/// Retrieve the results by calling [`LogicalRead::into_result`] on each [`LogicalRead`]. +/// +/// The [`LogicalRead`]s must be freshly created using [`LogicalRead::new`] when calling this function. +/// Otherwise, this function panics. 
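+///
+/// # Example
+///
+/// A minimal usage sketch (illustrative only; assumes some `file: impl File`,
+/// a `ctx: &RequestContext`, and a list of `(pos, len)` pairs named `wanted`):
+///
+/// ```ignore
+/// let reads: Vec<LogicalRead<Vec<u8>>> = wanted
+///     .iter()
+///     .map(|&(pos, len)| LogicalRead::new(pos, Vec::with_capacity(len)))
+///     .collect();
+/// execute(&file, reads.iter(), ctx).await;
+/// for read in reads {
+///     match read.into_result().expect("we ran execute() above") {
+///         Ok(buf) => { /* `buf` now holds the bytes in `[pos, pos + buf.capacity())` */ }
+///         Err(e) => { /* I/O error that affected this logical read */ }
+///     }
+/// }
+/// ```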
+pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext) +where + I: IntoIterator>, + F: File, + B: Buffer + IoBufMut + Send, +{ + // Terminology: + // logical read = a request to read an arbitrary range of bytes from `file`; byte-level granularity + // chunk = we conceptually divide up the byte range of `file` into DIO_CHUNK_SIZEs ranges + // interest = a range within a chunk that a logical read is interested in; one logical read gets turned into many interests + // physical read = the read request we're going to issue to the OS; covers a range of chunks; chunk-level granularity + + // Preserve a copy of the logical reads for debug assertions at the end + #[cfg(debug_assertions)] + let (reads, assert_logical_reads) = { + let (reads, assert) = reads.into_iter().tee(); + (reads, Some(Vec::from_iter(assert))) + }; + #[cfg(not(debug_assertions))] + let (reads, assert_logical_reads): (_, Option>>) = (reads, None); + + // Plan which parts of which chunks need to be appended to which buffer + let mut by_chunk: BTreeMap>> = BTreeMap::new(); + struct Interest<'a, B: Buffer> { + logical_read: &'a LogicalRead, + offset_in_chunk: u64, + len: u64, + } + for logical_read in reads { + let LogicalRead { pos, state } = logical_read; + let mut state = state.borrow_mut(); + + // transition from NotStarted to Ongoing + let cur = std::mem::replace(&mut *state, LogicalReadState::Undefined); + let req_len = match cur { + LogicalReadState::NotStarted(buf) => { + if buf.len() != 0 { + panic!("The `LogicalRead`s that are passed in must be freshly created using `LogicalRead::new`"); + } + // buf.cap() == 0 is ok + + // transition into Ongoing state + let req_len = buf.cap(); + *state = LogicalReadState::Ongoing(buf); + req_len + } + x => panic!("must only call with fresh LogicalReads, got another state, leaving Undefined state behind state={x:?}"), + }; + + // plan which chunks we need to read from + let mut remaining = req_len; + let mut chunk_no = *pos / (DIO_CHUNK_SIZE.into_u64()); + let mut offset_in_chunk = pos.into_usize() % DIO_CHUNK_SIZE; + while remaining > 0 { + let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk); + by_chunk.entry(chunk_no).or_default().push(Interest { + logical_read, + offset_in_chunk: offset_in_chunk.into_u64(), + len: remaining_in_chunk.into_u64(), + }); + offset_in_chunk = 0; + chunk_no += 1; + remaining -= remaining_in_chunk; + } + } + + // At this point, we could iterate over by_chunk, in chunk order, + // read each chunk from disk, and fill the buffers. + // However, we can merge adjacent chunks into batches of MAX_CHUNK_BATCH_SIZE + // so we issue fewer IOs = fewer roundtrips = lower overall latency. 
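+    // For example, interests in chunks {3, 4, 5, 9} become two physical reads:
+    // one covering chunks 3..=5 and one covering chunk 9, since only runs of
+    // adjacent chunk numbers (up to MAX_CHUNK_BATCH_SIZE chunks per run) are merged.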
+ struct PhysicalRead<'a, B: Buffer> { + start_chunk_no: u64, + nchunks: usize, + dsts: Vec>, + } + struct PhysicalInterest<'a, B: Buffer> { + logical_read: &'a LogicalRead, + offset_in_physical_read: u64, + len: u64, + } + let mut physical_reads: Vec> = Vec::new(); + let mut by_chunk = by_chunk.into_iter().peekable(); + loop { + let mut last_chunk_no = None; + let to_merge: Vec<(u64, Vec>)> = by_chunk + .peeking_take_while(|(chunk_no, _)| { + if let Some(last_chunk_no) = last_chunk_no { + if *chunk_no != last_chunk_no + 1 { + return false; + } + } + last_chunk_no = Some(*chunk_no); + true + }) + .take(MAX_CHUNK_BATCH_SIZE) + .collect(); // TODO: avoid this .collect() + let Some(start_chunk_no) = to_merge.first().map(|(chunk_no, _)| *chunk_no) else { + break; + }; + let nchunks = to_merge.len(); + let dsts = to_merge + .into_iter() + .enumerate() + .flat_map(|(i, (_, dsts))| { + dsts.into_iter().map( + move |Interest { + logical_read, + offset_in_chunk, + len, + }| { + PhysicalInterest { + logical_read, + offset_in_physical_read: i + .checked_mul(DIO_CHUNK_SIZE) + .unwrap() + .into_u64() + + offset_in_chunk, + len, + } + }, + ) + }) + .collect(); + physical_reads.push(PhysicalRead { + start_chunk_no, + nchunks, + dsts, + }); + } + drop(by_chunk); + + // Execute physical reads and fill the logical read buffers + // TODO: pipelined reads; prefetch; + let get_io_buffer = |nchunks| Vec::with_capacity(nchunks * DIO_CHUNK_SIZE); + for PhysicalRead { + start_chunk_no, + nchunks, + dsts, + } in physical_reads + { + let all_done = dsts + .iter() + .all(|PhysicalInterest { logical_read, .. }| logical_read.state.borrow().is_terminal()); + if all_done { + continue; + } + let read_offset = start_chunk_no + .checked_mul(DIO_CHUNK_SIZE.into_u64()) + .expect("we produce chunk_nos by dividing by DIO_CHUNK_SIZE earlier"); + let io_buf = get_io_buffer(nchunks).slice_full(); + let req_len = io_buf.len(); + let (io_buf_slice, nread) = match file.read_exact_at_eof_ok(read_offset, io_buf, ctx).await + { + Ok(t) => t, + Err(e) => { + let e = Arc::new(e); + for PhysicalInterest { logical_read, .. 
} in dsts { + *logical_read.state.borrow_mut() = LogicalReadState::Error(Arc::clone(&e)); + // this will make later reads for the given LogicalRead short-circuit, see top of loop body + } + continue; + } + }; + let io_buf = io_buf_slice.into_inner(); + assert!( + nread <= io_buf.len(), + "the last chunk in the file can be a short read, so, no ==" + ); + let io_buf = &io_buf[..nread]; + for PhysicalInterest { + logical_read, + offset_in_physical_read, + len, + } in dsts + { + let mut logical_read_state_borrow = logical_read.state.borrow_mut(); + let logical_read_buf = match &mut *logical_read_state_borrow { + LogicalReadState::NotStarted(_) => { + unreachable!("we transition it into Ongoing at function entry") + } + LogicalReadState::Ongoing(buf) => buf, + LogicalReadState::Ok(_) | LogicalReadState::Error(_) => { + continue; + } + LogicalReadState::Undefined => unreachable!(), + }; + let range_in_io_buf = std::ops::Range { + start: offset_in_physical_read as usize, + end: offset_in_physical_read as usize + len as usize, + }; + assert!(range_in_io_buf.end >= range_in_io_buf.start); + if range_in_io_buf.end > nread { + let msg = format!( + "physical read returned EOF where this logical read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}", + &*logical_read_state_borrow + ); + logical_read_state_borrow.transition_to_terminal(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + msg, + ))); + continue; + } + let data = &io_buf[range_in_io_buf]; + + // Copy data from io buffer into the logical read buffer. + // (And in debug mode, validate that the buffer impl adheres to the Buffer trait spec.) + let pre = if cfg!(debug_assertions) { + Some((logical_read_buf.len(), logical_read_buf.cap())) + } else { + None + }; + logical_read_buf.extend_from_slice(data); + let post = if cfg!(debug_assertions) { + Some((logical_read_buf.len(), logical_read_buf.cap())) + } else { + None + }; + match (pre, post) { + (None, None) => {} + (Some(_), None) | (None, Some(_)) => unreachable!(), + (Some((pre_len, pre_cap)), Some((post_len, post_cap))) => { + assert_eq!(pre_len + len as usize, post_len); + assert_eq!(pre_cap, post_cap); + } + } + + if logical_read_buf.len() == logical_read_buf.cap() { + logical_read_state_borrow.transition_to_terminal(Ok(())); + } + } + } + + if let Some(assert_logical_reads) = assert_logical_reads { + for logical_read in assert_logical_reads { + assert!(logical_read.state.borrow().is_terminal()); + } + } +} + +impl LogicalReadState { + fn is_terminal(&self) -> bool { + match self { + LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => false, + LogicalReadState::Ok(_) | LogicalReadState::Error(_) => true, + LogicalReadState::Undefined => unreachable!(), + } + } + fn transition_to_terminal(&mut self, err: std::io::Result<()>) { + let cur = std::mem::replace(self, LogicalReadState::Undefined); + let buf = match cur { + LogicalReadState::Ongoing(buf) => buf, + x => panic!("must only call in state Ongoing, got {x:?}"), + }; + *self = match err { + Ok(()) => LogicalReadState::Ok(buf), + Err(e) => LogicalReadState::Error(Arc::new(e)), + }; + } +} + +impl std::fmt::Debug for LogicalReadState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + #[derive(Debug)] + #[allow(unused)] + struct BufferDebug { + len: usize, + cap: usize, + } + impl<'a> From<&'a dyn Buffer> for BufferDebug { + fn from(buf: &'a dyn Buffer) -> Self { + Self { + len: buf.len(), + cap: buf.cap(), + } + } + } + match self { + 
LogicalReadState::NotStarted(b) => { + write!(f, "NotStarted({:?})", BufferDebug::from(b as &dyn Buffer)) + } + LogicalReadState::Ongoing(b) => { + write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer)) + } + LogicalReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)), + LogicalReadState::Error(e) => write!(f, "Error({:?})", e), + LogicalReadState::Undefined => write!(f, "Undefined"), + } + } +} + +#[derive(Debug)] +struct RwLockRefCell(RwLock); +impl RwLockRefCell { + fn new(value: T) -> Self { + Self(RwLock::new(value)) + } + fn borrow(&self) -> impl std::ops::Deref + '_ { + self.0.try_read().unwrap() + } + fn borrow_mut(&self) -> impl std::ops::DerefMut + '_ { + self.0.try_write().unwrap() + } + fn into_inner(self) -> T { + self.0.into_inner().unwrap() + } +} + +impl Buffer for Vec { + fn cap(&self) -> usize { + self.capacity() + } + + fn len(&self) -> usize { + self.len() + } + + fn extend_from_slice(&mut self, src: &[u8]) { + if self.len() + src.len() > self.cap() { + panic!("Buffer capacity exceeded"); + } + Vec::extend_from_slice(self, src); + } +} + +#[cfg(test)] +#[allow(clippy::assertions_on_constants)] +mod tests { + use rand::Rng; + + use crate::{ + context::DownloadBehavior, task_mgr::TaskKind, + virtual_file::owned_buffers_io::slice::SliceMutExt, + }; + + use super::*; + use std::{cell::RefCell, collections::VecDeque}; + + struct InMemoryFile { + content: Vec, + } + + impl InMemoryFile { + fn new_random(len: usize) -> Self { + Self { + content: rand::thread_rng() + .sample_iter(rand::distributions::Standard) + .take(len) + .collect(), + } + } + fn test_logical_read(&self, pos: u64, len: usize) -> TestLogicalRead { + let expected_result = if pos as usize + len > self.content.len() { + Err("InMemoryFile short read".to_string()) + } else { + Ok(self.content[pos as usize..pos as usize + len].to_vec()) + }; + TestLogicalRead::new(pos, len, expected_result) + } + } + + #[test] + fn test_in_memory_file() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let file = InMemoryFile::new_random(10); + let test_read = |pos, len| { + let buf = vec![0; len]; + let fut = file.read_exact_at_eof_ok(pos, buf.slice_full(), &ctx); + use futures::FutureExt; + let (slice, nread) = fut + .now_or_never() + .expect("impl never awaits") + .expect("impl never errors"); + let mut buf = slice.into_inner(); + buf.truncate(nread); + buf + }; + assert_eq!(test_read(0, 1), &file.content[0..1]); + assert_eq!(test_read(1, 2), &file.content[1..3]); + assert_eq!(test_read(9, 2), &file.content[9..]); + assert!(test_read(10, 2).is_empty()); + assert!(test_read(11, 2).is_empty()); + } + + impl File for InMemoryFile { + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + &'b self, + start: u64, + mut dst: Slice, + _ctx: &'a RequestContext, + ) -> std::io::Result<(Slice, usize)> { + let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed(); + let nread = { + let req_len = dst_slice.len(); + let len = std::cmp::min(req_len, self.content.len().saturating_sub(start as usize)); + if start as usize >= self.content.len() { + 0 + } else { + dst_slice[..len] + .copy_from_slice(&self.content[start as usize..start as usize + len]); + len + } + }; + rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[nread..]); // to discover bugs + Ok((dst, nread)) + } + } + + #[derive(Clone)] + struct TestLogicalRead { + pos: u64, + len: usize, + expected_result: Result, String>, + } + + impl TestLogicalRead { + fn new(pos: u64, len: usize, expected_result: 
Result, String>) -> Self { + Self { + pos, + len, + expected_result, + } + } + fn make_logical_read(&self) -> LogicalRead> { + LogicalRead::new(self.pos, Vec::with_capacity(self.len)) + } + } + + async fn execute_and_validate_test_logical_reads( + file: &F, + test_logical_reads: I, + ctx: &RequestContext, + ) where + I: IntoIterator, + F: File, + { + let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee(); + let logical_reads = tmp.map(|tr| tr.make_logical_read()).collect::>(); + execute(file, logical_reads.iter(), ctx).await; + for (logical_read, test_logical_read) in logical_reads.into_iter().zip(test_logical_reads) { + let actual = logical_read.into_result().expect("we call execute()"); + match (actual, test_logical_read.expected_result) { + (Ok(actual), Ok(expected)) if actual == expected => {} + (Err(actual), Err(expected)) => { + assert_eq!(actual.to_string(), expected); + } + (actual, expected) => panic!("expected {expected:?}\nactual {actual:?}"), + } + } + } + + #[tokio::test] + async fn test_blackbox() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let cs = DIO_CHUNK_SIZE; + let cs_u64 = cs.into_u64(); + + let file = InMemoryFile::new_random(10 * cs); + + let test_logical_reads = vec![ + file.test_logical_read(0, 1), + // adjacent to logical_read0 + file.test_logical_read(1, 2), + // gap + // spans adjacent chunks + file.test_logical_read(cs_u64 - 1, 2), + // gap + // tail of chunk 3, all of chunk 4, and 2 bytes of chunk 5 + file.test_logical_read(3 * cs_u64 - 1, cs + 2), + // gap + file.test_logical_read(5 * cs_u64, 1), + ]; + let num_test_logical_reads = test_logical_reads.len(); + let test_logical_reads_perms = test_logical_reads + .into_iter() + .permutations(num_test_logical_reads); + + // test all orderings of LogicalReads, the order shouldn't matter for the results + for test_logical_reads in test_logical_reads_perms { + execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await; + } + } + + #[tokio::test] + #[should_panic] + async fn test_reusing_logical_reads_panics() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let file = InMemoryFile::new_random(DIO_CHUNK_SIZE); + let a = file.test_logical_read(23, 10); + let logical_reads = vec![a.make_logical_read()]; + execute(&file, &logical_reads, &ctx).await; + // reuse pancis + execute(&file, &logical_reads, &ctx).await; + } + + struct RecorderFile<'a> { + recorded: RefCell>, + file: &'a InMemoryFile, + } + + struct RecordedRead { + pos: u64, + req_len: usize, + res: Vec, + } + + impl<'a> RecorderFile<'a> { + fn new(file: &'a InMemoryFile) -> RecorderFile<'a> { + Self { + recorded: Default::default(), + file, + } + } + } + + impl<'x> File for RecorderFile<'x> { + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + &'b self, + start: u64, + dst: Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(Slice, usize)> { + let (dst, nread) = self.file.read_exact_at_eof_ok(start, dst, ctx).await?; + self.recorded.borrow_mut().push(RecordedRead { + pos: start, + req_len: dst.bytes_total(), + res: Vec::from(&dst[..nread]), + }); + Ok((dst, nread)) + } + } + + #[tokio::test] + async fn test_logical_reads_to_same_chunk_are_merged_into_one_chunk_read() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE); + + let a = file.test_logical_read(DIO_CHUNK_SIZE.into_u64(), 10); + let b = file.test_logical_read(DIO_CHUNK_SIZE.into_u64() + 30, 20); 
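+        // Both logical reads fall within chunk 1 of the file, so we expect the
+        // planner to issue a single physical read of just that chunk (asserted below).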
+ + let recorder = RecorderFile::new(&file); + + execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await; + + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 1); + let RecordedRead { pos, req_len, .. } = &recorded[0]; + assert_eq!(*pos, DIO_CHUNK_SIZE.into_u64()); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + } + + #[tokio::test] + async fn test_max_chunk_batch_size_is_respected() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + let file = InMemoryFile::new_random(4 * MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE); + + // read the 10th byte of each chunk 3 .. 3+2*MAX_CHUNK_BATCH_SIZE + assert!(3 < MAX_CHUNK_BATCH_SIZE, "test assumption"); + assert!(10 < DIO_CHUNK_SIZE, "test assumption"); + let mut test_logical_reads = Vec::new(); + for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 { + test_logical_reads + .push(file.test_logical_read(i.into_u64() * DIO_CHUNK_SIZE.into_u64() + 10, 1)); + } + + let recorder = RecorderFile::new(&file); + + execute_and_validate_test_logical_reads(&recorder, test_logical_reads, &ctx).await; + + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 2); + { + let RecordedRead { pos, req_len, .. } = &recorded[0]; + assert_eq!(*pos as usize, 3 * DIO_CHUNK_SIZE); + assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE); + } + { + let RecordedRead { pos, req_len, .. } = &recorded[1]; + assert_eq!(*pos as usize, (3 + MAX_CHUNK_BATCH_SIZE) * DIO_CHUNK_SIZE); + assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE / 2 * DIO_CHUNK_SIZE); + } + } + + #[tokio::test] + async fn test_batch_breaks_if_chunk_is_not_interesting() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + assert!(MAX_CHUNK_BATCH_SIZE > 10, "test assumption"); + let file = InMemoryFile::new_random(3 * DIO_CHUNK_SIZE); + + let a = file.test_logical_read(0, 1); // chunk 0 + let b = file.test_logical_read(2 * DIO_CHUNK_SIZE.into_u64(), 1); // chunk 2 + + let recorder = RecorderFile::new(&file); + + execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await; + + let recorded = recorder.recorded.borrow(); + + assert_eq!(recorded.len(), 2); + { + let RecordedRead { pos, req_len, .. } = &recorded[0]; + assert_eq!(*pos, 0); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + } + { + let RecordedRead { pos, req_len, .. } = &recorded[1]; + assert_eq!(*pos, 2 * DIO_CHUNK_SIZE.into_u64()); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + } + } + + struct ExpectedRead { + expect_pos: u64, + expect_len: usize, + respond: Result, String>, + } + + struct MockFile { + expected: RefCell>, + } + + impl Drop for MockFile { + fn drop(&mut self) { + assert!( + self.expected.borrow().is_empty(), + "expected reads not satisfied" + ); + } + } + + macro_rules! mock_file { + ($($pos:expr , $len:expr => $respond:expr),* $(,)?) 
=> {{ + MockFile { + expected: RefCell::new(VecDeque::from(vec![$(ExpectedRead { + expect_pos: $pos, + expect_len: $len, + respond: $respond, + }),*])), + } + }}; + } + + impl File for MockFile { + async fn read_exact_at_eof_ok<'a, 'b, B: IoBufMut + Send>( + &'b self, + start: u64, + mut dst: Slice, + _ctx: &'a RequestContext, + ) -> std::io::Result<(Slice, usize)> { + let ExpectedRead { + expect_pos, + expect_len, + respond, + } = self + .expected + .borrow_mut() + .pop_front() + .expect("unexpected read"); + assert_eq!(start, expect_pos); + assert_eq!(dst.bytes_total(), expect_len); + match respond { + Ok(mocked_bytes) => { + let len = std::cmp::min(dst.bytes_total(), mocked_bytes.len()); + let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed(); + dst_slice[..len].copy_from_slice(&mocked_bytes[..len]); + rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs + Ok((dst, len)) + } + Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)), + } + } + } + + #[tokio::test] + async fn test_mock_file() { + // Self-test to ensure the relevant features of mock file work as expected. + + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + let mock_file = mock_file! { + 0 , 512 => Ok(vec![0; 512]), + 512 , 512 => Ok(vec![1; 512]), + 1024 , 512 => Ok(vec![2; 10]), + 2048, 1024 => Err("foo".to_owned()), + }; + + let buf = Vec::with_capacity(512); + let (buf, nread) = mock_file + .read_exact_at_eof_ok(0, buf.slice_full(), &ctx) + .await + .unwrap(); + assert_eq!(nread, 512); + assert_eq!(&buf.into_inner()[..nread], &[0; 512]); + + let buf = Vec::with_capacity(512); + let (buf, nread) = mock_file + .read_exact_at_eof_ok(512, buf.slice_full(), &ctx) + .await + .unwrap(); + assert_eq!(nread, 512); + assert_eq!(&buf.into_inner()[..nread], &[1; 512]); + + let buf = Vec::with_capacity(512); + let (buf, nread) = mock_file + .read_exact_at_eof_ok(1024, buf.slice_full(), &ctx) + .await + .unwrap(); + assert_eq!(nread, 10); + assert_eq!(&buf.into_inner()[..nread], &[2; 10]); + + let buf = Vec::with_capacity(1024); + let err = mock_file + .read_exact_at_eof_ok(2048, buf.slice_full(), &ctx) + .await + .err() + .unwrap(); + assert_eq!(err.to_string(), "foo"); + } + + #[tokio::test] + async fn test_error_on_one_chunk_read_fails_only_dependent_logical_reads() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + let test_logical_reads = vec![ + // read spanning two batches + TestLogicalRead::new( + DIO_CHUNK_SIZE.into_u64() / 2, + MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE, + Err("foo".to_owned()), + ), + // second read in failing chunk + TestLogicalRead::new( + (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).into_u64() + DIO_CHUNK_SIZE.into_u64() - 10, + 5, + Err("foo".to_owned()), + ), + // read unaffected + TestLogicalRead::new( + (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE).into_u64() + + 2 * DIO_CHUNK_SIZE.into_u64() + + 10, + 5, + Ok(vec![1; 5]), + ), + ]; + let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee(); + let test_logical_read_perms = tmp.permutations(test_logical_reads.len()); + + for test_logical_reads in test_logical_read_perms { + let file = mock_file!( + 0, MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE => Ok(vec![0; MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE]), + (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE).into_u64(), DIO_CHUNK_SIZE => Err("foo".to_owned()), + (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE).into_u64(), DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]), + ); + 
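+            // Expected IO pattern for every permutation: the full first batch (Ok, zeros),
+            // then the chunk shared by the two failing logical reads (Err("foo")),
+            // then the chunk holding the unaffected logical read (Ok, ones).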
execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await; + } + } + + struct TestShortReadsSetup { + ctx: RequestContext, + file: InMemoryFile, + written: u64, + } + fn setup_short_chunk_read_tests() -> TestShortReadsSetup { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + assert!(DIO_CHUNK_SIZE > 20, "test assumption"); + let written = (2 * DIO_CHUNK_SIZE - 10).into_u64(); + let file = InMemoryFile::new_random(written as usize); + TestShortReadsSetup { ctx, file, written } + } + + #[tokio::test] + async fn test_short_chunk_read_from_written_range() { + // Test what happens if there are logical reads + // that start within the last chunk, and + // the last chunk is not the full chunk length. + // + // The read should succeed despite the short chunk length. + let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests(); + + let a = file.test_logical_read(written - 10, 5); + let recorder = RecorderFile::new(&file); + + execute_and_validate_test_logical_reads(&recorder, vec![a], &ctx).await; + + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 1); + let RecordedRead { pos, req_len, res } = &recorded[0]; + assert_eq!(*pos, DIO_CHUNK_SIZE.into_u64()); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]); + } + + #[tokio::test] + async fn test_short_chunk_read_and_logical_read_from_unwritten_range() { + // Test what happens if there are logical reads + // that start within the last chunk, and + // the last chunk is not the full chunk length, and + // the logical reads end in the unwritten range. + // + // All should fail with UnexpectedEof and have the same IO pattern. + async fn the_impl(offset_delta: i64) { + let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests(); + + let offset = u64::try_from( + i64::try_from(written) + .unwrap() + .checked_add(offset_delta) + .unwrap(), + ) + .unwrap(); + let a = file.test_logical_read(offset, 5); + let recorder = RecorderFile::new(&file); + let a_vr = a.make_logical_read(); + execute(&recorder, vec![&a_vr], &ctx).await; + + // validate the LogicalRead result + let a_res = a_vr.into_result().unwrap(); + let a_err = a_res.unwrap_err(); + assert_eq!(a_err.kind(), std::io::ErrorKind::UnexpectedEof); + + // validate the IO pattern + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 1); + let RecordedRead { pos, req_len, res } = &recorded[0]; + assert_eq!(*pos, DIO_CHUNK_SIZE.into_u64()); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]); + } + + the_impl(-1).await; // start == length - 1 + the_impl(0).await; // start == length + the_impl(1).await; // start == length + 1 + } + + // TODO: mixed: some valid, some UnexpectedEof + + // TODO: same tests but with merges +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 098c196ee8ce..e1dd80fbf2d2 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -69,7 +69,7 @@ use crate::{ config::defaults::DEFAULT_PITR_INTERVAL, layer_map::{LayerMap, SearchResult}, metadata::TimelineMetadata, - storage_layer::PersistentLayerDesc, + storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc}, }, walredo, }; @@ -1907,6 +1907,8 @@ impl Timeline { true } else if projected_layer_size >= checkpoint_distance { + // NB: this check is relied upon by: + let _ = IndexEntry::validate_checkpoint_distance; 
info!( "Will roll layer at {} with layer size {} due to layer size ({})", projected_lsn, layer_size, projected_layer_size @@ -5702,7 +5704,7 @@ impl<'a> TimelineWriter<'a> { return Ok(()); } - let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch); + let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch)?; let batch_max_lsn = serialized_batch.max_lsn; let buf_size: u64 = serialized_batch.raw.len() as u64; diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index f8f37b17e33f..568cf62e5617 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -78,6 +78,7 @@ where .expect("must not use after we returned an error") } + /// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted. #[cfg_attr(target_os = "macos", allow(dead_code))] pub async fn write_buffered( &mut self, diff --git a/test_runner/regress/test_pageserver_layer_rolling.py b/test_runner/regress/test_pageserver_layer_rolling.py index 66b6185aaae3..f6404d68ac1a 100644 --- a/test_runner/regress/test_pageserver_layer_rolling.py +++ b/test_runner/regress/test_pageserver_layer_rolling.py @@ -247,9 +247,10 @@ def test_total_size_limit(neon_env_builder: NeonEnvBuilder): compaction_period_s = 10 + checkpoint_distance = 1024**3 tenant_conf = { # Large space + time thresholds: effectively disable these limits - "checkpoint_distance": f"{1024 ** 4}", + "checkpoint_distance": f"{checkpoint_distance}", "checkpoint_timeout": "3600s", "compaction_period": f"{compaction_period_s}s", } @@ -269,7 +270,11 @@ def test_total_size_limit(neon_env_builder: NeonEnvBuilder): for tenant, timeline, last_flush_lsn in last_flush_lsns: http_client = env.pageserver.http_client() initdb_lsn = Lsn(http_client.timeline_detail(tenant, timeline)["initdb_lsn"]) - total_bytes_ingested += last_flush_lsn - initdb_lsn + this_timeline_ingested = last_flush_lsn - initdb_lsn + assert ( + this_timeline_ingested < checkpoint_distance * 0.8 + ), "this test is supposed to fill InMemoryLayer" + total_bytes_ingested += this_timeline_ingested log.info(f"Ingested {total_bytes_ingested} bytes since initdb (vs max dirty {max_dirty_data})") assert total_bytes_ingested > max_dirty_data