Skip to content

Commit

Permalink
page_cache: find_victim: don't spin while there's no chance for a slot
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Sep 15, 2023
1 parent e6985bd commit 64cb058
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 24 deletions.
9 changes: 9 additions & 0 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
},
});

/// Histogram of the time spent acquiring a pinned-slot permit in the page
/// cache, i.e., waiting until some page guard is dropped so that a slot can
/// be pinned. Uses the same bucket layout as other critical-path operations.
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "pageserver_page_cache_acquire_pinned_slot_seconds",
        "Time spent acquiring a pinned slot in the page cache",
        CRITICAL_OP_BUCKETS.into(),
    )
    .expect("failed to define a metric")
});

pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wait_lsn_seconds",
Expand Down
117 changes: 93 additions & 24 deletions pageserver/src/page_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use anyhow::Context;
Expand Down Expand Up @@ -224,30 +228,38 @@ pub struct PageCache {
/// The actual buffers with their metadata.
slots: Box<[Slot]>,

pinned_slots: Arc<tokio::sync::Semaphore>,

/// Index of the next candidate to evict, for the Clock replacement algorithm.
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,

size_metrics: &'static PageCacheSizeMetrics,
}

// Owned permit from the `PageCache::pinned_slots` semaphore. The semaphore is
// sized to the number of slots, so holding one of these bounds how many slots
// can be pinned concurrently; page guards keep their permit until dropped.
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);

///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
pub struct PageReadGuard<'i> {
    // Held, never read: keeps one pinned-slot permit taken for as long as
    // this guard pins the slot; the permit is released when the guard drops.
    #[allow(unused)]
    permit: PinnedSlotsPermit,
    // Read lock on the slot's contents; `Deref`/`AsRef` expose the page buffer.
    slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}

// Dereference to the pinned page's contents.
// (The scraped diff interleaved the removed `self.0.buf` with the added
// `self.slot_guard.buf`; only the post-commit field access belongs here.)
impl std::ops::Deref for PageReadGuard<'_> {
    type Target = [u8; PAGE_SZ];

    fn deref(&self) -> &Self::Target {
        self.slot_guard.buf
    }
}

// Borrow the pinned page's contents as a fixed-size byte array.
// (The scraped diff interleaved the removed `self.0.buf` with the added
// `self.slot_guard.buf`; only the post-commit field access belongs here.)
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
    fn as_ref(&self) -> &[u8; PAGE_SZ] {
        self.slot_guard.buf
    }
}

Expand All @@ -264,6 +276,9 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,

#[allow(unused)]
permit: PinnedSlotsPermit,

// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
Expand Down Expand Up @@ -347,6 +362,11 @@ impl PageCache {
key: &Key,
lsn: Lsn,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
// TODO metric?
return None;
};

crate::metrics::PAGE_CACHE
.read_accesses_materialized_page
.inc();
Expand All @@ -360,7 +380,10 @@ impl PageCache {
lsn,
};

if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
if let Some(guard) = self
.try_lock_for_read(&mut cache_key, &mut Some(permit))
.await
{
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
Expand Down Expand Up @@ -440,6 +463,25 @@ impl PageCache {
// "mappings" after this section. But the routines in this section should
// not require changes.

async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
let _timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
Duration::from_secs(10),
Arc::clone(&self.pinned_slots).acquire_owned(),
)
.await
{
Ok(res) => Ok(PinnedSlotsPermit(
res.expect("this semaphore is never closed"),
)),
Err(_timeout) => {
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
}
}
}

/// Look up a page in the cache.
///
/// If the search criteria is not exact, *cache_key is updated with the key
Expand All @@ -449,7 +491,11 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
async fn try_lock_for_read(
&self,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
Expand All @@ -459,7 +505,10 @@ impl PageCache {
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard(inner));
return Some(PageReadGuard {
permit: permit.take().unwrap(),
slot_guard: inner,
});
} else {
// search_mapping might have modified the search key; restore it.
*cache_key = cache_key_orig;
Expand Down Expand Up @@ -498,6 +547,8 @@ impl PageCache {
/// ```
///
async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);

let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
Expand All @@ -512,17 +563,21 @@ impl PageCache {
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
debug_assert!(permit.is_some());
is_first_iteration = false;

// Not found. Find a victim buffer
let (slot_idx, mut inner) =
self.find_victim().context("Failed to find evict victim")?;
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;

// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
Expand All @@ -545,6 +600,7 @@ impl PageCache {
slot.set_usage_count(1);

return Ok(ReadBufResult::NotFound(PageWriteGuard {
permit: permit.take().unwrap(),
inner,
valid: false,
}));
Expand All @@ -555,7 +611,11 @@ impl PageCache {
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
Expand All @@ -564,7 +624,11 @@ impl PageCache {
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageWriteGuard { inner, valid: true });
return Some(PageWriteGuard {
inner,
valid: true,
permit: permit.take().unwrap(),
});
}
}
None
Expand All @@ -575,15 +639,20 @@ impl PageCache {
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
return Ok(WriteBufResult::Found(write_guard));
}
debug_assert!(permit.is_some());

// Not found. Find a victim buffer
let (slot_idx, mut inner) =
self.find_victim().context("Failed to find evict victim")?;
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;

// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
Expand All @@ -608,6 +677,7 @@ impl PageCache {
return Ok(WriteBufResult::NotFound(PageWriteGuard {
inner,
valid: false,
permit: permit.take().unwrap(),
}));
}
}
Expand Down Expand Up @@ -758,8 +828,11 @@ impl PageCache {
/// Find a slot to evict.
///
/// On return, the slot is empty and write-locked.
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
async fn find_victim(
&self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * (usize::try_from(MAX_USAGE_COUNT).unwrap());
let mut iters = 0;
loop {
iters += 1;
Expand All @@ -771,14 +844,9 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
// If we have looped through the whole buffer pool 10 times
// and still haven't found a victim buffer, something's wrong.
// Maybe all the buffers were in locked. That could happen in
// theory, if you have more threads holding buffers locked than
// there are buffers in the pool. In practice, with a reasonably
// large buffer pool it really shouldn't happen.
if iters > iter_limit {
anyhow::bail!("exceeded evict iter limit");
// we take the _permit_witness as proof
unreachable!("we hold a slot permit, there must be one not-locked slot");
}
continue;
}
Expand Down Expand Up @@ -827,6 +895,7 @@ impl PageCache {
slots,
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
}
}
}
Expand Down

0 comments on commit 64cb058

Please sign in to comment.