chore(pageserver): plumb through RequestContext to VirtualFile write methods (#7566)

This PR introduces no functional changes.

The read path will be done separately.

refs #6107
refs #7386
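
The pattern applied throughout the diff below is mechanical. As a rough illustration (not code from this PR; `RequestContext`, `Writer`, and `Inner` are hypothetical stand-ins for types like BlobWriter and VirtualFile), each write method gains a `ctx: &RequestContext` parameter and forwards it unchanged to the next layer:

    // Hedged sketch of the plumbing pattern; all types here are stand-ins,
    // not the real pageserver items.
    struct RequestContext;

    struct Inner;

    impl Inner {
        async fn write_all(&mut self, buf: Vec<u8>, _ctx: &RequestContext) -> std::io::Result<usize> {
            Ok(buf.len()) // placeholder for the real owned-buffer write
        }
    }

    struct Writer {
        inner: Inner,
    }

    impl Writer {
        // Before this change the signature was `write_all(&mut self, buf: Vec<u8>)`;
        // now the caller's context is threaded through verbatim.
        async fn write_all(&mut self, buf: Vec<u8>, ctx: &RequestContext) -> std::io::Result<usize> {
            self.inner.write_all(buf, ctx).await
        }
    }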
problame authored May 2, 2024
1 parent 4b55dad commit 45ec868
Showing 18 changed files with 246 additions and 105 deletions.
2 changes: 2 additions & 0 deletions pageserver/src/task_mgr.rs
@@ -363,6 +363,8 @@ pub enum TaskKind {

EphemeralFilePreWarmPageCache,

LayerDownload,

#[cfg(test)]
UnitTest,
}
31 changes: 17 additions & 14 deletions pageserver/src/tenant/blob_io.rs
@@ -130,8 +130,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf).await;
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
@@ -142,9 +143,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {

#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self) -> Result<(), Error> {
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (mut buf, res) = self.inner.write_all(buf).await;
let (mut buf, res) = self.inner.write_all(buf, ctx).await;
res?;
buf.clear();
self.buf = buf;
@@ -165,10 +166,11 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<(), Error>) {
if !BUFFERED {
assert!(self.buf.is_empty());
return self.write_all_unbuffered(src_buf).await;
return self.write_all_unbuffered(src_buf, ctx).await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
@@ -183,7 +185,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
if let Err(e) = self.flush_buffer().await {
if let Err(e) = self.flush_buffer(ctx).await {
return (Slice::into_inner(src_buf), Err(e));
}
}
@@ -199,7 +201,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
assert_eq!(copied, src_buf.len());
Slice::into_inner(src_buf)
} else {
let (src_buf, res) = self.write_all_unbuffered(src_buf).await;
let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
if let Err(e) = res {
return (src_buf, Err(e));
}
@@ -216,6 +218,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;

@@ -227,7 +230,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf).await
self.write_all(io_buf, ctx).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
@@ -242,7 +245,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf).await
self.write_all(io_buf, ctx).await
}
}
.await;
@@ -251,7 +254,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = self.write_all(srcbuf).await;
let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
(srcbuf, res.map(|_| offset))
}
}
@@ -261,8 +264,8 @@ impl BlobWriter<true> {
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
self.flush_buffer().await?;
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?;
Ok(self.inner)
}

@@ -299,16 +302,16 @@ mod tests {
let file = VirtualFile::create(pathbuf.as_path()).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone()).await;
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
let offs = res?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ]).await;
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer().await?;
wtr.flush_buffer(&ctx).await?;
}

let file = VirtualFile::open(pathbuf.as_path()).await?;
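
For reference, the length-header scheme that write_blob emits (visible in the hunks above) can be summarized with a stand-alone sketch; encode_len_header is a hypothetical helper, not a function from the pageserver:

    // Hedged sketch of the blob length-header format, derived from the diff above:
    // blobs shorter than 128 bytes get a 1-byte header; longer blobs get a 4-byte
    // big-endian length with the high bit of the first byte set; lengths above
    // 0x7fff_ffff do not fit and are rejected.
    fn encode_len_header(len: usize) -> Result<Vec<u8>, &'static str> {
        if len < 128 {
            Ok(vec![len as u8])
        } else if len <= 0x7fff_ffff {
            let mut hdr = (len as u32).to_be_bytes();
            hdr[0] |= 0x80; // mark the long form
            Ok(hdr.to_vec())
        } else {
            Err("blob too large for the 31-bit length header")
        }
    }

    #[test]
    fn len_header_examples() {
        assert_eq!(encode_len_header(5).unwrap(), vec![5]);
        assert_eq!(encode_len_header(300).unwrap(), vec![0x80, 0x00, 0x01, 0x2c]);
    }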
8 changes: 4 additions & 4 deletions pageserver/src/tenant/ephemeral_file.rs
@@ -74,7 +74,7 @@ impl EphemeralFile {
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
_ctx: &RequestContext,
ctx: &RequestContext,
) -> Result<u64, io::Error> {
let pos = self.rw.bytes_written();

@@ -83,15 +83,15 @@
// short one-byte length header
let len_buf = [srcbuf.len() as u8];

self.rw.write_all_borrowed(&len_buf).await?;
self.rw.write_all_borrowed(&len_buf, ctx).await?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
self.rw.write_all_borrowed(&len_buf).await?;
self.rw.write_all_borrowed(&len_buf, ctx).await?;
}

// Write the payload
self.rw.write_all_borrowed(srcbuf).await?;
self.rw.write_all_borrowed(srcbuf, ctx).await?;

Ok(pos)
}
11 changes: 8 additions & 3 deletions pageserver/src/tenant/ephemeral_file/page_caching.rs
@@ -35,10 +35,14 @@ impl RW {
self.page_cache_file_id
}

pub(crate) async fn write_all_borrowed(&mut self, srcbuf: &[u8]) -> Result<usize, io::Error> {
pub(crate) async fn write_all_borrowed(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<usize, io::Error> {
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
// because Compute is unlikely to access recently written data.
self.rw.write_all_borrowed(srcbuf).await
self.rw.write_all_borrowed(srcbuf, ctx).await
}

pub(crate) fn bytes_written(&self) -> u64 {
@@ -134,6 +138,7 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi
>(
&mut self,
buf: B,
ctx: &RequestContext,
) -> std::io::Result<(usize, B::Buf)> {
let buf = buf.slice(..);
let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done
@@ -150,7 +155,7 @@
);

// Do the IO.
let iobuf = match self.file.write_all(buf).await {
let iobuf = match self.file.write_all(buf, ctx).await {
(iobuf, Ok(nwritten)) => {
assert_eq!(nwritten, buflen);
iobuf
@@ -20,6 +20,7 @@
mod zero_padded;

use crate::{
context::RequestContext,
page_cache::PAGE_SZ,
virtual_file::owned_buffers_io::{
self,
@@ -60,8 +61,12 @@ where
self.buffered_writer.as_inner().as_inner()
}

pub async fn write_all_borrowed(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.buffered_writer.write_buffered_borrowed(buf).await
pub async fn write_all_borrowed(
&mut self,
buf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<usize> {
self.buffered_writer.write_buffered_borrowed(buf, ctx).await
}

pub fn bytes_written(&self) -> u64 {
3 changes: 3 additions & 0 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -210,6 +210,7 @@ use tracing::{debug, error, info, instrument, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;

use crate::context::RequestContext;
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::metrics::{
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
@@ -505,6 +506,7 @@ impl RemoteTimelineClient {
layer_file_name: &LayerFileName,
layer_metadata: &LayerFileMetadata,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
let downloaded_size = {
let _unfinished_gauge_guard = self.metrics.call_begin(
@@ -522,6 +524,7 @@
layer_file_name,
layer_metadata,
cancel,
ctx,
)
.measure_remote_op(
RemoteOpFileKind::Layer,
10 changes: 7 additions & 3 deletions pageserver/src/tenant/remote_timeline_client/download.rs
@@ -18,6 +18,7 @@ use tracing::warn;
use utils::backoff;

use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
@@ -40,6 +41,7 @@ use super::{
/// in the metadata. (In the future, we might do more cross-checks, like CRC validation)
///
/// Returns the size of the downloaded file.
#[allow(clippy::too_many_arguments)]
pub async fn download_layer_file<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
@@ -48,6 +50,7 @@ pub async fn download_layer_file<'a>(
layer_file_name: &'a LayerFileName,
layer_metadata: &'a LayerFileMetadata,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();

@@ -75,7 +78,7 @@
let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);

let bytes_amount = download_retry(
|| async { download_object(storage, &remote_path, &temp_file_path, cancel).await },
|| async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
&format!("download {remote_path:?}"),
cancel,
)
@@ -133,6 +136,7 @@ async fn download_object<'a>(
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
@@ -208,10 +212,10 @@
Err(e) => return Err(e),
};
buffered
.write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk))
.write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk), ctx)
.await?;
}
let size_tracking = buffered.flush_and_into_inner().await?;
let size_tracking = buffered.flush_and_into_inner(ctx).await?;
Ok(size_tracking.into_inner())
}
.await?;
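
The shape of the download path after this change, reduced to a hedged sketch: every chunk write and the final flush receive the same context. `RequestContext` and `SizeTrackingWriter` below are stand-ins, not the real crate API:

    // Hedged sketch: each buffered chunk write and the final flush take the
    // caller's RequestContext, mirroring the write_buffered / flush_and_into_inner
    // calls in the hunk above.
    struct RequestContext;

    struct SizeTrackingWriter {
        bytes: u64,
    }

    impl SizeTrackingWriter {
        async fn write_buffered(&mut self, chunk: &[u8], _ctx: &RequestContext) -> std::io::Result<()> {
            self.bytes += chunk.len() as u64;
            Ok(())
        }

        async fn flush_and_into_inner(self, _ctx: &RequestContext) -> std::io::Result<u64> {
            // The flush also needs the context because it may issue one last write.
            Ok(self.bytes)
        }
    }

    async fn download_object_sketch(chunks: Vec<Vec<u8>>, ctx: &RequestContext) -> std::io::Result<u64> {
        let mut buffered = SizeTrackingWriter { bytes: 0 };
        for chunk in chunks {
            buffered.write_buffered(&chunk, ctx).await?;
        }
        buffered.flush_and_into_inner(ctx).await
    }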
8 changes: 7 additions & 1 deletion pageserver/src/tenant/secondary.rs
@@ -7,6 +7,7 @@ use std::{sync::Arc, time::SystemTime};

use crate::{
config::PageServerConf,
context::RequestContext,
disk_usage_eviction_task::DiskUsageEvictionInfo,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
virtual_file::MaybeFatalIo,
@@ -316,9 +317,13 @@ pub fn spawn_tasks(
let (upload_req_tx, upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);

let downloader_task_ctx = RequestContext::new(
TaskKind::SecondaryDownloads,
crate::context::DownloadBehavior::Download,
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
downloader_task_ctx.task_kind(),
None,
None,
"secondary tenant downloads",
@@ -330,6 +335,7 @@
download_req_rx,
bg_jobs_clone,
cancel_clone,
downloader_task_ctx,
)
.await;

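
The secondary.rs hunk above builds the downloader's RequestContext once at spawn time; a minimal sketch of that flow, with stub types standing in for the real crate items:

    // Hedged sketch with stub types: the context is created up front, its task
    // kind feeds the task registration, and the context itself is moved into the
    // background task so downstream writes have a ctx to pass along.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum TaskKind {
        SecondaryDownloads,
    }

    enum DownloadBehavior {
        Download,
    }

    struct RequestContext {
        task_kind: TaskKind,
        _behavior: DownloadBehavior,
    }

    impl RequestContext {
        fn new(task_kind: TaskKind, behavior: DownloadBehavior) -> Self {
            Self { task_kind, _behavior: behavior }
        }
        fn task_kind(&self) -> TaskKind {
            self.task_kind
        }
    }

    fn spawn_downloader_sketch() {
        let downloader_task_ctx =
            RequestContext::new(TaskKind::SecondaryDownloads, DownloadBehavior::Download);
        assert_eq!(downloader_task_ctx.task_kind(), TaskKind::SecondaryDownloads);
        // In the real code the context is then moved into the spawned task body.
    }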

1 comment on commit 45ec868

@github-actions


2943 tests run: 2807 passed, 2 failed, 134 skipped (full report)


Failures on Postgres 14

  • test_heavy_write_workload[neon_off-github-actions-selfhosted-10-5-5]: release
  • test_pgbench_intensive_init_workload[neon_off-github-actions-selfhosted-1000]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_heavy_write_workload[neon_off-release-pg14-github-actions-selfhosted-10-5-5] or test_pgbench_intensive_init_workload[neon_off-release-pg14-github-actions-selfhosted-1000]"
Flaky tests (1)

Postgres 14

  • test_timeline_size_quota_on_startup: release

Code coverage* (full report)

  • functions: 28.1% (6604 of 23543 functions)
  • lines: 46.8% (46982 of 100480 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
45ec868 at 2024-05-02T18:25:32.129Z :recycle:
