Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: use direct IO for delta and image layer reads #9326

Merged
merged 32 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a85bd88
pageserver: add direct io config to virtual file
yliang412 Oct 1, 2024
95554c7
fix clippy
yliang412 Oct 1, 2024
3a5b44e
add set_io_mode option to getpage_latest_lsn
yliang412 Oct 1, 2024
97f7b0b
simplify virtual file wrapper
yliang412 Oct 1, 2024
5c76b2d
fix put_io_mode to use the correct http endpoint
yliang412 Oct 1, 2024
bc13310
Merge branch 'main' into yuchen/virtual-file-config
yliang412 Oct 7, 2024
a04cfd7
get rid of io_buffer_alignment config (always 512)
yliang412 Oct 7, 2024
f1418ca
Merge branch 'main' into yuchen/virtual-file-config
yliang412 Oct 7, 2024
4e13094
review: clone open_options instead of taking mut
yliang412 Oct 8, 2024
6d03e28
review: use a inner and mode member for VirtualFile
yliang412 Oct 8, 2024
12a4e33
Merge branch 'main' into yuchen/virtual-file-config
yliang412 Oct 8, 2024
ee46000
pageserver: implement aligned io buffer
yliang412 Oct 7, 2024
f28dd95
use aligned buffer for image and delta layers
yliang412 Oct 7, 2024
e9d9663
use aligned buffer for page cache
yliang412 Oct 7, 2024
84e2242
use aligned buffer for inmemory layer
yliang412 Oct 7, 2024
1c61b68
use aligned buffer marker trait
yliang412 Oct 8, 2024
b418c34
Merge branch 'main' into yuchen/direct-io-for-read
yliang412 Oct 9, 2024
62722e8
fix clippy
yliang412 Oct 9, 2024
8f9679c
refactor aligned buffer
yliang412 Oct 9, 2024
84b0902
fix clippy
yliang412 Oct 9, 2024
e377177
use IoBuffer instead of Bytes for inmemory_layer put_bytes
yliang412 Oct 9, 2024
929c8d4
use io mode from config
yliang412 Oct 9, 2024
1360069
Merge branch 'main' into yuchen/direct-io-for-read
yliang412 Oct 9, 2024
156237d
add more comments
yliang412 Oct 11, 2024
cb51ddc
Merge branch 'main' into yuchen/direct-io-for-read
yliang412 Oct 14, 2024
4b4a3ed
review: remove allow(unused)
yliang412 Oct 18, 2024
3b88998
review: clarify IoBuf safety comments
yliang412 Oct 18, 2024
d99a61b
review: remove outdated todo
yliang412 Oct 18, 2024
ad44e11
follow bytes::Bytes convention for AlignedBuffer
yliang412 Oct 18, 2024
e92d0fa
Merge branch 'main' into yuchen/direct-io-for-read
yliang412 Oct 18, 2024
a5a86be
fix clippy
yliang412 Oct 18, 2024
656ddbc
Merge branch 'main' into yuchen/direct-io-for-read
yliang412 Oct 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pageserver/benches/bench_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ fn criterion_benchmark(c: &mut Criterion) {
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(16384, virtual_file::io_engine_for_bench());
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
conf.virtual_file_io_mode,
);
page_cache::init(conf.page_cache_size);

{
Expand Down
7 changes: 6 additions & 1 deletion pageserver/ctl/src/layer_map_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::virtual_file::api::IoMode;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
Expand Down Expand Up @@ -152,7 +153,11 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

// Initialize virtual_file (file descriptor cache) and page cache, which are needed to access the layers' persistent B-Tree.
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
pageserver::page_cache::init(100);

let mut total_delta_layers = 0usize;
Expand Down
13 changes: 11 additions & 2 deletions pageserver/ctl/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::virtual_file::api::IoMode;
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
Expand Down Expand Up @@ -59,7 +60,11 @@ pub(crate) enum LayerCmd {

async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
Expand Down Expand Up @@ -190,7 +195,11 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
pageserver::page_cache::init(100);

let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
Expand Down
8 changes: 6 additions & 2 deletions pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use pageserver::{
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
virtual_file::{self, api::IoMode},
};
use pageserver_api::shard::TenantShardId;
use postgres_ffi::ControlFileData;
Expand Down Expand Up @@ -205,7 +205,11 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {

async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
virtual_file::init(
10,
virtual_file::api::IoEngineKind::StdFs,
IoMode::preferred(),
);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
Expand Down
6 changes: 5 additions & 1 deletion pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();

// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
virtual_file::init(
conf.max_file_descriptors,
conf.virtual_file_io_engine,
conf.virtual_file_io_mode,
);
page_cache::init(conf.page_cache_size);

start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
Expand Down
16 changes: 9 additions & 7 deletions pageserver/src/page_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use once_cell::sync::OnceCell;
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
virtual_file::{IoBufferMut, IoPageSlice},
};

static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
Expand Down Expand Up @@ -144,7 +145,7 @@ struct SlotInner {
key: Option<CacheKey>,
// for `coalesce_readers_permit`
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
buf: &'static mut [u8; PAGE_SZ],
buf: IoPageSlice<'static>,
}

impl Slot {
Expand Down Expand Up @@ -234,13 +235,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];

fn deref(&self) -> &Self::Target {
self.slot_guard.buf
self.slot_guard.buf.deref()
}
}

impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.slot_guard.buf
self.slot_guard.buf.as_ref()
}
}

Expand All @@ -266,7 +267,7 @@ enum PageWriteGuardState<'i> {
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
PageWriteGuardState::Downgraded => unreachable!(),
}
}
Expand All @@ -277,7 +278,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {

fn deref(&self) -> &Self::Target {
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
PageWriteGuardState::Downgraded => unreachable!(),
}
}
Expand Down Expand Up @@ -643,7 +644,7 @@ impl PageCache {
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();

let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
Expand All @@ -652,7 +653,8 @@ impl PageCache {
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
.map(|chunk| {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
// SAFETY: Each chunk is exactly `PAGE_SZ` (8192) bytes, a multiple of the 512-byte IO alignment, so every chunk of the aligned buffer is itself aligned.
let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };

Slot {
inner: tokio::sync::RwLock::new(SlotInner {
Expand Down
6 changes: 4 additions & 2 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
#[cfg(test)]
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::Deref;
Expand Down Expand Up @@ -40,7 +42,7 @@ pub enum BlockLease<'a> {
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
Vec(Vec<u8>),
IoBufferMut(IoBufferMut),
}

impl From<PageReadGuard<'static>> for BlockLease<'static> {
Expand All @@ -67,7 +69,7 @@ impl Deref for BlockLease<'_> {
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
BlockLease::Vec(v) => {
BlockLease::IoBufferMut(v) => {
TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
}
}
Expand Down
28 changes: 16 additions & 12 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use num_traits::Num;
Expand Down Expand Up @@ -107,15 +108,18 @@ impl EphemeralFile {
self.page_cache_file_id
}

pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
pub(crate) async fn load_to_io_buf(
&self,
ctx: &RequestContext,
) -> Result<IoBufferMut, io::Error> {
let size = self.len().into_usize();
let vec = Vec::with_capacity(size);
let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
let buf = IoBufferMut::with_capacity(size);
let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
assert_eq!(nread, size);
let vec = slice.into_inner();
assert_eq!(vec.len(), nread);
assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
Ok(vec)
let buf = slice.into_inner();
assert_eq!(buf.len(), nread);
assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
Ok(buf)
}

/// Returns the offset at which the first byte of the input was written, for use
Expand Down Expand Up @@ -158,7 +162,7 @@ impl EphemeralFile {
}

impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
&'b self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
Expand Down Expand Up @@ -345,7 +349,7 @@ mod tests {
assert!(file.len() as usize == write_nbytes);
for i in 0..write_nbytes {
assert_eq!(value_offsets[i], i.into_u64());
let buf = Vec::with_capacity(1);
let buf = IoBufferMut::with_capacity(1);
let (buf_slice, nread) = file
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
.await
Expand Down Expand Up @@ -385,7 +389,7 @@ mod tests {

// assert the state is as this test expects it to be
assert_eq!(
&file.load_to_vec(&ctx).await.unwrap(),
&file.load_to_io_buf(&ctx).await.unwrap(),
&content[0..cap + cap / 2]
);
let md = file
Expand Down Expand Up @@ -440,7 +444,7 @@ mod tests {
let (buf, nread) = file
.read_exact_at_eof_ok(
start.into_u64(),
Vec::with_capacity(len).slice_full(),
IoBufferMut::with_capacity(len).slice_full(),
ctx,
)
.await
Expand Down
15 changes: 7 additions & 8 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -1002,7 +1002,7 @@ impl DeltaLayerInner {
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let mut buf = Some(IoBufferMut::with_capacity(buf_size));

// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
Expand All @@ -1029,7 +1029,7 @@ impl DeltaLayerInner {

// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
buf = Some(IoBufferMut::with_capacity(buf_size));

continue;
}
Expand Down Expand Up @@ -1203,7 +1203,7 @@ impl DeltaLayerInner {
.map(|x| x.0.get())
.unwrap_or(8192);

let mut buffer = Some(BytesMut::with_capacity(max_read_size));
let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));

// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
Expand Down Expand Up @@ -1561,12 +1561,11 @@ impl<'a> DeltaLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
Expand Down Expand Up @@ -1941,7 +1940,7 @@ pub(crate) mod test {
&vectored_reads,
constants::MAX_VECTORED_READ_BYTES,
);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let mut buf = Some(IoBufferMut::with_capacity(buf_size));

for read in vectored_reads {
let blobs_buf = vectored_blob_reader
Expand Down
19 changes: 9 additions & 10 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ use crate::tenant::vectored_blob_io::{
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
Expand Down Expand Up @@ -547,10 +548,10 @@ impl ImageLayerInner {
for read in plan.into_iter() {
let buf_size = read.size();

let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);

let view = BufView::new_slice(&blobs_buf.buf);

for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
Expand Down Expand Up @@ -609,13 +610,12 @@ impl ImageLayerInner {
}
}

let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;

match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;

Expand Down Expand Up @@ -1050,12 +1050,11 @@ impl<'a> ImageLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
next_batch.push_back((
Expand Down
Loading
Loading