Skip to content

Commit

Permalink
chore(pageserver): plumb through RequestContext to VirtualFile open m…
Browse files Browse the repository at this point in the history
…ethods

This PR introduces no functional changes.

The `open()` path will be done separately.

refs #6107
refs #7386
  • Loading branch information
problame committed May 13, 2024
1 parent b58a615 commit 2cf8642
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pageserver/ctl/src/layer_map_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {

// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
let file = VirtualFile::open(path).await?;
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
Expand Down
2 changes: 1 addition & 1 deletion pageserver/ctl/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let file = VirtualFile::open(path).await?;
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1671,7 +1671,7 @@ impl<'a> DatadirModification<'a> {
}

if !self.pending_deletions.is_empty() {
writer.delete_batch(&self.pending_deletions).await?;
writer.delete_batch(&self.pending_deletions, ctx).await?;
self.pending_deletions.clear();
}

Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path()).await?;
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
Expand All @@ -314,7 +314,7 @@ mod tests {
wtr.flush_buffer(&ctx).await?;
}

let file = VirtualFile::open(pathbuf.as_path()).await?;
let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
Expand Down
4 changes: 3 additions & 1 deletion pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl EphemeralFile {
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<EphemeralFile, io::Error> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
Expand All @@ -45,6 +46,7 @@ impl EphemeralFile {
.read(true)
.write(true)
.create(true),
ctx,
)
.await?;

Expand Down Expand Up @@ -153,7 +155,7 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;

let pos_foo = file.write_blob(b"foo", &ctx).await?;
assert_eq!(
Expand Down
21 changes: 12 additions & 9 deletions pageserver/src/tenant/remote_timeline_client/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,17 @@ pub async fn download_layer_file<'a>(
// We use fatal_err() below because the after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let work = async move {
let timeline_dir = VirtualFile::open(&timeline_path)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
let work = {
let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
async move {
let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
}
};
crate::virtual_file::io_engine::get()
.spawn_blocking_and_block_on_if_std(work)
Expand Down Expand Up @@ -196,7 +199,7 @@ async fn download_object<'a>(
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
use bytes::BytesMut;
async {
let destination_file = VirtualFile::create(dst_path)
let destination_file = VirtualFile::create(dst_path, ctx)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
Expand Down
10 changes: 8 additions & 2 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ impl DeltaLayerWriterInner {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
Expand All @@ -404,7 +405,7 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);

let mut file = VirtualFile::create(&path).await?;
let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
Expand Down Expand Up @@ -586,6 +587,7 @@ impl DeltaLayerWriter {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
inner: Some(
Expand All @@ -595,6 +597,7 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
ctx,
)
.await?,
),
Expand Down Expand Up @@ -701,6 +704,7 @@ impl DeltaLayer {
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
Expand Down Expand Up @@ -734,7 +738,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
Expand Down Expand Up @@ -1792,6 +1796,7 @@ mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&ctx,
)
.await?;

Expand Down Expand Up @@ -1979,6 +1984,7 @@ mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
ctx,
)
.await
.unwrap();
Expand Down
8 changes: 6 additions & 2 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl ImageLayer {
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
Expand Down Expand Up @@ -377,7 +378,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
Expand Down Expand Up @@ -632,6 +633,7 @@ impl ImageLayerWriterInner {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
// We'll atomically rename it to the final name when we're done.
Expand All @@ -651,6 +653,7 @@ impl ImageLayerWriterInner {
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
ctx,
)
.await?
};
Expand Down Expand Up @@ -805,10 +808,11 @@ impl ImageLayerWriter {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
.await?,
),
})
Expand Down
4 changes: 3 additions & 1 deletion pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,11 @@ impl InMemoryLayer {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_lsn: Lsn,
ctx: &RequestContext,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");

let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());

Ok(InMemoryLayer {
Expand Down Expand Up @@ -642,6 +643,7 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
ctx,
)
.await?;

Expand Down
35 changes: 24 additions & 11 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3560,7 +3560,11 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
async fn get_layer_for_write(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard
.get_layer_for_write(
Expand All @@ -3569,6 +3573,7 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
ctx,
)
.await?;
Ok(layer)
Expand Down Expand Up @@ -3833,8 +3838,8 @@ impl Timeline {
);
self.create_delta_layer(
&frozen_layer,
ctx,
Some(metadata_keyspace.0.ranges[0].clone()),
ctx,
)
.await?
} else {
Expand Down Expand Up @@ -3863,7 +3868,7 @@ impl Timeline {
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let Some(layer) = self.create_delta_layer(&frozen_layer, ctx, None).await? else {
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
Expand Down Expand Up @@ -3992,8 +3997,8 @@ impl Timeline {
async fn create_delta_layer(
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
ctx: &RequestContext,
key_range: Option<Range<Key>>,
ctx: &RequestContext,
) -> anyhow::Result<Option<ResidentLayer>> {
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
Expand All @@ -4016,6 +4021,7 @@ impl Timeline {
&self_clone
.conf
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id),
&ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
Expand Down Expand Up @@ -4209,6 +4215,7 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
ctx,
)
.await?;

Expand Down Expand Up @@ -4313,6 +4320,7 @@ impl Timeline {
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
Expand Down Expand Up @@ -5214,7 +5222,7 @@ impl<'a> TimelineWriter<'a> {
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");

let action = self.get_open_layer_action(lsn, buf_size);
let layer = self.handle_open_layer_action(lsn, action).await?;
let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
let res = layer.put_value(key, lsn, &buf, ctx).await;

if res.is_ok() {
Expand All @@ -5237,14 +5245,15 @@ impl<'a> TimelineWriter<'a> {
&mut self,
at: Lsn,
action: OpenLayerAction,
ctx: &RequestContext,
) -> anyhow::Result<&Arc<InMemoryLayer>> {
match action {
OpenLayerAction::Roll => {
let freeze_at = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
self.roll_layer(freeze_at).await?;
self.open_layer(at).await?;
self.open_layer(at, ctx).await?;
}
OpenLayerAction::Open => self.open_layer(at).await?,
OpenLayerAction::Open => self.open_layer(at, ctx).await?,
OpenLayerAction::None => {
assert!(self.write_guard.is_some());
}
Expand All @@ -5253,8 +5262,8 @@ impl<'a> TimelineWriter<'a> {
Ok(&self.write_guard.as_ref().unwrap().open_layer)
}

async fn open_layer(&mut self, at: Lsn) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at).await?;
async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at, ctx).await?;
let initial_size = layer.size().await?;

let last_freeze_at = self.last_freeze_at.load();
Expand Down Expand Up @@ -5331,10 +5340,14 @@ impl<'a> TimelineWriter<'a> {
Ok(())
}

pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
pub(crate) async fn delete_batch(
&mut self,
batch: &[(Range<Key>, Lsn)],
ctx: &RequestContext,
) -> anyhow::Result<()> {
if let Some((_, lsn)) = batch.first() {
let action = self.get_open_layer_action(*lsn, 0);
let layer = self.handle_open_layer_action(*lsn, action).await?;
let layer = self.handle_open_layer_action(*lsn, action, ctx).await?;
layer.put_tombstones(batch).await?;
}

Expand Down
4 changes: 4 additions & 0 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
ctx,
)
.await?,
);
Expand Down Expand Up @@ -755,6 +756,7 @@ impl Timeline {
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
Expand Down Expand Up @@ -1093,6 +1095,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
ctx,
)
.await?;

Expand Down Expand Up @@ -1167,6 +1170,7 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
ctx,
)
.await?;

Expand Down
Loading

0 comments on commit 2cf8642

Please sign in to comment.