page_cache: find_victim: don't spin while there's no chance for a slot #5319

Merged: 7 commits, Sep 29, 2023

Changes from 1 commit
9 changes: 9 additions & 0 deletions pageserver/src/metrics.rs
@@ -243,6 +243,15 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
},
});

pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_cache_acquire_pinned_slot_seconds",
"Time spent acquiring a pinned slot in the page cache",
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});

pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wait_lsn_seconds",
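For readers unfamiliar with how such a histogram gets populated: `start_timer()` returns a timer that records the elapsed wall-clock time into the histogram when it is dropped. A minimal sketch using the prometheus crate directly (the metric and function names below are made up; the actual call site for PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME is in page_cache.rs further down):

```rust
use once_cell::sync::Lazy;
use prometheus::{register_histogram, Histogram};

// Illustrative metric, not one that exists in the pageserver.
static EXAMPLE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "example_operation_seconds",
        "Time spent in the example operation"
    )
    .expect("failed to define a metric")
});

fn do_example_operation() {
    // The timer observes the elapsed time into the histogram when it is
    // dropped at the end of this scope.
    let _timer = EXAMPLE_SECONDS.start_timer();
    // ... the timed work goes here ...
}
```
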
117 changes: 93 additions & 24 deletions pageserver/src/page_cache.rs
@@ -75,7 +75,11 @@
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use anyhow::Context;
@@ -224,30 +228,38 @@ pub struct PageCache {
/// The actual buffers with their metadata.
slots: Box<[Slot]>,

pinned_slots: Arc<tokio::sync::Semaphore>,

/// Index of the next candidate to evict, for the Clock replacement algorithm.
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,

size_metrics: &'static PageCacheSizeMetrics,
}

struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);

///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
pub struct PageReadGuard<'i> {
#[allow(unused)]
permit: PinnedSlotsPermit,
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}

impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];

fn deref(&self) -> &Self::Target {
self.0.buf
self.slot_guard.buf
}
}

impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.0.buf
self.slot_guard.buf
}
}
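The point of threading a PinnedSlotsPermit into the guards is RAII: the permit is released only when the guard is dropped, so the semaphore's available count always equals the number of slots not currently pinned by a live guard. A minimal self-contained sketch of the pattern, with illustrative type names rather than the pageserver's real ones:

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, RwLock, RwLockReadGuard, Semaphore};

// Illustrative stand-ins for PinnedSlotsPermit and PageReadGuard.
struct Permit(OwnedSemaphorePermit);

struct ReadGuard<'a> {
    // Dropping the guard drops the permit, returning it to the semaphore.
    #[allow(unused)]
    permit: Permit,
    slot_guard: RwLockReadGuard<'a, [u8; 8192]>,
}

impl std::ops::Deref for ReadGuard<'_> {
    type Target = [u8; 8192];
    fn deref(&self) -> &Self::Target {
        &self.slot_guard
    }
}

struct Cache {
    pinned_slots: Arc<Semaphore>, // one permit per slot
    slot: RwLock<[u8; 8192]>,     // a single slot, for brevity
}

impl Cache {
    async fn lock_for_read(&self) -> ReadGuard<'_> {
        // Acquire a permit before touching any slot; this is what bounds
        // the number of concurrently pinned slots.
        let permit = Permit(
            Arc::clone(&self.pinned_slots)
                .acquire_owned()
                .await
                .expect("semaphore is never closed"),
        );
        ReadGuard {
            permit,
            slot_guard: self.slot.read().await,
        }
    }
}
```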

@@ -264,6 +276,9 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,

#[allow(unused)]
permit: PinnedSlotsPermit,

// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
@@ -347,6 +362,11 @@ impl PageCache {
key: &Key,
lsn: Lsn,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
// TODO metric?
return None;
};

crate::metrics::PAGE_CACHE
.read_accesses_materialized_page
.inc();
@@ -360,7 +380,10 @@
lsn,
};

if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
if let Some(guard) = self
.try_lock_for_read(&mut cache_key, &mut Some(permit))
.await
{
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
@@ -440,6 +463,25 @@
// "mappings" after this section. But the routines in this section should
// not require changes.

async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
let _timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
Duration::from_secs(10),
Arc::clone(&self.pinned_slots).acquire_owned(),
)
.await
{
Ok(res) => Ok(PinnedSlotsPermit(
res.expect("this semaphore is never closed"),
)),
Err(_timeout) => {
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
}
}
}

/// Look up a page in the cache.
///
/// If the search criteria is not exact, *cache_key is updated with the key
@@ -449,7 +491,11 @@
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
async fn try_lock_for_read(
&self,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
@@ -459,7 +505,10 @@
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard(inner));
return Some(PageReadGuard {
permit: permit.take().unwrap(),
slot_guard: inner,
});
} else {
// search_mapping might have modified the search key; restore it.
*cache_key = cache_key_orig;
@@ -498,6 +547,8 @@ impl PageCache {
/// ```
///
async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);

let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
@@ -512,17 +563,21 @@
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
debug_assert!(permit.is_some());
is_first_iteration = false;

// Not found. Find a victim buffer
let (slot_idx, mut inner) =
self.find_victim().context("Failed to find evict victim")?;
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;

// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
@@ -545,6 +600,7 @@
slot.set_usage_count(1);

return Ok(ReadBufResult::NotFound(PageWriteGuard {
permit: permit.take().unwrap(),
inner,
valid: false,
}));
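The `&mut Option<PinnedSlotsPermit>` parameter above implements a simple handoff: the permit moves into the returned guard only on the hit path (`permit.take()`), and stays with the caller on a miss so it can be reused to pin the victim slot. A tiny standalone sketch of that pattern, with made-up types:

```rust
// Made-up types, illustrating only the Option::take() handoff used above.
struct Permit;
struct Guard {
    #[allow(unused)]
    permit: Permit,
}

/// On a hit, the permit moves into the guard and the caller's Option
/// becomes None; on a miss it stays with the caller for reuse.
fn try_lookup(hit: bool, permit: &mut Option<Permit>) -> Option<Guard> {
    if hit {
        Some(Guard {
            permit: permit.take().expect("caller passes Some"),
        })
    } else {
        None
    }
}

fn caller() {
    let mut permit = Some(Permit);
    if let Some(_guard) = try_lookup(false, &mut permit) {
        debug_assert!(permit.is_none()); // consumed by the guard
    } else {
        debug_assert!(permit.is_some()); // still available, e.g. for find_victim()
    }
}
```
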
@@ -555,7 +611,11 @@
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
@@ -564,7 +624,11 @@
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageWriteGuard { inner, valid: true });
return Some(PageWriteGuard {
inner,
valid: true,
permit: permit.take().unwrap(),
});
}
}
None
@@ -575,15 +639,20 @@
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
return Ok(WriteBufResult::Found(write_guard));
}
debug_assert!(permit.is_some());

// Not found. Find a victim buffer
let (slot_idx, mut inner) =
self.find_victim().context("Failed to find evict victim")?;
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;

// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
@@ -608,6 +677,7 @@
return Ok(WriteBufResult::NotFound(PageWriteGuard {
inner,
valid: false,
permit: permit.take().unwrap(),
}));
}
}
@@ -758,8 +828,11 @@ impl PageCache {
/// Find a slot to evict.
///
/// On return, the slot is empty and write-locked.
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
async fn find_victim(
&self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * (usize::try_from(MAX_USAGE_COUNT).unwrap());
let mut iters = 0;
loop {
iters += 1;
@@ -771,14 +844,9 @@
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
// If we have looped through the whole buffer pool 10 times
// and still haven't found a victim buffer, something's wrong.
// Maybe all the buffers were in locked. That could happen in
// theory, if you have more threads holding buffers locked than
// there are buffers in the pool. In practice, with a reasonably
// large buffer pool it really shouldn't happen.
if iters > iter_limit {
anyhow::bail!("exceeded evict iter limit");
// we take the _permit_witness as proof
unreachable!("we hold a slot permit, there mut be one not-locked slot");
}
continue;
}
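Two things change in find_victim above: the iteration limit becomes slots.len() * MAX_USAGE_COUNT, since each unlocked slot can be skipped at most MAX_USAGE_COUNT times before its usage count reaches zero, and hitting the limit becomes unreachable! because the caller's pinned-slot permit is taken as a witness that not every slot can be pinned. For example, assuming MAX_USAGE_COUNT is 5, an 8192-slot cache is bounded at 40,960 iterations. A simplified model of the sweep and its bound (illustrative; the real loop also counts iterations spent on slots whose lock it fails to take):

```rust
// Simplified model of the clock sweep bound; the constant's value is an
// assumption here, chosen only to make the arithmetic concrete.
const MAX_USAGE_COUNT: u8 = 5;

fn find_victim_model(usage_counts: &mut [u8]) -> usize {
    let iter_limit = usage_counts.len() * usize::from(MAX_USAGE_COUNT);
    let mut clock = 0usize;
    for _ in 0..=iter_limit {
        let idx = clock % usage_counts.len();
        clock += 1;
        if usage_counts[idx] == 0 {
            return idx; // found a victim
        }
        // Second chance: decrement the usage count and move on.
        usage_counts[idx] -= 1;
    }
    unreachable!("after len * MAX_USAGE_COUNT skips, every count has reached zero");
}
```

In the real loop the limit is a looser backstop, because iterations where try_write() fails also count toward it; the held permit is what justifies treating the limit as a bug (unreachable!) rather than as the retryable error it was before.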
@@ -827,6 +895,7 @@ impl PageCache {
slots,
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
}
}
}