feat: only page-cache the immutable part of EphemeralFile
problame committed Aug 15, 2023
1 parent 2854279 commit 2a5cdee
Showing 5 changed files with 125 additions and 362 deletions.
152 changes: 16 additions & 136 deletions pageserver/src/page_cache.rs
@@ -40,20 +40,18 @@ use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU8, AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
};

use anyhow::Context;
use once_cell::sync::OnceCell;
use tracing::error;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};

use crate::tenant::writeback_ephemeral_file;
use crate::{metrics::PageCacheSizeMetrics, repository::Key};

static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -87,6 +85,14 @@ pub fn get() -> &'static PageCache {
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;

static NEXT_ID: AtomicU64 = AtomicU64::new(1);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);

pub fn next_file_id() -> FileId {
FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}

///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
@@ -97,12 +103,8 @@ enum CacheKey {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
EphemeralPage {
file_id: u64,
blkno: u32,
},
ImmutableFilePage {
file_id: u64,
file_id: FileId,
blkno: u32,
},
}
@@ -128,7 +130,6 @@ struct Slot {
struct SlotInner {
key: Option<CacheKey>,
buf: &'static mut [u8; PAGE_SZ],
dirty: bool,
}

impl Slot {
@@ -177,9 +178,7 @@ pub struct PageCache {
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,

ephemeral_page_map: RwLock<HashMap<(u64, u32), usize>>,

immutable_page_map: RwLock<HashMap<(u64, u32), usize>>,
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,

/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -258,14 +257,6 @@ impl PageWriteGuard<'_> {
);
self.valid = true;
}
pub fn mark_dirty(&mut self) {
// only ephemeral pages can be dirty ATM.
assert!(matches!(
self.inner.key,
Some(CacheKey::EphemeralPage { .. })
));
self.inner.dirty = true;
}
}

impl Drop for PageWriteGuard<'_> {
@@ -280,7 +271,6 @@ impl Drop for PageWriteGuard<'_> {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
self.inner.dirty = false;
}
}
}
@@ -388,50 +378,16 @@ impl PageCache {
Ok(())
}

// Section 1.2: Public interface functions for working with Ephemeral pages.

pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::EphemeralPage { file_id, blkno };

self.lock_for_read(&mut cache_key)
}

pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<WriteBufResult> {
let cache_key = CacheKey::EphemeralPage { file_id, blkno };

self.lock_for_write(&cache_key)
}

/// Immediately drop all buffers belonging to given file, without writeback
pub fn drop_buffers_for_ephemeral(&self, drop_file_id: u64) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
// Section 1.2: Public interface functions for working with immutable file pages.

let mut inner = slot.inner.write().unwrap();
if let Some(key) = &inner.key {
match key {
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
inner.dirty = false;
}
_ => {}
}
}
}
}

// Section 1.3: Public interface functions for working with immutable file pages.

pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<ReadBufResult> {
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };

self.lock_for_read(&mut cache_key)
}

/// Immediately drop all buffers belonging to given file, without writeback
pub fn drop_buffers_for_immutable(&self, drop_file_id: u64) {
pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];

@@ -444,7 +400,6 @@ impl PageCache {
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
inner.dirty = false;
}
_ => {}
}
@@ -522,10 +477,6 @@ impl PageCache {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::EphemeralPage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_ephemeral,
&crate::metrics::PAGE_CACHE.read_hits_ephemeral,
),
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
&crate::metrics::PAGE_CACHE.read_hits_immutable,
@@ -566,7 +517,6 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
inner.dirty = false;
slot.usage_count.store(1, Ordering::Relaxed);

return Ok(ReadBufResult::NotFound(PageWriteGuard {
@@ -628,7 +578,6 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
inner.dirty = false;
slot.usage_count.store(1, Ordering::Relaxed);

return Ok(WriteBufResult::NotFound(PageWriteGuard {
@@ -667,10 +616,6 @@ impl PageCache {
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -694,10 +639,6 @@ impl PageCache {
None
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -731,12 +672,6 @@ impl PageCache {
panic!("could not find old key in mapping")
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
self.size_metrics.current_bytes_ephemeral.sub_page_sz(1);
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
@@ -776,17 +711,7 @@ impl PageCache {
}
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
self.size_metrics.current_bytes_ephemeral.add_page_sz(1);
None
}
}
}

CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
@@ -837,54 +762,15 @@ impl PageCache {
}
};
if let Some(old_key) = &inner.key {
if inner.dirty {
if let Err(err) = Self::writeback(old_key, inner.buf) {
// Writing the page to disk failed.
//
// FIXME: What to do here, when? We could propagate the error to the
// caller, but victim buffer is generally unrelated to the original
// call. It can even belong to a different tenant. Currently, we
// report the error to the log and continue the clock sweep to find
// a different victim. But if the problem persists, the page cache
// could fill up with dirty pages that we cannot evict, and we will
// loop retrying the writebacks indefinitely.
error!("writeback of buffer {:?} failed: {}", old_key, err);
continue;
}
}

// remove mapping for old buffer
self.remove_mapping(old_key);
inner.dirty = false;
inner.key = None;
}
return Ok((slot_idx, inner));
}
}
}

fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> {
match cache_key {
CacheKey::MaterializedPage {
hash_key: _,
lsn: _,
} => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"unexpected dirty materialized page",
)),
CacheKey::EphemeralPage { file_id, blkno } => {
writeback_ephemeral_file(*file_id, *blkno, buf)
}
CacheKey::ImmutableFilePage {
file_id: _,
blkno: _,
} => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"unexpected dirty immutable page",
)),
}
}

/// Initialize a new page cache
///
/// This should be called only once at page server startup.
@@ -895,7 +781,6 @@ impl PageCache {

let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_ephemeral.set_page_sz(0);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);

@@ -905,19 +790,14 @@ impl PageCache {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();

Slot {
inner: RwLock::new(SlotInner {
key: None,
buf,
dirty: false,
}),
inner: RwLock::new(SlotInner { key: None, buf }),
usage_count: AtomicU8::new(0),
}
})
.collect();

Self {
materialized_page_map: Default::default(),
ephemeral_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
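For reference, a minimal caller-side sketch of the new FileId-based immutable-page API. This is an illustration, not code from the commit: the ImmutablePartReader type and fill_from_disk helper are hypothetical, and it assumes the ReadBufResult::{Found, NotFound} variants, PageWriteGuard::mark_valid, and Deref access to the PAGE_SZ page buffer suggested by the hunks above.

use crate::page_cache::{self, FileId, ReadBufResult, PAGE_SZ};

// Hypothetical reader over the immutable (frozen) part of a file.
// The FileId is allocated once, e.g. via page_cache::next_file_id().
struct ImmutablePartReader {
    file_id: FileId,
    // underlying on-disk file handle elided
}

impl ImmutablePartReader {
    fn read_blk(&self, blkno: u32) -> anyhow::Result<[u8; PAGE_SZ]> {
        let cache = page_cache::get();
        let mut out = [0u8; PAGE_SZ];
        match cache.read_immutable_buf(self.file_id, blkno)? {
            ReadBufResult::Found(guard) => {
                // Cache hit: the slot already holds a valid copy of the page.
                out.copy_from_slice(&guard[..]);
            }
            ReadBufResult::NotFound(mut write_guard) => {
                // Cache miss: the cache handed out an empty slot. Fill it
                // from disk, mark it valid so later reads hit, then copy out.
                self.fill_from_disk(blkno, &mut write_guard[..])?;
                write_guard.mark_valid();
                out.copy_from_slice(&write_guard[..]);
            }
        }
        Ok(out)
    }

    fn fill_from_disk(&self, _blkno: u32, dst: &mut [u8]) -> anyhow::Result<()> {
        // Placeholder: a real implementation would issue a positioned read of
        // PAGE_SZ bytes at offset blkno * PAGE_SZ in the underlying file.
        dst.fill(0);
        Ok(())
    }
}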
3 changes: 0 additions & 3 deletions pageserver/src/tenant.rs
@@ -136,9 +136,6 @@ pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};

// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;

// re-export for use in remote_timeline_client.rs
pub use crate::tenant::metadata::save_metadata;
