diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index ee331ea154dd..86d0390c30b1 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -88,13 +88,16 @@ use crate::task_mgr::TaskKind; +pub(crate) mod optional_counter; + // The main structure of this module, see module-level comment. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct RequestContext { task_kind: TaskKind, download_behavior: DownloadBehavior, access_stats_behavior: AccessStatsBehavior, page_content_kind: PageContentKind, + pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32, } /// The kind of access to the page cache. @@ -150,6 +153,7 @@ impl RequestContextBuilder { download_behavior: DownloadBehavior::Download, access_stats_behavior: AccessStatsBehavior::Update, page_content_kind: PageContentKind::Unknown, + micros_spent_throttled: Default::default(), }, } } @@ -163,6 +167,7 @@ impl RequestContextBuilder { download_behavior: original.download_behavior, access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, + micros_spent_throttled: Default::default(), }, } } diff --git a/pageserver/src/context/optional_counter.rs b/pageserver/src/context/optional_counter.rs new file mode 100644 index 000000000000..100c649f18cb --- /dev/null +++ b/pageserver/src/context/optional_counter.rs @@ -0,0 +1,101 @@ +use std::{ + sync::atomic::{AtomicU32, Ordering}, + time::Duration, +}; + +#[derive(Debug)] +pub struct CounterU32 { + inner: AtomicU32, +} +impl Default for CounterU32 { + fn default() -> Self { + Self { + inner: AtomicU32::new(u32::MAX), + } + } +} +impl CounterU32 { + pub fn open(&self) -> Result<(), &'static str> { + match self + .inner + .compare_exchange(u32::MAX, 0, Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => Ok(()), + Err(_) => Err("open() called on clsoed state"), + } + } + pub fn close(&self) -> Result { + match self.inner.swap(u32::MAX, Ordering::Relaxed) { + u32::MAX => Err("close() called on closed state"), + x => Ok(x), + } + } + + pub fn add(&self, count: u32) -> Result<(), &'static str> { + if count == 0 { + return Ok(()); + } + let mut had_err = None; + self.inner + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| match cur { + u32::MAX => { + had_err = Some("add() called on closed state"); + None + } + x => { + let (new, overflowed) = x.overflowing_add(count); + if new == u32::MAX || overflowed { + had_err = Some("add() overflowed the counter"); + None + } else { + Some(new) + } + } + }) + .map_err(|_| had_err.expect("we set it whenever the function returns None")) + .map(|_| ()) + } +} + +#[derive(Default, Debug)] +pub struct MicroSecondsCounterU32 { + inner: CounterU32, +} + +impl MicroSecondsCounterU32 { + pub fn open(&self) -> Result<(), &'static str> { + self.inner.open() + } + pub fn add(&self, duration: Duration) -> Result<(), &'static str> { + match duration.as_micros().try_into() { + Ok(x) => self.inner.add(x), + Err(_) => Err("add(): duration conversion error"), + } + } + pub fn close_and_checked_sub_from(&self, from: Duration) -> Result { + let val = self.inner.close()?; + let val = Duration::from_micros(val as u64); + let subbed = match from.checked_sub(val) { + Some(v) => v, + None => return Err("Duration::checked_sub"), + }; + Ok(subbed) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_basic() { + let counter = MicroSecondsCounterU32::default(); + counter.open().unwrap(); + counter.add(Duration::from_micros(23)).unwrap(); + let res = counter + .close_and_checked_sub_from(Duration::from_micros(42)) + .unwrap(); + assert_eq!(res, Duration::from_micros(42 - 23)); + } +} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 1d894ed8a5db..5661e01a8873 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -11,6 +11,7 @@ use once_cell::sync::Lazy; use pageserver_api::shard::TenantShardId; use strum::{EnumCount, IntoEnumIterator, VariantNames}; use strum_macros::{EnumVariantNames, IntoStaticStr}; +use tracing::warn; use utils::id::TimelineId; /// Prometheus histogram buckets (in seconds) for operations in the critical @@ -1005,15 +1006,39 @@ impl GlobalAndPerTimelineHistogram { } } -struct GlobalAndPerTimelineHistogramTimer<'a> { +struct GlobalAndPerTimelineHistogramTimer<'a, 'c> { h: &'a GlobalAndPerTimelineHistogram, + ctx: &'c RequestContext, start: std::time::Instant, + op: SmgrQueryType, } -impl<'a> Drop for GlobalAndPerTimelineHistogramTimer<'a> { +impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { fn drop(&mut self) { let elapsed = self.start.elapsed(); - self.h.observe(elapsed.as_secs_f64()); + let ex_throttled = self + .ctx + .micros_spent_throttled + .close_and_checked_sub_from(elapsed); + let ex_throttled = match ex_throttled { + Ok(res) => res, + Err(error) => { + use utils::rate_limit::RateLimit; + static LOGGED: Lazy>> = + Lazy::new(|| { + Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { + RateLimit::new(Duration::from_secs(10)) + }))) + }); + let mut guard = LOGGED.lock().unwrap(); + let rate_limit = &mut guard[self.op]; + rate_limit.call(|| { + warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit"); + }); + elapsed + } + }; + self.h.observe(ex_throttled.as_secs_f64()); } } @@ -1025,6 +1050,7 @@ impl<'a> Drop for GlobalAndPerTimelineHistogramTimer<'a> { strum_macros::EnumCount, strum_macros::EnumIter, strum_macros::FromRepr, + enum_map::Enum, )] #[strum(serialize_all = "snake_case")] pub enum SmgrQueryType { @@ -1130,11 +1156,35 @@ impl SmgrQueryTimePerTimeline { }); Self { metrics } } - pub(crate) fn start_timer(&self, op: SmgrQueryType) -> impl Drop + '_ { + pub(crate) fn start_timer<'c: 'a, 'a>( + &'a self, + op: SmgrQueryType, + ctx: &'c RequestContext, + ) -> impl Drop + '_ { let metric = &self.metrics[op as usize]; + let start = Instant::now(); + match ctx.micros_spent_throttled.open() { + Ok(()) => (), + Err(error) => { + use utils::rate_limit::RateLimit; + static LOGGED: Lazy>> = + Lazy::new(|| { + Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { + RateLimit::new(Duration::from_secs(10)) + }))) + }); + let mut guard = LOGGED.lock().unwrap(); + let rate_limit = &mut guard[op]; + rate_limit.call(|| { + warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit"); + }); + } + } GlobalAndPerTimelineHistogramTimer { h: metric, - start: std::time::Instant::now(), + ctx, + start, + op, } } } @@ -1145,6 +1195,11 @@ mod smgr_query_time_tests { use strum::IntoEnumIterator; use utils::id::{TenantId, TimelineId}; + use crate::{ + context::{DownloadBehavior, RequestContext}, + task_mgr::TaskKind, + }; + // Regression test, we used hard-coded string constants before using an enum. #[test] fn op_label_name() { @@ -1193,7 +1248,8 @@ mod smgr_query_time_tests { let (pre_global, pre_per_tenant_timeline) = get_counts(); assert_eq!(pre_per_tenant_timeline, 0); - let timer = metrics.start_timer(*op); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); + let timer = metrics.start_timer(*op, &ctx); drop(timer); let (post_global, post_per_tenant_timeline) = get_counts(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 11eb512750e1..95efca58038d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -911,7 +911,7 @@ impl PageServerHandler { let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetRelExists); + .start_timer(metrics::SmgrQueryType::GetRelExists, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -939,7 +939,7 @@ impl PageServerHandler { let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetRelSize); + .start_timer(metrics::SmgrQueryType::GetRelSize, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -967,7 +967,7 @@ impl PageServerHandler { let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetDbSize); + .start_timer(metrics::SmgrQueryType::GetDbSize, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -1145,7 +1145,7 @@ impl PageServerHandler { let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetPageAtLsn); + .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -1173,7 +1173,7 @@ impl PageServerHandler { let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetSlruSegment); + .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 45ce6c938155..57c3edcddd29 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -217,7 +217,7 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { } let allowed_rps = tenant.timeline_get_throttle.steady_rps(); let delta = now - prev; - warn!( + info!( n_seconds=%format_args!("{:.3}", delta.as_secs_f64()), count_accounted, diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 6894a88b930d..280773e9c305 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -2,14 +2,14 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, Mutex, }, time::{Duration, Instant}, }; use arc_swap::ArcSwap; use enumset::EnumSet; -use tracing::error; +use tracing::{error, warn}; use crate::{context::RequestContext, task_mgr::TaskKind}; @@ -157,6 +157,19 @@ where .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); let observation = Observation { wait_time }; self.metric.observe_throttling(&observation); + match ctx.micros_spent_throttled.add(wait_time) { + Ok(res) => res, + Err(error) => { + use once_cell::sync::Lazy; + use utils::rate_limit::RateLimit; + static WARN_RATE_LIMIT: Lazy> = + Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10)))); + let mut guard = WARN_RATE_LIMIT.lock().unwrap(); + guard.call(move || { + warn!(error, "error adding time spent throttled; this message is logged at a global rate limit"); + }); + } + } } } } diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py new file mode 100644 index 000000000000..42cc28efee2d --- /dev/null +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -0,0 +1,118 @@ +import json +import uuid + +from anyio import Path +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder, PgBin +from fixtures.pg_version import PgVersion +from fixtures.types import TenantId, TimelineId +from fixtures.utils import wait_until + + +def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): + env = neon_env_builder.init_start() + + env.pageserver.tenant_detach(env.initial_tenant) + + env.pageserver.allowed_errors.append( + # https://github.com/neondatabase/neon/issues/6925 + r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*" + ) + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + + rate_limit_rps = 100 + compaction_period = 5 + env.pageserver.tenant_create( + tenant_id, + conf={ + "compaction_period": f"{compaction_period}s", + "timeline_get_throttle": { + "task_kinds": ["PageRequestHandler"], + "initial": 0, + "refill_interval": "100ms", + "refill_amount": int(rate_limit_rps / 10), + "max": int(rate_limit_rps / 10), + "fair": True, + }, + }, + ) + + ps_http = env.pageserver.http_client() + + ps_http.timeline_create(PgVersion.V16, tenant_id, timeline_id) + + def run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: int): + cmd = [ + str(env.neon_binpath / "pagebench"), + "get-page-latest-lsn", + "--mgmt-api-endpoint", + ps_http.base_url, + "--page-service-connstring", + env.pageserver.connstr(password=None), + "--runtime", + f"{duration_secs}s", + f"{tenant_id}/{timeline_id}", + ] + + basepath = pg_bin.run_capture(cmd, with_command_header=False) + results_path = Path(basepath + ".stdout") + log.info(f"Benchmark results at: {results_path}") + + with open(results_path, "r") as f: + results = json.load(f) + log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}") + return int(results["total"]["request_count"]) + + log.info("warmup / make sure metrics are present") + run_pagebench_at_max_speed_and_get_total_requests_completed(2) + metrics_query = { + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + "smgr_query_type": "get_page_at_lsn", + } + metric_name = "pageserver_smgr_query_seconds_sum" + smgr_query_seconds_pre = ps_http.get_metric_value(metric_name, metrics_query) + assert smgr_query_seconds_pre is not None + + marker = uuid.uuid4().hex + ps_http.post_tracing_event("info", marker) + _, marker_offset = wait_until( + 10, 0.5, lambda: env.pageserver.assert_log_contains(marker, offset=None) + ) + + log.info("run pagebench") + duration_secs = 10 + actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs) + + log.info("validate the client is capped at the configured rps limit") + expect_ncompleted = duration_secs * rate_limit_rps + delta_abs = abs(expect_ncompleted - actual_ncompleted) + threshold = 0.05 * expect_ncompleted + assert ( + threshold / rate_limit_rps < 0.1 * duration_secs + ), "test self-test: unrealistic expecations regarding precision in this test" + assert ( + delta_abs < 0.05 * expect_ncompleted + ), "the throttling deviates more than 5percent from the expectation" + + log.info("validate that we logged the throttling") + + wait_until( + 10, + compaction_period / 10, + lambda: env.pageserver.assert_log_contains( + f".*{tenant_id}.*shard was throttled in the last n_seconds.*", + offset=marker_offset, + ), + ) + + log.info("validate that the metric doesn't include throttle wait time") + smgr_query_seconds_post = ps_http.get_metric_value(metric_name, metrics_query) + assert smgr_query_seconds_post is not None + actual_smgr_query_seconds = smgr_query_seconds_post - smgr_query_seconds_pre + + assert ( + duration_secs >= 10 * actual_smgr_query_seconds + ), "smgr metrics should not include throttle wait time"