From e408cba4ce7e6509cad3a702cf0c429c30b204f8 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 29 Jul 2024 10:57:08 +0000
Subject: [PATCH] WIP

---
 pageserver/src/l0_flush.rs                    |  19 +-
 pageserver/src/tenant/block_io.rs             |  23 ---
 pageserver/src/tenant/ephemeral_file.rs       | 130 +++---------
 .../src/tenant/ephemeral_file/page_caching.rs | 187 ++++++++----------
 .../ephemeral_file/zero_padded_read_write.rs  |  30 +--
 .../src/tenant/storage_layer/delta_layer.rs   |   6 +-
 .../tenant/storage_layer/inmemory_layer.rs    | 178 +++++++----------
 pageserver/src/tenant/timeline.rs             |   1 +
 8 files changed, 207 insertions(+), 367 deletions(-)

diff --git a/pageserver/src/l0_flush.rs b/pageserver/src/l0_flush.rs
index 10187f2ba3092..313a7961a678b 100644
--- a/pageserver/src/l0_flush.rs
+++ b/pageserver/src/l0_flush.rs
@@ -1,15 +1,10 @@
 use std::{num::NonZeroUsize, sync::Arc};
 
-use crate::tenant::ephemeral_file;
-
 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
 #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
 pub enum L0FlushConfig {
-    PageCached,
     #[serde(rename_all = "snake_case")]
-    Direct {
-        max_concurrency: NonZeroUsize,
-    },
+    Direct { max_concurrency: NonZeroUsize },
 }
 
 impl Default for L0FlushConfig {
@@ -25,14 +20,12 @@ impl Default for L0FlushConfig {
 pub struct L0FlushGlobalState(Arc<Inner>);
 
 pub enum Inner {
-    PageCached,
     Direct { semaphore: tokio::sync::Semaphore },
 }
 
 impl L0FlushGlobalState {
     pub fn new(config: L0FlushConfig) -> Self {
         match config {
-            L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
             L0FlushConfig::Direct { max_concurrency } => {
                 let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
                 Self(Arc::new(Inner::Direct { semaphore }))
@@ -44,13 +37,3 @@ impl L0FlushGlobalState {
         &self.0
     }
 }
-
-impl L0FlushConfig {
-    pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
-        use L0FlushConfig::*;
-        match self {
-            PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
-            Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
-        }
-    }
-}
diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs
index 601b095155195..3afa3a86b9487 100644
--- a/pageserver/src/tenant/block_io.rs
+++ b/pageserver/src/tenant/block_io.rs
@@ -2,7 +2,6 @@
 //! Low-level Block-oriented I/O functions
 //!
 
-use super::ephemeral_file::EphemeralFile;
 use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
 use crate::context::RequestContext;
 use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
@@ -81,9 +80,7 @@ impl<'a> Deref for BlockLease<'a> {
 /// Unlike traits, we also support the read function to be async though.
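+/// (This patch drops the `EphemeralFile` and `Slice` variants below: ephemeral
+/// files are now read page-by-page via `EphemeralFile::read_page`, and the L0
+/// flush path slices the loaded file contents directly, so neither needs the
+/// block-reader indirection anymore.)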
 pub(crate) enum BlockReaderRef<'a> {
     FileBlockReader(&'a FileBlockReader<'a>),
-    EphemeralFile(&'a EphemeralFile),
     Adapter(Adapter<&'a DeltaLayerInner>),
-    Slice(&'a [u8]),
     #[cfg(test)]
     TestDisk(&'a super::disk_btree::tests::TestDisk),
     #[cfg(test)]
@@ -100,9 +97,7 @@ impl<'a> BlockReaderRef<'a> {
         use BlockReaderRef::*;
         match self {
             FileBlockReader(r) => r.read_blk(blknum, ctx).await,
-            EphemeralFile(r) => r.read_blk(blknum, ctx).await,
             Adapter(r) => r.read_blk(blknum, ctx).await,
-            Slice(s) => Self::read_blk_slice(s, blknum),
             #[cfg(test)]
             TestDisk(r) => r.read_blk(blknum),
             #[cfg(test)]
@@ -111,24 +106,6 @@ impl<'a> BlockReaderRef<'a> {
     }
 }
 
-impl<'a> BlockReaderRef<'a> {
-    fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
-        let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
-        let end = start.checked_add(PAGE_SZ).unwrap();
-        if end > slice.len() {
-            return Err(std::io::Error::new(
-                std::io::ErrorKind::UnexpectedEof,
-                format!("slice too short, len={} end={}", slice.len(), end),
-            ));
-        }
-        let slice = &slice[start..end];
-        let page_sized: &[u8; PAGE_SZ] = slice
-            .try_into()
-            .expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
-        Ok(BlockLease::Slice(page_sized))
-    }
-}
-
 ///
 /// A "cursor" for efficiently reading multiple pages from a BlockReader
 ///
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 770f3ca5f05ee..e782e01859936 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -4,7 +4,6 @@
 use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache;
-use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
 use crate::virtual_file::{self, VirtualFile};
 use camino::Utf8PathBuf;
 use pageserver_api::shard::TenantShardId;
@@ -20,8 +19,9 @@ pub struct EphemeralFile {
     rw: page_caching::RW,
 }
 
-mod page_caching;
-pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
+pub(super) mod page_caching;
+
+use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue;
 mod zero_padded_read_write;
 
 impl EphemeralFile {
@@ -52,16 +52,14 @@ impl EphemeralFile {
         )
         .await?;
 
-        let prewarm = conf.l0_flush.prewarm_on_write();
-
         Ok(EphemeralFile {
             _tenant_shard_id: tenant_shard_id,
             _timeline_id: timeline_id,
-            rw: page_caching::RW::new(file, prewarm, gate_guard),
+            rw: page_caching::RW::new(file, gate_guard),
         })
     }
 
-    pub(crate) fn len(&self) -> u64 {
+    pub(crate) fn len(&self) -> u32 {
         self.rw.bytes_written()
     }
 
@@ -74,37 +72,40 @@ impl EphemeralFile {
         self.rw.load_to_vec(ctx).await
     }
 
-    pub(crate) async fn read_blk(
+    pub(crate) async fn read_page(
         &self,
         blknum: u32,
+        dst: page_caching::PageBuf,
         ctx: &RequestContext,
-    ) -> Result<BlockLease, io::Error> {
-        self.rw.read_blk(blknum, ctx).await
+    ) -> Result<page_caching::ReadResult<'_>, io::Error> {
+        self.rw.read_page(blknum, dst, ctx).await
     }
 
     pub(crate) async fn write_blob(
         &mut self,
-        srcbuf: &[u8],
+        buf: &[u8],
         ctx: &RequestContext,
-    ) -> Result<u64, io::Error> {
+    ) -> Result<InMemoryLayerIndexValue, io::Error> {
         let pos = self.rw.bytes_written();
-
-        // Write the length field
-        if srcbuf.len() < 0x80 {
-            // short one-byte length header
-            let len_buf = [srcbuf.len() as u8];
-
-            self.rw.write_all_borrowed(&len_buf, ctx).await?;
-        } else {
-            let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
-            len_buf[0] |= 0x80;
-            self.rw.write_all_borrowed(&len_buf, ctx).await?;
-        }
-
-        // Write the payload
-        self.rw.write_all_borrowed(srcbuf, ctx).await?;
-
-        Ok(pos)
+        let len = u32::try_from(buf.len()).map_err(|e| {
+            std::io::Error::new(
+                std::io::ErrorKind::Other,
+                anyhow::anyhow!(
+                    "EphemeralFile::write_blob value too large: {}: {e}",
+                    buf.len()
+                ),
+            )
+        })?;
+        pos.checked_add(len).ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "EphemeralFile::write_blob: overflow",
+            )
+        })?;
+
+        self.rw.write_all_borrowed(buf, ctx).await?;
+
+        Ok(InMemoryLayerIndexValue { pos, len })
     }
 }
 
@@ -117,19 +118,11 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
     }
 }
 
-impl BlockReader for EphemeralFile {
-    fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
-        BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::context::DownloadBehavior;
     use crate::task_mgr::TaskKind;
-    use crate::tenant::block_io::BlockReaderRef;
-    use rand::{thread_rng, RngCore};
     use std::fs;
     use std::str::FromStr;
 
@@ -160,69 +153,6 @@ mod tests {
         Ok((conf, tenant_shard_id, timeline_id, ctx))
     }
 
-    #[tokio::test]
-    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
-        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
-
-        let gate = utils::sync::gate::Gate::default();
-
-        let entered = gate.enter().unwrap();
-
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;
-
-        let pos_foo = file.write_blob(b"foo", &ctx).await?;
-        assert_eq!(
-            b"foo",
-            file.block_cursor()
-                .read_blob(pos_foo, &ctx)
-                .await?
-                .as_slice()
-        );
-        let pos_bar = file.write_blob(b"bar", &ctx).await?;
-        assert_eq!(
-            b"foo",
-            file.block_cursor()
-                .read_blob(pos_foo, &ctx)
-                .await?
-                .as_slice()
-        );
-        assert_eq!(
-            b"bar",
-            file.block_cursor()
-                .read_blob(pos_bar, &ctx)
-                .await?
-                .as_slice()
-        );
-
-        let mut blobs = Vec::new();
-        for i in 0..10000 {
-            let data = Vec::from(format!("blob{}", i).as_bytes());
-            let pos = file.write_blob(&data, &ctx).await?;
-            blobs.push((pos, data));
-        }
-        // also test with a large blobs
-        for i in 0..100 {
-            let data = format!("blob{}", i).as_bytes().repeat(100);
-            let pos = file.write_blob(&data, &ctx).await?;
-            blobs.push((pos, data));
-        }
-
-        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
-        for (pos, expected) in blobs {
-            let actual = cursor.read_blob(pos, &ctx).await?;
-            assert_eq!(actual, expected);
-        }
-
-        // Test a large blob that spans multiple pages
-        let mut large_data = vec![0; 20000];
-        thread_rng().fill_bytes(&mut large_data);
-        let pos_large = file.write_blob(&large_data, &ctx).await?;
-        let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
-        assert_eq!(result, large_data);
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn ephemeral_file_holds_gate_open() {
         const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs
index 7355b3b5a37d5..d3e6f232cceef 100644
--- a/pageserver/src/tenant/ephemeral_file/page_caching.rs
+++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs
@@ -3,11 +3,9 @@
 
 use crate::context::RequestContext;
 use crate::page_cache::{self, PAGE_SZ};
-use crate::tenant::block_io::BlockLease;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
 use crate::virtual_file::VirtualFile;
 
-use once_cell::sync::Lazy;
 use std::io::{self, ErrorKind};
 use std::ops::{Deref, Range};
 use tokio_epoll_uring::BoundedBuf;
@@ -23,28 +21,77 @@ pub struct RW {
     _gate_guard: utils::sync::gate::GateGuard,
 }
 
-/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
-/// should we pre-warm the [`crate::page_cache`] with the contents?
-#[derive(Clone, Copy)]
-pub enum PrewarmOnWrite {
-    Yes,
-    No,
+/// Result of [`RW::read_page`].
+pub(crate) enum ReadResult<'a> {
+    EphemeralFileMutableTail(PageBuf, &'a [u8; PAGE_SZ]),
+    Owned(PageBuf),
+}
+
+impl ReadResult<'_> {
+    pub(crate) fn contents(&self) -> &[u8; PAGE_SZ] {
+        match self {
+            ReadResult::EphemeralFileMutableTail(_, buf) => buf,
+            ReadResult::Owned(buf) => buf.deref(),
+        }
+    }
+    pub(crate) fn into_page_buf(self) -> PageBuf {
+        match self {
+            ReadResult::EphemeralFileMutableTail(buf, _) => buf,
+            ReadResult::Owned(buf) => buf,
+        }
+    }
+}
+
+pub(crate) struct PageBuf(Box<[u8; PAGE_SZ]>);
+
+impl From<Box<[u8; PAGE_SZ]>> for PageBuf {
+    fn from(buf: Box<[u8; PAGE_SZ]>) -> Self {
+        Self(buf)
+    }
+}
+
+impl Deref for PageBuf {
+    type Target = [u8; PAGE_SZ];
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+// Safety: `PageBuf` is a fixed-size buffer that is zero-initialized.
+unsafe impl tokio_epoll_uring::IoBuf for PageBuf {
+    fn stable_ptr(&self) -> *const u8 {
+        self.0.as_ptr()
+    }
+
+    fn bytes_init(&self) -> usize {
+        self.0.len()
+    }
+
+    fn bytes_total(&self) -> usize {
+        self.0.len()
+    }
+}
+
+// Safety: the `&mut self` guarantees no aliasing. `set_init` is safe
+// because the buffer is always fully initialized.
+unsafe impl tokio_epoll_uring::IoBufMut for PageBuf {
+    fn stable_mut_ptr(&mut self) -> *mut u8 {
+        self.0.as_mut_ptr()
+    }
+
+    unsafe fn set_init(&mut self, pos: usize) {
+        // this is a no-op because the buffer is always fully initialized
+        assert!(pos <= self.0.len());
+    }
 }
 
 impl RW {
-    pub fn new(
-        file: VirtualFile,
-        prewarm_on_write: PrewarmOnWrite,
-        _gate_guard: utils::sync::gate::GateGuard,
-    ) -> Self {
+    pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
         let page_cache_file_id = page_cache::next_file_id();
         Self {
             page_cache_file_id,
-            rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
-                page_cache_file_id,
-                file,
-                prewarm_on_write,
-            )),
+            rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(file)),
             _gate_guard,
         }
     }
@@ -57,17 +104,17 @@ impl RW {
         &mut self,
         srcbuf: &[u8],
         ctx: &RequestContext,
-    ) -> Result<usize, io::Error> {
+    ) -> Result<(), io::Error> {
         // It doesn't make sense to proactively fill the page cache on the Pageserver write path
         // because Compute is unlikely to access recently written data.
-        self.rw.write_all_borrowed(srcbuf, ctx).await
+        self.rw.write_all_borrowed(srcbuf, ctx).await.map(|_| ())
     }
 
-    pub(crate) fn bytes_written(&self) -> u64 {
+    pub(crate) fn bytes_written(&self) -> u32 {
         self.rw.bytes_written()
     }
 
-    /// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
+    /// Load all blocks that can be read via [`Self::read_page`] into a contiguous memory buffer.
     ///
     /// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
     /// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`].
@@ -104,45 +151,24 @@ impl RW {
         Ok(vec)
     }
 
-    pub(crate) async fn read_blk(
+    pub(crate) async fn read_page(
         &self,
         blknum: u32,
+        buf: PageBuf,
         ctx: &RequestContext,
-    ) -> Result<BlockLease, io::Error> {
+    ) -> Result<ReadResult<'_>, io::Error> {
         match self.rw.read_blk(blknum).await? {
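+            // Reads below the flushed offset are served from the underlying
+            // VirtualFile into the caller-provided `PageBuf`; reads that land in
+            // the unflushed tail borrow the writer's in-memory buffer instead.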
             zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
-                let cache = page_cache::get();
-                match cache
-                    .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
+                let buf = writer
+                    .file
+                    .read_exact_at(buf.slice_full(), blknum as u64 * PAGE_SZ as u64, ctx)
                     .await
-                    .map_err(|e| {
-                        std::io::Error::new(
-                            std::io::ErrorKind::Other,
-                            // order path before error because error is anyhow::Error => might have many contexts
-                            format!(
-                                "ephemeral file: read immutable page #{}: {}: {:#}",
-                                blknum,
-                                self.rw.as_writer().file.path,
-                                e,
-                            ),
-                        )
-                    })? {
-                    page_cache::ReadBufResult::Found(guard) => {
-                        return Ok(BlockLease::PageReadGuard(guard))
-                    }
-                    page_cache::ReadBufResult::NotFound(write_guard) => {
-                        let write_guard = writer
-                            .file
-                            .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
-                            .await?;
-                        let read_guard = write_guard.mark_valid();
-                        return Ok(BlockLease::PageReadGuard(read_guard));
-                    }
-                }
-            }
-            zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
-                Ok(BlockLease::EphemeralFileMutableTail(buffer))
+                    .map(|slice| slice.into_inner())?;
+                Ok(ReadResult::Owned(buf))
             }
+            zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail {
+                buffer: tail_ref,
+            } => Ok(ReadResult::EphemeralFileMutableTail(buf, tail_ref)),
         }
     }
 }
@@ -172,22 +198,14 @@ impl Drop for RW {
 }
 
 struct PreWarmingWriter {
-    prewarm_on_write: PrewarmOnWrite,
     nwritten_blocks: u32,
-    page_cache_file_id: page_cache::FileId,
     file: VirtualFile,
 }
 
 impl PreWarmingWriter {
-    fn new(
-        page_cache_file_id: page_cache::FileId,
-        file: VirtualFile,
-        prewarm_on_write: PrewarmOnWrite,
-    ) -> Self {
+    fn new(file: VirtualFile) -> Self {
         Self {
-            prewarm_on_write,
             nwritten_blocks: 0,
-            page_cache_file_id,
             file,
         }
     }
@@ -241,49 +259,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
         let nblocks = buflen / PAGE_SZ;
         let nblocks32 = u32::try_from(nblocks).unwrap();
 
-        if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
-            // Pre-warm page cache with the contents.
-            // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
-            // benefits the code that writes InMemoryLayer=>L0 layers.
-
-            let cache = page_cache::get();
-            static CTX: Lazy<RequestContext> = Lazy::new(|| {
-                RequestContext::new(
-                    crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
-                    crate::context::DownloadBehavior::Error,
-                )
-            });
-            for blknum_in_buffer in 0..nblocks {
-                let blk_in_buffer =
-                    &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
-                let blknum = self
-                    .nwritten_blocks
-                    .checked_add(blknum_in_buffer as u32)
-                    .unwrap();
-                match cache
-                    .read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
-                    .await
-                {
-                    Err(e) => {
-                        error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
-                        // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
-                    }
-                    Ok(v) => match v {
-                        page_cache::ReadBufResult::Found(_guard) => {
-                            // This function takes &mut self, so, it shouldn't be possible to reach this point.
- unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \ - and this function takes &mut self, so, no concurrent read_blk is possible"); - } - page_cache::ReadBufResult::NotFound(mut write_guard) => { - write_guard.copy_from_slice(blk_in_buffer); - let _ = write_guard.mark_valid(); - } - }, - } - } - } - self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap(); Ok((buflen, buf)) } diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs index fe310acab888d..063fd854c5883 100644 --- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs +++ b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs @@ -19,6 +19,8 @@ mod zero_padded; +use anyhow::Context; + use crate::{ context::RequestContext, page_cache::PAGE_SZ, @@ -69,10 +71,12 @@ where self.buffered_writer.write_buffered_borrowed(buf, ctx).await } - pub fn bytes_written(&self) -> u64 { + pub fn bytes_written(&self) -> u32 { let flushed_offset = self.buffered_writer.as_inner().bytes_written(); + let flushed_offset = u32::try_from(flushed_offset).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset}")).unwrap(); let buffer: &zero_padded::Buffer = self.buffered_writer.inspect_buffer(); - flushed_offset + u64::try_from(buffer.pending()).unwrap() + let buffer_pending = u32::try_from(buffer.pending()).expect("TAIL_SZ is < u32::MAX"); + flushed_offset.checked_add(buffer_pending).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset} + {buffer_pending}")).unwrap() } /// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`]. @@ -91,10 +95,12 @@ where } pub(crate) async fn read_blk(&self, blknum: u32) -> Result, std::io::Error> { - let flushed_offset = self.buffered_writer.as_inner().bytes_written(); + let flushed_offset = + u32::try_from(self.buffered_writer.as_inner().bytes_written()).expect(""); let buffer: &zero_padded::Buffer = self.buffered_writer.inspect_buffer(); - let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap(); - let read_offset = (blknum as u64) * (PAGE_SZ as u64); + let buffered_offset = flushed_offset + u32::try_from(buffer.pending()).unwrap(); + let page_sz = u32::try_from(PAGE_SZ).unwrap(); + let read_offset = blknum.checked_mul(page_sz).unwrap(); // The trailing page ("block") might only be partially filled, // yet the blob_io code relies on us to return a full PAGE_SZed slice anyway. @@ -103,28 +109,28 @@ where // DeltaLayer probably has the same issue, not sure why it needs no special treatment. 
         // => check here that the read doesn't go beyond this potentially trailing
         // => the zero-padding is done in the `else` branch below
-        let blocks_written = if buffered_offset % (PAGE_SZ as u64) == 0 {
-            buffered_offset / (PAGE_SZ as u64)
+        let blocks_written = if buffered_offset % page_sz == 0 {
+            buffered_offset / page_sz
         } else {
-            (buffered_offset / (PAGE_SZ as u64)) + 1
+            (buffered_offset / page_sz) + 1
         };
-        if (blknum as u64) >= blocks_written {
+        if blknum >= blocks_written {
             return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}")));
         }
 
         // assertions for the `if-else` below
         assert_eq!(
-            flushed_offset % (TAIL_SZ as u64), 0,
+            flushed_offset % (u32::try_from(TAIL_SZ).unwrap()), 0,
             "we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
         );
         assert_eq!(
-            flushed_offset % (PAGE_SZ as u64),
+            flushed_offset % page_sz,
             0,
             "the logic below can't handle if the page is spread across the flushed part and the buffer"
         );
 
         if read_offset < flushed_offset {
-            assert!(read_offset + (PAGE_SZ as u64) <= flushed_offset);
+            assert!(read_offset + page_sz <= flushed_offset);
             Ok(ReadResult::NeedsReadFromWriter {
                 writer: self.as_writer(),
             })
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index d5b40c74f2be8..38033f8064bde 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -64,7 +64,7 @@ use std::os::unix::fs::FileExt;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::sync::OnceCell;
-use tokio_epoll_uring::IoBufMut;
+use tokio_epoll_uring::IoBuf;
 use tracing::*;
 
 use utils::{
@@ -458,7 +458,7 @@ impl DeltaLayerWriterInner {
         ctx: &RequestContext,
     ) -> (FullSlice<Buf>, anyhow::Result<()>)
     where
-        Buf: IoBufMut + Send,
+        Buf: IoBuf + Send,
     {
         assert!(
             self.lsn_range.start <= lsn,
@@ -666,7 +666,7 @@ impl DeltaLayerWriter {
         ctx: &RequestContext,
     ) -> (FullSlice<Buf>, anyhow::Result<()>)
     where
-        Buf: IoBufMut + Send,
+        Buf: IoBuf + Send,
     {
         self.inner
             .as_mut()
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 748d79c149697..fa9d8e9aadc6f 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -8,13 +8,14 @@
 use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
 use crate::page_cache::PAGE_SZ;
 use crate::repository::{Key, Value};
-use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
+use crate::tenant::ephemeral_file::page_caching::PageBuf;
 use crate::tenant::ephemeral_file::EphemeralFile;
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::PageReconstructError;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
-use crate::{l0_flush, page_cache, walrecord};
+use crate::{l0_flush, page_cache};
 use anyhow::{anyhow, Result};
+use bytes::Bytes;
 use camino::Utf8PathBuf;
 use pageserver_api::key::CompactKey;
 use pageserver_api::keyspace::KeySpace;
@@ -80,7 +81,7 @@ pub struct InMemoryLayerInner {
     /// All versions of all pages in the layer are kept here. Indexed
     /// by block number and LSN. The value is an offset into the
     /// ephemeral file where the page version is stored.
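+    /// With [`InMemoryLayerIndexValue`], the index entry also records the length
+    /// of the serialized value, so reads no longer parse a length header from the file.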
-    index: BTreeMap<Key, VecMap<Lsn, u64>>,
+    index: BTreeMap<Key, VecMap<Lsn, InMemoryLayerIndexValue>>,
 
     /// The values are stored in a serialized format in this file.
     /// Each serialized Value is preceded by a 'u32' length field.
@@ -89,6 +90,10 @@ pub struct InMemoryLayerInner {
     resource_units: GlobalResourceUnits,
 }
 
+pub(crate) struct InMemoryLayerIndexValue {
+    pub(crate) pos: u32,
+    pub(crate) len: u32,
+}
 
 impl std::fmt::Debug for InMemoryLayerInner {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -230,7 +235,7 @@ impl InMemoryLayer {
         }
     }
 
-    pub(crate) fn try_len(&self) -> Option<u64> {
+    pub(crate) fn try_len(&self) -> Option<u32> {
         self.inner.try_read().map(|i| i.file.len()).ok()
     }
 
@@ -249,9 +254,7 @@ impl InMemoryLayer {
     /// debugging function to print out the contents of the layer
     ///
     /// this is likely completly unused
-    pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
-        let inner = self.inner.read().await;
-
+    pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
         let end_str = self.end_lsn_or_max();
 
         println!(
@@ -259,39 +262,6 @@ impl InMemoryLayer {
             self.timeline_id, self.start_lsn, end_str,
         );
 
-        if !verbose {
-            return Ok(());
-        }
-
-        let cursor = inner.file.block_cursor();
-        let mut buf = Vec::new();
-        for (key, vec_map) in inner.index.iter() {
-            for (lsn, pos) in vec_map.as_slice() {
-                let mut desc = String::new();
-                cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
-                let val = Value::des(&buf);
-                match val {
-                    Ok(Value::Image(img)) => {
-                        write!(&mut desc, " img {} bytes", img.len())?;
-                    }
-                    Ok(Value::WalRecord(rec)) => {
-                        let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
-                        write!(
-                            &mut desc,
-                            " rec {} bytes will_init: {} {}",
-                            buf.len(),
-                            rec.will_init(),
-                            wal_desc
-                        )?;
-                    }
-                    Err(err) => {
-                        write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
-                    }
-                }
-                println!("  key {} at {}: {}", key, lsn, desc);
-            }
-        }
-
         Ok(())
     }
 
@@ -311,7 +281,6 @@ impl InMemoryLayer {
             .build();
 
         let inner = self.inner.read().await;
-        let reader = inner.file.block_cursor();
 
         for range in keyspace.ranges.iter() {
             for (key, vec_map) in inner
@@ -326,15 +295,53 @@ impl InMemoryLayer {
 
                 let slice = vec_map.slice_range(lsn_range);
 
-                for (entry_lsn, pos) in slice.iter().rev() {
-                    // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183
-                    let buf = reader.read_blob(*pos, &ctx).await;
-                    if let Err(e) = buf {
-                        reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
-                        break;
+                'foreach_value: for (entry_lsn, value) in slice.iter().rev() {
+                    let InMemoryLayerIndexValue { pos, len } = value;
+
+                    // TODO: coalesce multiple reads that hit the same page into one page read
+                    // Yuchen is working on a VectoredReadPlanner change to support this.
+                    // In the meantime, we prepare the way for direct IO by doing full page reads.
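+                    // A value may span multiple pages: start at the page containing
+                    // `pos`, copy from `offset_in_page` onwards, and keep reading
+                    // whole pages until `len` bytes have been accumulated.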
+                    let len = usize::try_from(*len).unwrap();
+                    let mut value_buf = Vec::with_capacity(len);
+                    let mut page_buf_storage = Some(PageBuf::from(Box::new([0u8; PAGE_SZ])));
+                    let mut page_no = *pos / (PAGE_SZ as u32);
+                    let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
+                    while value_buf.len() < len {
+                        let read_result = match inner
+                            .file
+                            .read_page(
+                                page_no,
+                                page_buf_storage
+                                    .take()
+                                    .expect("we put it back each iteration"),
+                                &ctx,
+                            )
+                            .await
+                        {
+                            Ok(page) => page,
+                            Err(e) => {
+                                reconstruct_state
+                                    .on_key_error(key, PageReconstructError::from(anyhow!(e)));
+                                break 'foreach_value;
+                            }
+                        };
+                        {
+                            let page_contents = read_result.contents();
+                            let remaining_in_page = std::cmp::min(
+                                len - value_buf.len(),
+                                page_contents.len() - offset_in_page,
+                            );
+                            value_buf.extend_from_slice(
+                                &page_contents[offset_in_page..offset_in_page + remaining_in_page],
+                            );
+                        }
+                        offset_in_page = 0;
+                        page_no += 1;
+                        page_buf_storage = Some(read_result.into_page_buf());
                     }
+                    assert!(value_buf.len() == len);
 
-                    let value = Value::des(&buf.unwrap());
+                    let value = Value::des(&value_buf);
                     if let Err(e) = value {
                         reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
                         break;
@@ -380,7 +387,7 @@ impl InMemoryLayer {
     /// Get layer size.
     pub async fn size(&self) -> Result<u64> {
         let inner = self.inner.read().await;
-        Ok(inner.file.len())
+        Ok(inner.file.len() as u64)
     }
 
     /// Create a new, empty, in-memory layer
@@ -441,27 +448,25 @@ impl InMemoryLayer {
     ) -> Result<()> {
         trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
 
-        let off = {
-            locked_inner
-                .file
-                .write_blob(
-                    buf,
-                    &RequestContextBuilder::extend(ctx)
-                        .page_content_kind(PageContentKind::InMemoryLayer)
-                        .build(),
-                )
-                .await?
-        };
+        let entry = locked_inner
+            .file
+            .write_blob(
+                buf,
+                &RequestContextBuilder::extend(ctx)
+                    .page_content_kind(PageContentKind::InMemoryLayer)
+                    .build(),
+            )
+            .await?;
 
         let vec_map = locked_inner.index.entry(key).or_default();
-        let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
+        let old = vec_map.append_or_update_last(lsn, entry).unwrap().0;
         if old.is_some() {
             // We already had an entry for this LSN. That's odd..
             warn!("Key {} at {} already exists", key, lsn);
         }
 
         let size = locked_inner.file.len();
-        locked_inner.resource_units.maybe_publish_size(size);
+        locked_inner.resource_units.maybe_publish_size(size as u64);
 
         Ok(())
     }
@@ -473,7 +478,7 @@ impl InMemoryLayer {
     pub(crate) async fn tick(&self) -> Option<u64> {
         let mut inner = self.inner.write().await;
         let size = inner.file.len();
-        inner.resource_units.publish_size(size)
+        inner.resource_units.publish_size(size as u64)
     }
 
     pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
@@ -536,7 +541,6 @@ impl InMemoryLayer {
         use l0_flush::Inner;
 
         let _concurrency_permit = match l0_flush_global_state {
-            Inner::PageCached => None,
            Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
         };
 
@@ -568,34 +572,6 @@ impl InMemoryLayer {
             .await?;
 
         match l0_flush_global_state {
-            l0_flush::Inner::PageCached => {
-                let ctx = RequestContextBuilder::extend(ctx)
-                    .page_content_kind(PageContentKind::InMemoryLayer)
-                    .build();
-
-                let mut buf = Vec::new();
-
-                let cursor = inner.file.block_cursor();
-
-                for (key, vec_map) in inner.index.iter() {
-                    // Write all page versions
-                    for (lsn, pos) in vec_map.as_slice() {
-                        cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
-                        let will_init = Value::des(&buf)?.will_init();
-                        let (tmp, res) = delta_layer_writer
-                            .put_value_bytes(
-                                Key::from_compact(*key),
-                                *lsn,
-                                buf.slice_len(),
-                                will_init,
-                                &ctx,
-                            )
-                            .await;
-                        res?;
-                        buf = tmp.into_raw_slice().into_inner();
-                    }
-                }
-            }
             l0_flush::Inner::Direct { .. } => {
                 let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
                 assert_eq!(
@@ -612,22 +588,15 @@ impl InMemoryLayer {
                     }
                 });
 
-                let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents));
-
-                let mut buf = Vec::new();
+                let file_contents = Bytes::from(file_contents);
 
                 for (key, vec_map) in inner.index.iter() {
                     // Write all page versions
-                    for (lsn, pos) in vec_map.as_slice() {
-                        // TODO: once we have blob lengths in the in-memory index, we can
-                        // 1. get rid of the blob_io / BlockReaderRef::Slice business and
-                        // 2. load the file contents into a Bytes and
-                        // 3. the use `Bytes::slice` to get the `buf` that is our blob
-                        // 4. pass that `buf` into `put_value_bytes`
-                        // => https://github.com/neondatabase/neon/issues/8183
-                        cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
+                    for (lsn, entry) in vec_map.as_slice() {
+                        let InMemoryLayerIndexValue { pos, len } = entry;
+                        let buf = file_contents.slice(*pos as usize..(*pos + *len) as usize);
                         let will_init = Value::des(&buf)?.will_init();
-                        let (tmp, res) = delta_layer_writer
+                        let (_buf, res) = delta_layer_writer
                             .put_value_bytes(
                                 Key::from_compact(*key),
                                 *lsn,
@@ -637,7 +606,6 @@ impl InMemoryLayer {
                             )
                             .await;
                         res?;
-                        buf = tmp.into_raw_slice().into_inner();
                     }
                 }
             }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index abe3f56e45895..acbb9cb7c5101 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1437,6 +1437,7 @@ impl Timeline {
             tracing::warn!("Lock conflict while reading size of open layer");
             return;
         };
+        let current_size = current_size as u64;
 
         let current_lsn = self.get_last_record_lsn();