Commit 2ae2d21

WIP

problame committed Sep 20, 2023
1 parent d51a1b4 commit 2ae2d21
Showing 9 changed files with 71 additions and 66 deletions.
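This commit threads an explicit ctx: &RequestContext argument through the pageserver's read and write paths so page-cache metrics can be attributed to the task kind that triggered each access, replacing the todo!() placeholders previously left at the metrics call sites. A minimal, self-contained sketch of the pattern follows; TaskKind and RequestContext here are simplified stand-ins for the real pageserver types.

#[derive(Clone, Copy, Debug)]
enum TaskKind {
    PageRequestHandler,
    WalReceiver,
    Compaction,
}

struct RequestContext {
    task_kind: TaskKind,
}

impl RequestContext {
    fn task_kind(&self) -> TaskKind {
        self.task_kind
    }
}

// Leaf code can now attribute each page-cache access to the task that
// triggered it instead of hitting an unimplemented todo!() at runtime.
fn record_access(ctx: &RequestContext) {
    println!("page cache access by {:?}", ctx.task_kind());
}

fn main() {
    let ctx = RequestContext { task_kind: TaskKind::PageRequestHandler };
    record_access(&ctx);
}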
8 changes: 4 additions & 4 deletions pageserver/src/import_datadir.rs
@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
-modification.flush().await?;
+modification.flush(ctx).await?;
}
}

// We're done importing all the data files.
-modification.commit().await?;
+modification.commit(ctx).await?;

// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
-modification.flush().await?;
+modification.flush(ctx).await?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;

-modification.commit().await?;
+modification.commit(ctx).await?;
Ok(())
}

18 changes: 10 additions & 8 deletions pageserver/src/page_cache.rs
@@ -85,7 +85,7 @@ use utils::{
lsn::Lsn,
};

-use crate::{metrics::PageCacheSizeMetrics, repository::Key};
+use crate::{metrics::PageCacheSizeMetrics, repository::Key, context::RequestContext};

static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
@@ -346,9 +346,10 @@ impl PageCache {
timeline_id: TimelineId,
key: &Key,
lsn: Lsn,
+ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
crate::metrics::PAGE_CACHE
-.for_task_kind(todo!())
+.for_task_kind(ctx)
.read_accesses_materialized_page
.inc();

@@ -369,12 +370,12 @@
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
-.for_task_kind(todo!())
+.for_task_kind(ctx)
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
-.for_task_kind(todo!())
+.for_task_kind(ctx)
.read_hits_materialized_page_older_lsn
.inc();
}
@@ -429,10 +430,11 @@ impl PageCache {
&self,
file_id: FileId,
blkno: u32,
+ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };

-self.lock_for_read(&mut cache_key).await
+self.lock_for_read(&mut cache_key, ctx).await
}

//
@@ -500,17 +502,17 @@ impl PageCache {
/// }
/// ```
///
-async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
+async fn lock_for_read(&self, cache_key: &mut CacheKey, ctx: &RequestContext) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE
-.for_task_kind(todo!())
+.for_task_kind(ctx.task_kind())
.read_accesses_immutable,
&crate::metrics::PAGE_CACHE
-.for_task_kind(todo!())
+.for_task_kind(ctx.task_kind())
.read_hits_immutable,
.read_hits_immutable,
),
};
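The call sites above resolve per-task-kind counters via for_task_kind. Below is a hedged sketch of what such a counter family could look like; the real crate::metrics::PAGE_CACHE is not shown in this diff, so its exact shape is an assumption here.

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum TaskKind {
    PageRequestHandler,
    Compaction,
}

#[derive(Default)]
struct PageCacheMetricsForTaskKind {
    read_accesses_immutable: AtomicU64,
    read_hits_immutable: AtomicU64,
}

struct PageCacheMetrics {
    per_task_kind: HashMap<TaskKind, PageCacheMetricsForTaskKind>,
}

impl PageCacheMetrics {
    // Mirrors the `.for_task_kind(ctx.task_kind()).read_accesses_immutable.inc()`
    // call shape in the diff; `inc()` is modeled with fetch_add below.
    fn for_task_kind(&self, task_kind: TaskKind) -> &PageCacheMetricsForTaskKind {
        &self.per_task_kind[&task_kind]
    }
}

fn main() {
    let mut per_task_kind = HashMap::new();
    per_task_kind.insert(TaskKind::Compaction, PageCacheMetricsForTaskKind::default());
    let metrics = PageCacheMetrics { per_task_kind };
    metrics
        .for_task_kind(TaskKind::Compaction)
        .read_accesses_immutable
        .fetch_add(1, Ordering::Relaxed);
}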
8 changes: 4 additions & 4 deletions pageserver/src/pgdatadir_mapping.rs
@@ -1138,7 +1138,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
-pub async fn flush(&mut self) -> anyhow::Result<()> {
+pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1154,7 +1154,7 @@ impl<'a> DatadirModification<'a> {
if is_rel_block_key(key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
-writer.put(key, self.lsn, &value).await?;
+writer.put(key, self.lsn, &value, ctx).await?;
} else {
retained_pending_updates.insert(key, value);
}
@@ -1174,14 +1174,14 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
-pub async fn commit(&mut self) -> anyhow::Result<()> {
+pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;

for (key, value) in self.pending_updates.drain() {
-writer.put(key, lsn, &value).await?;
+writer.put(key, lsn, &value, ctx).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn).await?;
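The flush/commit doc comments above describe the bulk-import contract: flush may write out data pages early while retaining metadata, and commit stamps everything atomically at the modification's LSN. A hedged usage sketch of the updated signatures follows, assuming the crate's types in scope; put_page is a hypothetical helper standing in for DatadirModification's real put methods.

async fn bulk_import(
    modification: &mut DatadirModification<'_>,
    pages: Vec<(Key, Value)>,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    for (i, (key, value)) in pages.into_iter().enumerate() {
        put_page(modification, key, value)?; // hypothetical helper; the real
                                             // API exposes typed put methods
        if (i + 1) % 8192 == 0 {
            // Early flush: data pages are written out, metadata is retained.
            modification.flush(ctx).await?;
        }
    }
    // Single atomic commit; every update is stamped with the modification's LSN.
    modification.commit(ctx).await?;
    Ok(())
}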
24 changes: 12 additions & 12 deletions pageserver/src/tenant.rs
@@ -1504,7 +1504,7 @@ impl Tenant {
.init_empty_test_timeline()
.context("init_empty_test_timeline")?;
modification
-.commit()
+.commit(&ctx)
.await
.context("commit init_empty_test_timeline modification")?;

@@ -3545,7 +3545,7 @@ mod tests {

let writer = tline.writer().await;
writer
-.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
+.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")), &ctx)
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3619,19 +3619,19 @@ mod tests {

// Insert a value on the timeline
writer
-.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
+.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
.await?;
writer
-.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
+.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
.await?;
writer.finish_write(Lsn(0x20));

writer
-.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
+.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
.await?;
writer.finish_write(Lsn(0x30));
writer
-.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
+.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
.await?;
writer.finish_write(Lsn(0x40));

@@ -3646,7 +3646,7 @@ mod tests {
.expect("Should have a local timeline");
let new_writer = newtline.writer().await;
new_writer
-.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
+.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
.await?;
new_writer.finish_write(Lsn(0x40));

@@ -4087,7 +4087,7 @@ mod tests {

let writer = tline.writer().await;
writer
-.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
+.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")), &ctx)
.await?;
writer.finish_write(Lsn(0x10));
drop(writer);
@@ -4097,7 +4097,7 @@ mod tests {

let writer = tline.writer().await;
writer
-.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
+.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")), &ctx)
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -4107,7 +4107,7 @@ mod tests {

let writer = tline.writer().await;
writer
-.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
+.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")), &ctx)
.await?;
writer.finish_write(Lsn(0x30));
drop(writer);
@@ -4117,7 +4117,7 @@ mod tests {

let writer = tline.writer().await;
writer
-.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
+.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")), &ctx)
.await?;
writer.finish_write(Lsn(0x40));
drop(writer);
@@ -4474,7 +4474,7 @@ mod tests {
.init_empty_test_timeline()
.context("init_empty_test_timeline")?;
modification
-.commit()
+.commit(&ctx)
.await
.context("commit init_empty_test_timeline modification")?;

5 changes: 3 additions & 2 deletions pageserver/src/tenant/block_io.rs
@@ -4,6 +4,7 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
+use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
@@ -169,11 +170,11 @@ impl FileBlockReader {
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
-pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
+pub async fn read_blk(&self, blknum: u32, ctx: &RequestContext) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
-.read_immutable_buf(self.file_id, blknum)
+.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
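read_blk delegates to the page cache and retries until the page is resident. A toy, self-contained model of that lookup-or-fill loop follows; the real ReadBufResult guards carry locks and buffers, so this is a shape sketch only, not the actual API.

// Toy model of the lookup-or-fill loop; Vec<u8> stands in for the guards.
enum ReadBufResult {
    Found(Vec<u8>),    // stand-in for a read lease on a resident page
    NotFound(Vec<u8>), // stand-in for a write guard over an empty buffer
}

fn read_immutable_buf(cache: &Option<Vec<u8>>, page_sz: usize) -> ReadBufResult {
    match cache {
        Some(page) => ReadBufResult::Found(page.clone()),
        None => ReadBufResult::NotFound(vec![0; page_sz]),
    }
}

fn read_blk(cache: &mut Option<Vec<u8>>, page_on_disk: &[u8]) -> Vec<u8> {
    loop {
        match read_immutable_buf(cache, page_on_disk.len()) {
            ReadBufResult::Found(page) => return page,
            ReadBufResult::NotFound(mut buf) => {
                // Cache miss: fill the buffer from "disk", install it, retry.
                buf.copy_from_slice(page_on_disk);
                *cache = Some(buf);
            }
        }
    }
}

fn main() {
    let mut cache = None;
    let page = read_blk(&mut cache, &[1u8; 8192]);
    assert_eq!(page.len(), 8192);
}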
16 changes: 9 additions & 7 deletions pageserver/src/tenant/ephemeral_file.rs
@@ -2,6 +2,7 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
+use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
@@ -61,13 +62,13 @@ impl EphemeralFile {
self.len
}

-pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
+pub(crate) async fn read_blk(&self, blknum: u32, ctx: &RequestContext) -> Result<BlockLease, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
-.read_immutable_buf(self.page_cache_file_id, blknum)
+.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
@@ -103,7 +104,7 @@ impl EphemeralFile {
}
}

-pub(crate) async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
+pub(crate) async fn write_blob(&mut self, srcbuf: &[u8], ctx: &RequestContext) -> Result<u64, io::Error> {
struct Writer<'a> {
ephemeral_file: &'a mut EphemeralFile,
/// The block to which the next [`push_bytes`] will write.
@@ -120,7 +121,7 @@ impl EphemeralFile {
})
}
#[inline(always)]
-async fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
+async fn push_bytes(&mut self, src: &[u8], ctx: &RequestContext) -> Result<(), io::Error> {
let mut src_remaining = src;
while !src_remaining.is_empty() {
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
@@ -146,6 +147,7 @@ impl EphemeralFile {
.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
+ctx,
)
.await
{
@@ -199,15 +201,15 @@ impl EphemeralFile {
if srcbuf.len() < 0x80 {
// short one-byte length header
let len_buf = [srcbuf.len() as u8];
-writer.push_bytes(&len_buf).await?;
+writer.push_bytes(&len_buf, ctx).await?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
-writer.push_bytes(&len_buf).await?;
+writer.push_bytes(&len_buf, ctx).await?;
}

// Write the payload
-writer.push_bytes(srcbuf).await?;
+writer.push_bytes(srcbuf, ctx).await?;

if srcbuf.len() < 0x80 {
self.len += 1;
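The hunk above shows write_blob's length-header scheme: blob lengths under 0x80 get a single-byte header, longer blobs a 4-byte big-endian header with the high bit set. A standalone sketch of just that encoding, taken directly from the diff's logic:

// lengths < 0x80: one-byte header; otherwise 4-byte BE with high bit set.
fn encode_len_header(len: usize) -> Vec<u8> {
    if len < 0x80 {
        vec![len as u8]
    } else {
        let mut buf = u32::to_be_bytes(len as u32);
        buf[0] |= 0x80;
        buf.to_vec()
    }
}

fn main() {
    assert_eq!(encode_len_header(0x7f), vec![0x7f]);
    assert_eq!(encode_len_header(0x80), vec![0x80, 0x00, 0x00, 0x80]);
}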
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -263,7 +263,7 @@ impl InMemoryLayer {

/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
-pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
+pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value, ctx: &RequestContext) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let inner: &mut _ = &mut *self.inner.write().await;
self.assert_writable();
@@ -275,7 +275,7 @@ impl InMemoryLayer {
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
buf.clear();
val.ser_into(&mut buf)?;
-inner.file.write_blob(&buf).await?
+inner.file.write_blob(&buf, ctx).await?
};

let vec_map = inner.index.entry(key).or_default();
14 changes: 7 additions & 7 deletions pageserver/src/tenant/timeline.rs
@@ -471,7 +471,7 @@ impl Timeline {
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
-let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
+let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
@@ -2518,13 +2518,13 @@ impl Timeline {
}
}

-async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
+async fn lookup_cached_page(&self, key: &Key, lsn: Lsn, ctx: &RequestContext) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();

// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
-.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
+.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
@@ -2558,10 +2558,10 @@ impl Timeline {
Ok(layer)
}

-async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
+async fn put_value(&self, key: Key, lsn: Lsn, val: &Value, ctx: &RequestContext) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn).await?;
-layer.put_value(key, lsn, val).await?;
+layer.put_value(key, lsn, val, ctx).await?;
Ok(())
}

@@ -4699,8 +4699,8 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
-pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
-self.tl.put_value(key, lsn, value).await
+pub async fn put(&self, key: Key, lsn: Lsn, value: &Value, ctx: &RequestContext) -> anyhow::Result<()> {
+self.tl.put_value(key, lsn, value, ctx).await
}

pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
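Taken together, the timeline changes carry the context end to end: TimelineWriter::put forwards to Timeline::put_value, which forwards to InMemoryLayer::put_value, which reaches EphemeralFile::write_blob. A toy model of that delegation chain, with real signatures and types elided:

struct RequestContext;

struct EphemeralFile;
impl EphemeralFile {
    fn write_blob(&mut self, buf: &[u8], _ctx: &RequestContext) {
        println!("writing {} bytes", buf.len());
    }
}

struct InMemoryLayer {
    file: EphemeralFile,
}
impl InMemoryLayer {
    fn put_value(&mut self, buf: &[u8], ctx: &RequestContext) {
        self.file.write_blob(buf, ctx); // same ctx, one level down
    }
}

struct Timeline {
    layer: InMemoryLayer,
}
impl Timeline {
    fn put_value(&mut self, buf: &[u8], ctx: &RequestContext) {
        self.layer.put_value(buf, ctx); // and down again
    }
}

fn main() {
    let ctx = RequestContext;
    let mut tl = Timeline { layer: InMemoryLayer { file: EphemeralFile } };
    tl.put_value(b"page image", &ctx);
}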