diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index de94eb81527f..f85f525630b7 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -264,6 +264,46 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheSizeMetrics {
     },
 });
 
+pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
+    register_histogram!(
+        "pageserver_page_cache_acquire_pinned_slot_seconds",
+        "Time spent acquiring a pinned slot in the page cache",
+        CRITICAL_OP_BUCKETS.into(),
+    )
+    .expect("failed to define a metric")
+});
+
+pub(crate) static PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "pageserver_page_cache_find_victim_iters_total",
+        "Counter for the number of iterations in the find_victim loop",
+    )
+    .expect("failed to define a metric")
+});
+
+static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "page_cache_errors_total",
+        "Number of timeouts while acquiring a pinned slot in the page cache",
+        &["error_kind"]
+    )
+    .expect("failed to define a metric")
+});
+
+#[derive(IntoStaticStr)]
+#[strum(serialize_all = "kebab_case")]
+pub(crate) enum PageCacheErrorKind {
+    AcquirePinnedSlotTimeout,
+    EvictIterLimit,
+}
+
+pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
+    PAGE_CACHE_ERRORS
+        .get_metric_with_label_values(&[error_kind.into()])
+        .unwrap()
+        .inc();
+}
+
 pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "pageserver_wait_lsn_seconds",
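For context (not part of the diff): the `error_kind` label values come from the `IntoStaticStr` derive with kebab-case serialization. A minimal sketch, assuming the `strum_macros` crate that provides the derive:

```rust
use strum_macros::IntoStaticStr;

#[derive(IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
enum PageCacheErrorKind {
    AcquirePinnedSlotTimeout,
    EvictIterLimit,
}

fn main() {
    // The derive implements `From<PageCacheErrorKind> for &'static str`,
    // converting each variant name to kebab-case.
    let label: &'static str = PageCacheErrorKind::AcquirePinnedSlotTimeout.into();
    assert_eq!(label, "acquire-pinned-slot-timeout");
    let label: &'static str = PageCacheErrorKind::EvictIterLimit.into();
    assert_eq!(label, "evict-iter-limit");
}
```

So a query like `page_cache_errors_total{error_kind="acquire-pinned-slot-timeout"}` should select the timeout counter.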
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 38b169ea8507..97ca2bfea721 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -75,7 +75,11 @@
 use std::{
     collections::{hash_map::Entry, HashMap},
     convert::TryInto,
-    sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
+    sync::{
+        atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
+        Arc, Weak,
+    },
+    time::Duration,
 };
 
 use anyhow::Context;
@@ -165,6 +169,8 @@ struct Slot {
 
 struct SlotInner {
     key: Option<CacheKey>,
+    // for `coalesce_readers_permit`
+    permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     buf: &'static mut [u8; PAGE_SZ],
 }
@@ -207,6 +213,22 @@ impl Slot {
     }
 }
 
+impl SlotInner {
+    /// If there is already a reader, drop our permit and share its permit, just like we share read access.
+    fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
+        let mut guard = self.permit.lock().unwrap();
+        if let Some(existing_permit) = guard.upgrade() {
+            drop(guard);
+            drop(permit);
+            existing_permit
+        } else {
+            let permit = Arc::new(permit);
+            *guard = Arc::downgrade(&permit);
+            permit
+        }
+    }
+}
+
 pub struct PageCache {
     /// This contains the mapping from the cache key to buffer slot that currently
     /// contains the page, if any.
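The read-permit coalescing above can be exercised in isolation. Below is a minimal, self-contained sketch of the same pattern, with a hypothetical `Permit` type standing in for `PinnedSlotsPermit` (names here are illustrative, not from the diff): the slot holds only a `Weak`, so the shared permit is released as soon as the last reader guard drops.

```rust
use std::sync::{Arc, Mutex, Weak};

struct Permit; // stand-in for PinnedSlotsPermit

struct Slot {
    permit: Mutex<Weak<Permit>>,
}

impl Slot {
    fn coalesce_readers_permit(&self, fresh: Permit) -> Arc<Permit> {
        let mut guard = self.permit.lock().unwrap();
        if let Some(existing) = guard.upgrade() {
            // A reader already pins this slot: drop our fresh permit
            // and share the existing one, just like read access is shared.
            drop(guard);
            drop(fresh);
            existing
        } else {
            // First reader: promote our permit to a shared Arc and
            // remember it weakly, so the slot never keeps it alive.
            let shared = Arc::new(fresh);
            *guard = Arc::downgrade(&shared);
            shared
        }
    }
}

fn main() {
    let slot = Slot { permit: Mutex::new(Weak::new()) };
    let a = slot.coalesce_readers_permit(Permit);
    let b = slot.coalesce_readers_permit(Permit); // coalesced with `a`
    assert!(Arc::ptr_eq(&a, &b)); // both readers share one permit
    drop(a);
    drop(b); // last reader gone: the Weak in the slot is now dead
    assert!(slot.permit.lock().unwrap().upgrade().is_none());
}
```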
@@ -224,6 +246,8 @@ pub struct PageCache {
     /// The actual buffers with their metadata.
     slots: Box<[Slot]>,
 
+    pinned_slots: Arc<tokio::sync::Semaphore>,
+
     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     /// This is interpreted modulo the page cache size.
     next_evict_slot: AtomicUsize,
@@ -231,23 +255,28 @@
     size_metrics: &'static PageCacheSizeMetrics,
 }
 
+struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
+
 ///
 /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
 /// until the guard is dropped.
 ///
-pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
+pub struct PageReadGuard<'i> {
+    _permit: Arc<PinnedSlotsPermit>,
+    slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
+}
 
 impl std::ops::Deref for PageReadGuard<'_> {
     type Target = [u8; PAGE_SZ];
 
     fn deref(&self) -> &Self::Target {
-        self.0.buf
+        self.slot_guard.buf
     }
 }
 
 impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     fn as_ref(&self) -> &[u8; PAGE_SZ] {
-        self.0.buf
+        self.slot_guard.buf
     }
 }
@@ -264,6 +293,8 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
 
 pub struct PageWriteGuard<'i> {
     inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
+    _permit: PinnedSlotsPermit,
+
     // Are the page contents currently valid?
     // Used to mark pages as invalid that are assigned but not yet filled with data.
     valid: bool,
@@ -348,6 +379,10 @@ impl PageCache {
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Option<(Lsn, PageReadGuard)> {
+        let Ok(permit) = self.try_get_pinned_slot_permit().await else {
+            return None;
+        };
+
         crate::metrics::PAGE_CACHE
             .for_ctx(ctx)
             .read_accesses_materialized_page
@@ -362,7 +397,10 @@ impl PageCache {
             lsn,
         };
 
-        if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
+        if let Some(guard) = self
+            .try_lock_for_read(&mut cache_key, &mut Some(permit))
+            .await
+        {
             if let CacheKey::MaterializedPage {
                 hash_key: _,
                 lsn: available_lsn,
@@ -445,6 +483,29 @@ impl PageCache {
     // "mappings" after this section. But the routines in this section should
     // not require changes.
 
+    async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
+        let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
+        match tokio::time::timeout(
+            // Choose small timeout, neon_smgr does its own retries.
+            // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
+            Duration::from_secs(10),
+            Arc::clone(&self.pinned_slots).acquire_owned(),
+        )
+        .await
+        {
+            Ok(res) => Ok(PinnedSlotsPermit(
+                res.expect("this semaphore is never closed"),
+            )),
+            Err(_timeout) => {
+                timer.stop_and_discard();
+                crate::metrics::page_cache_errors_inc(
+                    crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
+                );
+                anyhow::bail!("timeout: there were page guards alive for all page cache slots")
+            }
+        }
+    }
+
     /// Look up a page in the cache.
     ///
     /// If the search criteria is not exact, *cache_key is updated with the key
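The permit acquisition pattern can be tried out standalone. A minimal sketch, assuming only `tokio` and `anyhow` (the helper and the semaphore size here are illustrative, not from the diff):

```rust
use std::{sync::Arc, time::Duration};

struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);

async fn try_get_pinned_slot_permit(
    pinned_slots: &Arc<tokio::sync::Semaphore>,
) -> anyhow::Result<PinnedSlotsPermit> {
    // Bound the wait: if every slot stays pinned for this long, fail the
    // request instead of queueing behind the semaphore forever.
    match tokio::time::timeout(
        Duration::from_secs(10),
        Arc::clone(pinned_slots).acquire_owned(),
    )
    .await
    {
        Ok(permit) => Ok(PinnedSlotsPermit(
            permit.expect("this semaphore is never closed"),
        )),
        Err(_elapsed) => anyhow::bail!("timeout waiting for a free page cache slot"),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Illustrative size; the real cache uses one permit per slot.
    let pinned_slots = Arc::new(tokio::sync::Semaphore::new(2));
    let _a = try_get_pinned_slot_permit(&pinned_slots).await?;
    let _b = try_get_pinned_slot_permit(&pinned_slots).await?;
    assert_eq!(pinned_slots.available_permits(), 0);
    // A third call would now block, and fail after the 10s timeout.
    Ok(())
}
```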
@@ -454,7 +515,11 @@ impl PageCache {
     ///
     /// If no page is found, returns None and *cache_key is left unmodified.
     ///
-    async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
+    async fn try_lock_for_read(
+        &self,
+        cache_key: &mut CacheKey,
+        permit: &mut Option<PinnedSlotsPermit>,
+    ) -> Option<PageReadGuard> {
         let cache_key_orig = cache_key.clone();
         if let Some(slot_idx) = self.search_mapping(cache_key) {
             // The page was found in the mapping. Lock the slot, and re-check
@@ -464,7 +529,10 @@ impl PageCache {
             let inner = slot.inner.read().await;
             if inner.key.as_ref() == Some(cache_key) {
                 slot.inc_usage_count();
-                return Some(PageReadGuard(inner));
+                return Some(PageReadGuard {
+                    _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
+                    slot_guard: inner,
+                });
             } else {
                 // search_mapping might have modified the search key; restore it.
                 *cache_key = cache_key_orig;
@@ -507,6 +575,8 @@ impl PageCache {
         cache_key: &mut CacheKey,
         ctx: &RequestContext,
     ) -> anyhow::Result<ReadBufResult> {
+        let mut permit = Some(self.try_get_pinned_slot_permit().await?);
+
         let (read_access, hit) = match cache_key {
             CacheKey::MaterializedPage { .. } => {
                 unreachable!("Materialized pages use lookup_materialized_page")
@@ -523,17 +593,21 @@ impl PageCache {
         let mut is_first_iteration = true;
         loop {
             // First check if the key already exists in the cache.
-            if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
+            if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
+                debug_assert!(permit.is_none());
                 if is_first_iteration {
                     hit.inc();
                 }
                 return Ok(ReadBufResult::Found(read_guard));
             }
+            debug_assert!(permit.is_some());
             is_first_iteration = false;
 
             // Not found. Find a victim buffer
-            let (slot_idx, mut inner) =
-                self.find_victim().context("Failed to find evict victim")?;
+            let (slot_idx, mut inner) = self
+                .find_victim(permit.as_ref().unwrap())
+                .await
+                .context("Failed to find evict victim")?;
 
             // Insert mapping for this. At this point, we may find that another
             // thread did the same thing concurrently. In that case, we evicted
@@ -555,7 +629,16 @@ impl PageCache {
             inner.key = Some(cache_key.clone());
             slot.set_usage_count(1);
 
+            debug_assert!(
+                {
+                    let guard = inner.permit.lock().unwrap();
+                    guard.upgrade().is_none()
+                },
+                "we hold a write lock, so no one else should have a permit"
+            );
+
             return Ok(ReadBufResult::NotFound(PageWriteGuard {
+                _permit: permit.take().unwrap(),
                 inner,
                 valid: false,
             }));
@@ -566,7 +649,11 @@ impl PageCache {
     /// found, returns None.
     ///
     /// When locking a page for writing, the search criteria is always "exact".
-    async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
+    async fn try_lock_for_write(
+        &self,
+        cache_key: &CacheKey,
+        permit: &mut Option<PinnedSlotsPermit>,
+    ) -> Option<PageWriteGuard> {
         if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
             // The page was found in the mapping. Lock the slot, and re-check
             // that it's still what we expected (because we released the mapping
@@ -575,7 +662,18 @@ impl PageCache {
             let inner = slot.inner.write().await;
             if inner.key.as_ref() == Some(cache_key) {
                 slot.inc_usage_count();
-                return Some(PageWriteGuard { inner, valid: true });
+                debug_assert!(
+                    {
+                        let guard = inner.permit.lock().unwrap();
+                        guard.upgrade().is_none()
+                    },
+                    "we hold a write lock, so no one else should have a permit"
+                );
+                return Some(PageWriteGuard {
+                    _permit: permit.take().unwrap(),
+                    inner,
+                    valid: true,
+                });
             }
         }
         None
@@ -586,15 +684,20 @@ impl PageCache {
     /// Similar to lock_for_read(), but the returned buffer is write-locked and
     /// may be modified by the caller even if it's already found in the cache.
     async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
+        let mut permit = Some(self.try_get_pinned_slot_permit().await?);
         loop {
             // First check if the key already exists in the cache.
-            if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
+            if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
+                debug_assert!(permit.is_none());
                 return Ok(WriteBufResult::Found(write_guard));
            }
+            debug_assert!(permit.is_some());
 
             // Not found. Find a victim buffer
-            let (slot_idx, mut inner) =
-                self.find_victim().context("Failed to find evict victim")?;
+            let (slot_idx, mut inner) = self
+                .find_victim(permit.as_ref().unwrap())
+                .await
+                .context("Failed to find evict victim")?;
 
             // Insert mapping for this. At this point, we may find that another
             // thread did the same thing concurrently. In that case, we evicted
@@ -616,7 +719,16 @@ impl PageCache {
             inner.key = Some(cache_key.clone());
             slot.set_usage_count(1);
 
+            debug_assert!(
+                {
+                    let guard = inner.permit.lock().unwrap();
+                    guard.upgrade().is_none()
+                },
+                "we hold a write lock, so no one else should have a permit"
+            );
+
             return Ok(WriteBufResult::NotFound(PageWriteGuard {
+                _permit: permit.take().unwrap(),
                 inner,
                 valid: false,
             }));
@@ -769,7 +881,10 @@ impl PageCache {
     /// Find a slot to evict.
     ///
     /// On return, the slot is empty and write-locked.
-    fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
+    async fn find_victim(
+        &self,
+        _permit_witness: &PinnedSlotsPermit,
+    ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
         let iter_limit = self.slots.len() * 10;
         let mut iters = 0;
         loop {
@@ -782,13 +897,40 @@ impl PageCache {
             let mut inner = match slot.inner.try_write() {
                 Ok(inner) => inner,
                 Err(_err) => {
-                    // If we have looped through the whole buffer pool 10 times
-                    // and still haven't found a victim buffer, something's wrong.
-                    // Maybe all the buffers were in locked. That could happen in
-                    // theory, if you have more threads holding buffers locked than
-                    // there are buffers in the pool. In practice, with a reasonably
-                    // large buffer pool it really shouldn't happen.
                     if iters > iter_limit {
+                        // NB: Even with the permits, there's no hard guarantee that we will find a slot with
+                        // any particular number of iterations: other threads might race ahead and acquire and
+                        // release pins just as we're scanning the array.
+                        //
+                        // Imagine that nslots is 2, and as starting point, usage_count==1 on all
+                        // slots. There are two threads running concurrently, A and B. A has just
+                        // acquired the permit from the semaphore.
+                        //
+                        // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
+                        // B: Acquire permit.
+                        // B: Look at slot 2, decrement its usage_count to zero and continue the search
+                        // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
+                        // B: Release pin and permit again
+                        // B: Acquire permit.
+                        // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
+                        // B: Release pin and permit again
+                        //
+                        // Now we're back in the starting situation that both slots have
+                        // usage_count 1, but A has now been through one iteration of the
+                        // find_victim() loop. This can repeat indefinitely and on each
+                        // iteration, A's iteration count increases by one.
+                        //
+                        // So, even though the semaphore for the permits is fair, the victim search
+                        // itself happens in parallel and is not fair.
+                        // Hence even with a permit, a task can theoretically be starved.
+                        // To avoid this, we'd need tokio to give priority to tasks that are holding
+                        // permits for longer.
+                        // Note that just yielding to tokio during iteration without such
+                        // priority boosting is likely counter-productive. We'd just give more opportunities
+                        // for B to bump usage count, further starving A.
+                        crate::metrics::page_cache_errors_inc(
+                            crate::metrics::PageCacheErrorKind::EvictIterLimit,
+                        );
                         anyhow::bail!("exceeded evict iter limit");
                     }
                     continue;
@@ -799,6 +941,7 @@ impl PageCache {
                     self.remove_mapping(old_key);
                     inner.key = None;
                 }
+                crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
                 return Ok((slot_idx, inner));
             }
         }
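To make the clock sweep and its iteration limit concrete: below is a condensed, self-contained sketch of the algorithm `find_victim` implements, using stand-in types (a unit `RwLock` instead of `SlotInner`; the helper names are illustrative). Note how a pinned slot only causes a skip, and the limit of ten full sweeps only trips when everything stays pinned, or when racing tasks keep re-inflating usage counts as described in the comment above.

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

struct Slot {
    usage_count: AtomicU8,
    lock: tokio::sync::RwLock<()>, // stand-in for RwLock<SlotInner>
}

// Saturating decrement; returns the value after decrementing.
fn dec_usage_count(count: &AtomicU8) -> u8 {
    let prev = count
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
            Some(c.saturating_sub(1))
        })
        .unwrap();
    prev.saturating_sub(1)
}

fn find_victim(slots: &[Slot], next_evict_slot: &AtomicUsize) -> anyhow::Result<usize> {
    let iter_limit = slots.len() * 10;
    let mut iters = 0;
    loop {
        iters += 1;
        let idx = next_evict_slot.fetch_add(1, Ordering::Relaxed) % slots.len();
        let slot = &slots[idx];
        // Second-chance step: a slot becomes a candidate only once its
        // usage count has been counted down to zero.
        if dec_usage_count(&slot.usage_count) == 0 {
            match slot.lock.try_write() {
                // Unpinned and now write-locked: this is our victim.
                Ok(_guard) => return Ok(idx),
                Err(_) => {
                    // Pinned by a reader or writer; keep sweeping, but give
                    // up after ~10 full sweeps without an unpinned slot.
                    if iters > iter_limit {
                        anyhow::bail!("exceeded evict iter limit");
                    }
                    continue;
                }
            }
        }
    }
}

fn main() -> anyhow::Result<()> {
    let slots: Vec<Slot> = (0..4)
        .map(|_| Slot {
            usage_count: AtomicU8::new(0),
            lock: tokio::sync::RwLock::new(()),
        })
        .collect();
    let pin = slots[0].lock.try_read().unwrap(); // a reader pins slot 0
    let victim = find_victim(&slots, &AtomicUsize::new(0))?;
    assert_eq!(victim, 1); // the sweep skipped the pinned slot
    drop(pin);
    Ok(())
}
```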
@@ -826,7 +969,11 @@ impl PageCache {
                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
 
                 Slot {
-                    inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
+                    inner: tokio::sync::RwLock::new(SlotInner {
+                        key: None,
+                        buf,
+                        permit: std::sync::Mutex::new(Weak::new()),
+                    }),
                     usage_count: AtomicU8::new(0),
                 }
             })
@@ -838,6 +985,7 @@ impl PageCache {
             slots,
             next_evict_slot: AtomicUsize::new(0),
             size_metrics,
+            pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
         }
     }
 }
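One note on the constructor change: the semaphore is sized to `num_pages`, one permit per slot. As I read the diff, the intent is that a task holding a permit knows the remaining `num_pages - 1` permits can pin at most that many slots, so at any instant at least one slot is unpinned, even if (per the `find_victim` comment) racing tasks may keep it from being found. A toy run of that accounting, with illustrative values:

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let num_pages = 8; // illustrative; the real value comes from configuration
    let pinned_slots = Arc::new(tokio::sync::Semaphore::new(num_pages));

    // Simulate num_pages - 1 guards, each pinning a slot via its permit.
    let mut guards = Vec::new();
    for _ in 0..num_pages - 1 {
        guards.push(Arc::clone(&pinned_slots).acquire_owned().await.unwrap());
    }

    // A would-be evictor still gets the last permit without waiting, and at
    // that point the other holders pin at most num_pages - 1 of the slots.
    let evictor_permit = Arc::clone(&pinned_slots).acquire_owned().await.unwrap();
    assert_eq!(pinned_slots.available_permits(), 0);

    // Dropping guards returns permits; PageReadGuard/PageWriteGuard do this
    // implicitly by holding a PinnedSlotsPermit field.
    drop(evictor_permit);
    drop(guards);
    assert_eq!(pinned_slots.available_permits(), num_pages);
}
```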