diff --git a/Cargo.lock b/Cargo.lock index 74cd2c8d2c52..e7a0d8b965ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,6 +1813,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e875f1719c16de097dee81ed675e2d9bb63096823ed3f0ca827b7dea3028bbbb" dependencies = [ "enumset_derive", + "serde", ] [[package]] @@ -2757,6 +2758,17 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leaky-bucket" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853" +dependencies = [ + "parking_lot 0.12.1", + "tokio", + "tracing", +] + [[package]] name = "libc" version = "0.2.150" @@ -3448,6 +3460,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "async-compression", "async-stream", "async-trait", @@ -3475,6 +3488,7 @@ dependencies = [ "humantime-serde", "hyper", "itertools", + "leaky-bucket", "md5", "metrics", "nix 0.27.1", @@ -6347,6 +6361,7 @@ dependencies = [ "hex-literal", "hyper", "jsonwebtoken", + "leaky-bucket", "metrics", "nix 0.27.1", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 8952f7627fa7..98fbc9c4f40f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ ipnet = "2.9.0" itertools = "0.10" jsonwebtoken = "9" lasso = "0.7" +leaky-bucket = "1.0.1" libc = "0.2" md5 = "0.7.0" memoffset = "0.8" diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index a1b0ba425206..8dd86bad9665 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -400,6 +400,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'lazy_slru_download' as bool")?, + timeline_get_throttle: settings + .remove("timeline_get_throttle") + .map(serde_json::from_str) + .transpose() + .context("parse `timeline_get_throttle` from json")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") @@ -505,6 +510,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'lazy_slru_download' as bool")?, + timeline_get_throttle: settings + .remove("timeline_get_throttle") + .map(serde_json::from_str) + .transpose() + .context("parse `timeline_get_throttle` from json")?, } }; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index db2292072cf8..d546cb5c5483 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -283,6 +283,7 @@ pub struct TenantConfig { pub gc_feedback: Option, pub heatmap_period: Option, pub lazy_slru_download: Option, + pub timeline_get_throttle: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -309,6 +310,35 @@ pub struct EvictionPolicyLayerAccessThreshold { pub threshold: Duration, } +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct ThrottleConfig { + pub task_kinds: Vec, // TaskKind + pub initial: usize, + #[serde(with = "humantime_serde")] + pub refill_interval: Duration, + pub refill_amount: NonZeroUsize, + pub max: usize, + pub fair: bool, +} + +impl ThrottleConfig { + pub fn disabled() -> Self { + Self { + task_kinds: vec![], // effectively disables the throttle + // other values don't matter with emtpy `task_kinds`. 
+ initial: 0, + refill_interval: Duration::from_millis(1), + refill_amount: NonZeroUsize::new(1).unwrap(), + max: 1, + fair: true, + } + } + /// The requests per second allowed by the given config. + pub fn steady_rps(&self) -> f64 { + (self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64()) / 1e3 + } +} + /// A flattened analog of a `pagesever::tenant::LocationMode`, which /// lists out all possible states (and the virtual "Detached" state) /// in a flat form rather than using rust-style enums. diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 706b7a3187d5..983e94d96383 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -25,6 +25,7 @@ hyper = { workspace = true, features = ["full"] } fail.workspace = true futures = { workspace = true} jsonwebtoken.workspace = true +leaky-bucket.workspace = true nix.workspace = true once_cell.workspace = true pin-project-lite.workspace = true diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 95d558bb7b85..eeee2055c2eb 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -12,6 +12,7 @@ testing = ["fail/failpoints"] [dependencies] anyhow.workspace = true +arc-swap.workspace = true async-compression.workspace = true async-stream.workspace = true async-trait.workspace = true @@ -35,6 +36,7 @@ humantime.workspace = true humantime-serde.workspace = true hyper.workspace = true itertools.workspace = true +leaky-bucket.workspace = true md5.workspace = true nix.workspace = true # hack to get the number of worker threads tokio uses @@ -82,7 +84,7 @@ workspace_hack.workspace = true reqwest.workspace = true rpds.workspace = true enum-map.workspace = true -enumset.workspace = true +enumset = { workspace = true, features = ["serde"]} strum.workspace = true strum_macros.workspace = true diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 647f571e599a..2838511a77e3 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -1,6 +1,5 @@ use anyhow::Context; use camino::Utf8PathBuf; -use futures::future::join_all; use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key}; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::models::PagestreamGetPageRequest; @@ -10,11 +9,10 @@ use utils::id::TenantTimelineId; use utils::lsn::Lsn; use rand::prelude::*; -use tokio::sync::Barrier; use tokio::task::JoinSet; -use tracing::{info, instrument}; +use tracing::info; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::future::Future; use std::num::NonZeroUsize; use std::pin::Pin; @@ -38,8 +36,12 @@ pub(crate) struct Args { num_clients: NonZeroUsize, #[clap(long)] runtime: Option, + /// Each client sends requests at the given rate. + /// + /// If a request takes too long and we should be issuing a new request already, + /// we skip that request and account it as `MISSED`. #[clap(long)] - per_target_rate_limit: Option, + per_client_rate: Option, /// Probability for sending `latest=true` in the request (uniform distribution). 
#[clap(long, default_value = "1")] req_latest_probability: f64, @@ -61,12 +63,16 @@ pub(crate) struct Args { #[derive(Debug, Default)] struct LiveStats { completed_requests: AtomicU64, + missed: AtomicU64, } impl LiveStats { - fn inc(&self) { + fn request_done(&self) { self.completed_requests.fetch_add(1, Ordering::Relaxed); } + fn missed(&self, n: u64) { + self.missed.fetch_add(n, Ordering::Relaxed); + } } #[derive(Clone, serde::Serialize, serde::Deserialize)] @@ -220,13 +226,12 @@ async fn main_impl( let live_stats = Arc::new(LiveStats::default()); - let num_client_tasks = args.num_clients.get() * timelines.len(); let num_live_stats_dump = 1; - let num_work_sender_tasks = 1; + let num_work_sender_tasks = args.num_clients.get() * timelines.len(); let num_main_impl = 1; let start_work_barrier = Arc::new(tokio::sync::Barrier::new( - num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl, + num_live_stats_dump + num_work_sender_tasks + num_main_impl, )); tokio::spawn({ @@ -238,10 +243,12 @@ async fn main_impl( let start = std::time::Instant::now(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed); + let missed = stats.missed.swap(0, Ordering::Relaxed); let elapsed = start.elapsed(); info!( - "RPS: {:.0}", - completed_requests as f64 / elapsed.as_secs_f64() + "RPS: {:.0} MISSED: {:.0}", + completed_requests as f64 / elapsed.as_secs_f64(), + missed as f64 / elapsed.as_secs_f64() ); } } @@ -249,127 +256,105 @@ async fn main_impl( let cancel = CancellationToken::new(); - let mut work_senders: HashMap = HashMap::new(); - let mut tasks = Vec::new(); - for timeline in timelines.iter().cloned() { - for num_client in 0..args.num_clients.get() { - let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are - let worker_id = WorkerId { - timeline, - num_client, - }; - work_senders.insert(worker_id, sender); - tasks.push(tokio::spawn(client( - args, - worker_id, - Arc::clone(&start_work_barrier), - receiver, - Arc::clone(&live_stats), - cancel.clone(), - ))); - } - } - - let work_sender: Pin>> = { + let rps_period = args + .per_client_rate + .map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64))); + let make_worker: &dyn Fn(WorkerId) -> Pin>> = &|worker_id| { + let live_stats = live_stats.clone(); let start_work_barrier = start_work_barrier.clone(); + let ranges: Vec = all_ranges + .iter() + .filter(|r| r.timeline == worker_id.timeline) + .cloned() + .collect(); + let weights = + rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len())) + .unwrap(); + let cancel = cancel.clone(); - match args.per_target_rate_limit { - None => Box::pin(async move { - let weights = rand::distributions::weighted::WeightedIndex::new( - all_ranges.iter().map(|v| v.len()), - ) + Box::pin(async move { + let client = + pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) + .await + .unwrap(); + let mut client = client + .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id) + .await .unwrap(); - start_work_barrier.wait().await; - - while !cancel.is_cancelled() { - let (timeline, req) = { - let mut rng = rand::thread_rng(); - let r = &all_ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - let (rel_tag, block_no) = - key_to_rel_block(key).expect("we filter non-rel-block keys out above"); - ( - WorkerId { - timeline: 
r.timeline, - num_client: rng.gen_range(0..args.num_clients.get()), - }, - PagestreamGetPageRequest { - latest: rng.gen_bool(args.req_latest_probability), - lsn: r.timeline_lsn, - rel: rel_tag, - blkno: block_no, - }, - ) - }; - let sender = work_senders.get(&timeline).unwrap(); - // TODO: what if this blocks? - if sender.send(req).await.is_err() { - assert!(cancel.is_cancelled(), "client has gone away unexpectedly"); + start_work_barrier.wait().await; + let client_start = Instant::now(); + let mut ticks_processed = 0; + while !cancel.is_cancelled() { + // Detect if a request took longer than the RPS rate + if let Some(period) = &rps_period { + let periods_passed_until_now = + usize::try_from(client_start.elapsed().as_micros() / period.as_micros()) + .unwrap(); + + if periods_passed_until_now > ticks_processed { + live_stats.missed((periods_passed_until_now - ticks_processed) as u64); } + ticks_processed = periods_passed_until_now; } - }), - Some(rps_limit) => Box::pin(async move { - let period = Duration::from_secs_f64(1.0 / (rps_limit as f64)); - let make_task: &dyn Fn(WorkerId) -> Pin>> = - &|worker_id| { - let sender = work_senders.get(&worker_id).unwrap(); - let ranges: Vec = all_ranges - .iter() - .filter(|r| r.timeline == worker_id.timeline) - .cloned() - .collect(); - let weights = rand::distributions::weighted::WeightedIndex::new( - ranges.iter().map(|v| v.len()), - ) - .unwrap(); - - let cancel = cancel.clone(); - Box::pin(async move { - let mut ticker = tokio::time::interval(period); - ticker.set_missed_tick_behavior( - /* TODO review this choice */ - tokio::time::MissedTickBehavior::Burst, - ); - while !cancel.is_cancelled() { - ticker.tick().await; - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(is_rel_block_key(&key)); - let (rel_tag, block_no) = key_to_rel_block(key) - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - latest: rng.gen_bool(args.req_latest_probability), - lsn: r.timeline_lsn, - rel: rel_tag, - blkno: block_no, - } - }; - if sender.send(req).await.is_err() { - assert!( - cancel.is_cancelled(), - "client has gone away unexpectedly" - ); - } - } - }) - }; - - let tasks: Vec<_> = work_senders.keys().map(|tl| make_task(*tl)).collect(); - start_work_barrier.wait().await; + let start = Instant::now(); + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(is_rel_block_key(&key)); + let (rel_tag, block_no) = + key_to_rel_block(key).expect("we filter non-rel-block keys out above"); + PagestreamGetPageRequest { + latest: rng.gen_bool(args.req_latest_probability), + lsn: r.timeline_lsn, + rel: rel_tag, + blkno: block_no, + } + }; + client.getpage(req).await.unwrap(); + let end = Instant::now(); + live_stats.request_done(); + ticks_processed += 1; + STATS.with(|stats| { + stats + .borrow() + .lock() + .unwrap() + .observe(end.duration_since(start)) + .unwrap(); + }); + + if let Some(period) = &rps_period { + let next_at = client_start + + Duration::from_micros( + (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(), + ); + tokio::time::sleep_until(next_at.into()).await; + } + } + }) + }; - join_all(tasks).await; - }), + info!("spawning workers"); + let mut workers = JoinSet::new(); + for timeline in timelines.iter().cloned() { + for num_client in 
0..args.num_clients.get() { + let worker_id = WorkerId { + timeline, + num_client, + }; + workers.spawn(make_worker(worker_id)); + } + } + let workers = async move { + while let Some(res) = workers.join_next().await { + res.unwrap(); } }; - let work_sender_task = tokio::spawn(work_sender); - info!("waiting for everything to become ready"); start_work_barrier.wait().await; info!("work started"); @@ -377,20 +362,13 @@ async fn main_impl( tokio::time::sleep(runtime.into()).await; info!("runtime over, signalling cancellation"); cancel.cancel(); - work_sender_task.await.unwrap(); + workers.await; info!("work sender exited"); } else { - work_sender_task.await.unwrap(); + workers.await; unreachable!("work sender never terminates"); } - info!("joining clients"); - for t in tasks { - t.await.unwrap(); - } - - info!("all clients stopped"); - let output = Output { total: { let mut agg_stats = request_stats::Stats::new(); @@ -407,49 +385,3 @@ async fn main_impl( anyhow::Ok(()) } - -#[instrument(skip_all)] -async fn client( - args: &'static Args, - id: WorkerId, - start_work_barrier: Arc, - mut work: tokio::sync::mpsc::Receiver, - live_stats: Arc, - cancel: CancellationToken, -) { - let WorkerId { - timeline, - num_client: _, - } = id; - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - let mut client = client - .pagestream(timeline.tenant_id, timeline.timeline_id) - .await - .unwrap(); - - let do_requests = async { - start_work_barrier.wait().await; - while let Some(req) = work.recv().await { - let start = Instant::now(); - client - .getpage(req) - .await - .with_context(|| format!("getpage for {timeline}")) - .unwrap(); - let elapsed = start.elapsed(); - live_stats.inc(); - STATS.with(|stats| { - stats.borrow().lock().unwrap().observe(elapsed).unwrap(); - }); - } - }; - tokio::select! 
{ - res = do_requests => { res }, - _ = cancel.cancelled() => { - // fallthrough to shutdown - } - } - client.shutdown().await; -} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index c2b1eafc3a1d..a0fda3960596 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2496,6 +2496,56 @@ pub mod tokio_epoll_uring { } } +pub(crate) mod tenant_throttling { + use metrics::{register_int_counter_vec, IntCounter}; + use once_cell::sync::Lazy; + + use crate::tenant::{self, throttle::Metric}; + + pub(crate) struct TimelineGet { + wait_time: IntCounter, + count: IntCounter, + } + + pub(crate) static TIMELINE_GET: Lazy = Lazy::new(|| { + static WAIT_USECS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_tenant_throttling_wait_usecs_sum_global", + "Sum of microseconds that tenants spent waiting for a tenant throttle of a given kind.", + &["kind"] + ) + .unwrap() + }); + + static WAIT_COUNT: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_tenant_throttling_count_global", + "Count of tenant throttlings, by kind of throttle.", + &["kind"] + ) + .unwrap() + }); + + let kind = "timeline_get"; + TimelineGet { + wait_time: WAIT_USECS.with_label_values(&[kind]), + count: WAIT_COUNT.with_label_values(&[kind]), + } + }); + + impl Metric for &'static TimelineGet { + #[inline(always)] + fn observe_throttling( + &self, + tenant::throttle::Observation { wait_time }: &tenant::throttle::Observation, + ) { + let val = u64::try_from(wait_time.as_micros()).unwrap(); + self.wait_time.inc_by(val); + self.count.inc(); + } + } +} + pub fn preinitialize_metrics() { // Python tests need these and on some we do alerting. // @@ -2557,4 +2607,5 @@ pub fn preinitialize_metrics() { // Custom Lazy::force(&RECONSTRUCT_TIME); + Lazy::force(&tenant_throttling::TIMELINE_GET); } diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 6317b0a7ae3c..adaa55c179c5 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -188,6 +188,7 @@ task_local! { serde::Serialize, serde::Deserialize, strum_macros::IntoStaticStr, + strum_macros::EnumString, )] pub enum TaskKind { // Pageserver startup, i.e., `main` diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e2d66711c84d..a4d3a4142a9e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -167,6 +167,8 @@ pub(crate) mod timeline; pub mod size; +pub(crate) mod throttle; + pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline}; @@ -305,6 +307,11 @@ pub struct Tenant { // Users of the Tenant such as the page service must take this Gate to avoid // trying to use a Tenant which is shutting down. pub(crate) gate: Gate, + + /// Throttle applied at the top of [`Timeline::get`]. + /// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance. + pub(crate) timeline_get_throttle: + Arc>, } impl std::fmt::Debug for Tenant { @@ -990,6 +997,7 @@ impl Tenant { TimelineResources { remote_client: Some(remote_client), deletion_queue_client: self.deletion_queue_client.clone(), + timeline_get_throttle: self.timeline_get_throttle.clone(), }, ctx, ) @@ -2075,7 +2083,7 @@ impl Tenant { }; // We have a pageserver TenantConf, we need the API-facing TenantConfig. 
- let tenant_config: models::TenantConfig = conf.tenant_conf.into(); + let tenant_config: models::TenantConfig = conf.tenant_conf.clone().into(); models::LocationConfig { mode: location_config_mode, @@ -2209,93 +2217,93 @@ where impl Tenant { pub fn tenant_specific_overrides(&self) -> TenantConfOpt { - self.tenant_conf.read().unwrap().tenant_conf + self.tenant_conf.read().unwrap().tenant_conf.clone() } pub fn effective_config(&self) -> TenantConf { self.tenant_specific_overrides() - .merge(self.conf.default_tenant_conf) + .merge(self.conf.default_tenant_conf.clone()) } pub fn get_checkpoint_distance(&self) -> u64 { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .checkpoint_distance .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance) } pub fn get_checkpoint_timeout(&self) -> Duration { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .checkpoint_timeout .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout) } pub fn get_compaction_target_size(&self) -> u64 { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .compaction_target_size .unwrap_or(self.conf.default_tenant_conf.compaction_target_size) } pub fn get_compaction_period(&self) -> Duration { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .compaction_period .unwrap_or(self.conf.default_tenant_conf.compaction_period) } pub fn get_compaction_threshold(&self) -> usize { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .compaction_threshold .unwrap_or(self.conf.default_tenant_conf.compaction_threshold) } pub fn get_gc_horizon(&self) -> u64 { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .gc_horizon .unwrap_or(self.conf.default_tenant_conf.gc_horizon) } pub fn get_gc_period(&self) -> Duration { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .gc_period .unwrap_or(self.conf.default_tenant_conf.gc_period) } pub fn get_image_creation_threshold(&self) -> usize { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .image_creation_threshold .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } pub fn get_pitr_interval(&self) -> Duration { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .pitr_interval .unwrap_or(self.conf.default_tenant_conf.pitr_interval) } pub fn get_trace_read_requests(&self) -> bool { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .trace_read_requests .unwrap_or(self.conf.default_tenant_conf.trace_read_requests) } pub fn get_min_resident_size_override(&self) -> Option { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = 
self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .min_resident_size_override .or(self.conf.default_tenant_conf.min_resident_size_override) } pub fn get_heatmap_period(&self) -> Option { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); let heatmap_period = tenant_conf .heatmap_period .unwrap_or(self.conf.default_tenant_conf.heatmap_period); @@ -2308,6 +2316,7 @@ impl Tenant { pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) { self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf; + self.tenant_conf_updated(); // Don't hold self.timelines.lock() during the notifies. // There's no risk of deadlock right now, but there could be if we consolidate // mutexes in struct Timeline in the future. @@ -2319,6 +2328,7 @@ impl Tenant { pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) { *self.tenant_conf.write().unwrap() = new_conf; + self.tenant_conf_updated(); // Don't hold self.timelines.lock() during the notifies. // There's no risk of deadlock right now, but there could be if we consolidate // mutexes in struct Timeline in the future. @@ -2328,6 +2338,24 @@ impl Tenant { } } + fn get_timeline_get_throttle_config( + psconf: &'static PageServerConf, + overrides: &TenantConfOpt, + ) -> throttle::Config { + overrides + .timeline_get_throttle + .clone() + .unwrap_or(psconf.default_tenant_conf.timeline_get_throttle.clone()) + } + + pub(crate) fn tenant_conf_updated(&self) { + let conf = { + let guard = self.tenant_conf.read().unwrap(); + Self::get_timeline_get_throttle_config(self.conf, &guard.tenant_conf) + }; + self.timeline_get_throttle.reconfigure(conf) + } + /// Helper function to create a new Timeline struct. /// /// The returned Timeline is in Loading state. The caller is responsible for @@ -2454,7 +2482,6 @@ impl Tenant { // using now here is good enough approximation to catch tenants with really long // activation times. 
constructed_at: Instant::now(), - tenant_conf: Arc::new(RwLock::new(attached_conf)), timelines: Mutex::new(HashMap::new()), timelines_creating: Mutex::new(HashSet::new()), gc_cs: tokio::sync::Mutex::new(()), @@ -2469,6 +2496,11 @@ impl Tenant { delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), cancel: CancellationToken::default(), gate: Gate::default(), + timeline_get_throttle: Arc::new(throttle::Throttle::new( + Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf), + &crate::metrics::tenant_throttling::TIMELINE_GET, + )), + tenant_conf: Arc::new(RwLock::new(attached_conf)), } } @@ -3224,6 +3256,7 @@ impl Tenant { TimelineResources { remote_client, deletion_queue_client: self.deletion_queue_client.clone(), + timeline_get_throttle: self.timeline_get_throttle.clone(), } } @@ -3495,7 +3528,7 @@ impl Tenant { } pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt { - self.tenant_conf.read().unwrap().tenant_conf + self.tenant_conf.read().unwrap().tenant_conf.clone() } } @@ -3654,6 +3687,7 @@ pub(crate) mod harness { gc_feedback: Some(tenant_conf.gc_feedback), heatmap_period: Some(tenant_conf.heatmap_period), lazy_slru_download: Some(tenant_conf.lazy_slru_download), + timeline_get_throttle: Some(tenant_conf.timeline_get_throttle), } } } @@ -3757,7 +3791,7 @@ pub(crate) mod harness { TenantState::Loading, self.conf, AttachedTenantConf::try_from(LocationConf::attached_single( - TenantConfOpt::from(self.tenant_conf), + TenantConfOpt::from(self.tenant_conf.clone()), self.generation, &ShardParameters::default(), )) diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 961decd247e5..5c88d30caf89 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -9,8 +9,8 @@ //! may lead to a data loss. //! use anyhow::bail; -use pageserver_api::models; use pageserver_api::models::EvictionPolicy; +use pageserver_api::models::{self, ThrottleConfig}; use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize}; use serde::de::IntoDeserializer; use serde::{Deserialize, Serialize}; @@ -285,7 +285,7 @@ impl Default for LocationConf { /// /// For storing and transmitting individual tenant's configuration, see /// TenantConfOpt. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TenantConf { // Flush out an inmemory layer, if it's holding WAL older than this // This puts a backstop on how much WAL needs to be re-digested if the @@ -348,11 +348,13 @@ pub struct TenantConf { /// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup pub lazy_slru_download: bool, + + pub timeline_get_throttle: pageserver_api::models::ThrottleConfig, } /// Same as TenantConf, but this struct preserves the information about /// which parameters are set and which are not. 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] @@ -437,6 +439,9 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub lazy_slru_download: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub timeline_get_throttle: Option, } impl TenantConfOpt { @@ -485,6 +490,10 @@ impl TenantConfOpt { lazy_slru_download: self .lazy_slru_download .unwrap_or(global_conf.lazy_slru_download), + timeline_get_throttle: self + .timeline_get_throttle + .clone() + .unwrap_or(global_conf.timeline_get_throttle), } } } @@ -524,6 +533,7 @@ impl Default for TenantConf { gc_feedback: false, heatmap_period: Duration::ZERO, lazy_slru_download: false, + timeline_get_throttle: crate::tenant::throttle::Config::disabled(), } } } @@ -596,6 +606,7 @@ impl From for models::TenantConfig { gc_feedback: value.gc_feedback, heatmap_period: value.heatmap_period.map(humantime), lazy_slru_download: value.lazy_slru_download, + timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from), } } } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 90c442464fef..b7f4723702aa 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -484,7 +484,7 @@ pub async fn init_tenant_mgr( TenantSlot::Secondary(SecondaryTenant::new( tenant_shard_id, location_conf.shard, - location_conf.tenant_conf, + location_conf.tenant_conf.clone(), &SecondaryLocationConfig { warm: false }, )), ); @@ -805,7 +805,7 @@ pub(crate) async fn set_new_tenant_config( // API to use is the location_config/ endpoint, which lets the caller provide // the full LocationConf. 
let location_conf = LocationConf::attached_single( - new_tenant_conf, + new_tenant_conf.clone(), tenant.generation, &ShardParameters::default(), ); @@ -1466,7 +1466,7 @@ impl TenantManager { attach_mode: AttachmentMode::Single, }), shard: child_shard_identity, - tenant_conf: parent_tenant_conf, + tenant_conf: parent_tenant_conf.clone(), }; self.upsert_location( diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 2c8ced4eb774..c466ac0c2461 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -133,7 +133,7 @@ impl SecondaryTenant { } pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) { - *(self.tenant_conf.lock().unwrap()) = *config; + *(self.tenant_conf.lock().unwrap()) = config.clone(); } /// For API access: generate a LocationConfig equivalent to the one that would be used to @@ -144,7 +144,7 @@ impl SecondaryTenant { let conf = models::LocationConfigSecondary { warm: conf.warm }; - let tenant_conf = *self.tenant_conf.lock().unwrap(); + let tenant_conf = self.tenant_conf.lock().unwrap().clone(); models::LocationConfig { mode: models::LocationConfigMode::Secondary, generation: None, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 950cc46e7110..45ce6c938155 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::TENANT_TASK_EVENTS; use crate::task_mgr; use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; +use crate::tenant::throttle::Stats; use crate::tenant::timeline::CompactionError; use crate::tenant::{Tenant, TenantState}; use tokio_util::sync::CancellationToken; @@ -139,6 +140,8 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { // How many errors we have seen consequtively let mut error_run_count = 0; + let mut last_throttle_flag_reset_at = Instant::now(); + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); async { let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download); @@ -203,6 +206,27 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { walredo_mgr.maybe_quiesce(period * 10); } + // TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off, + // so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens. 
+ info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| { + let now = Instant::now(); + let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now); + let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats(); + if count_throttled == 0 { + return; + } + let allowed_rps = tenant.timeline_get_throttle.steady_rps(); + let delta = now - prev; + warn!( + n_seconds=%format_args!("{:.3}", + delta.as_secs_f64()), + count_accounted, + count_throttled, + sum_throttled_usecs, + allowed_rps=%format_args!("{allowed_rps:.0}"), + "shard was throttled in the last n_seconds") + }); + // Sleep if tokio::time::timeout(sleep_duration, cancel.cancelled()) .await diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs new file mode 100644 index 000000000000..6894a88b930d --- /dev/null +++ b/pageserver/src/tenant/throttle.rs @@ -0,0 +1,162 @@ +use std::{ + str::FromStr, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +use arc_swap::ArcSwap; +use enumset::EnumSet; +use tracing::error; + +use crate::{context::RequestContext, task_mgr::TaskKind}; + +/// Throttle for `async` functions. +/// +/// Runtime reconfigurable. +/// +/// To share a throttle among multiple entities, wrap it in an [`Arc`]. +/// +/// The intial use case for this is tenant-wide throttling of getpage@lsn requests. +pub struct Throttle { + inner: ArcSwap, + metric: M, + /// will be turned into [`Stats::count_accounted`] + count_accounted: AtomicU64, + /// will be turned into [`Stats::count_throttled`] + count_throttled: AtomicU64, + /// will be turned into [`Stats::sum_throttled_usecs`] + sum_throttled_usecs: AtomicU64, +} + +pub struct Inner { + task_kinds: EnumSet, + rate_limiter: Arc, + config: Config, +} + +pub type Config = pageserver_api::models::ThrottleConfig; + +pub struct Observation { + pub wait_time: Duration, +} +pub trait Metric { + fn observe_throttling(&self, observation: &Observation); +} + +/// See [`Throttle::reset_stats`]. +pub struct Stats { + // Number of requests that were subject to throttling, i.e., requests of the configured [`Config::task_kinds`]. + pub count_accounted: u64, + // Subset of the `accounted` requests that were actually throttled. + // Note that the numbers are stored as two independent atomics, so, there might be a slight drift. + pub count_throttled: u64, + // Sum of microseconds that throttled requests spent waiting for throttling. 
+ pub sum_throttled_usecs: u64, +} + +impl Throttle +where + M: Metric, +{ + pub fn new(config: Config, metric: M) -> Self { + Self { + inner: ArcSwap::new(Arc::new(Self::new_inner(config))), + metric, + count_accounted: AtomicU64::new(0), + count_throttled: AtomicU64::new(0), + sum_throttled_usecs: AtomicU64::new(0), + } + } + fn new_inner(config: Config) -> Inner { + let Config { + task_kinds, + initial, + refill_interval, + refill_amount, + max, + fair, + } = &config; + let task_kinds: EnumSet = task_kinds + .iter() + .filter_map(|s| match TaskKind::from_str(s) { + Ok(v) => Some(v), + Err(e) => { + // TODO: avoid this failure mode + error!( + "cannot parse task kind, ignoring for rate limiting {}", + utils::error::report_compact_sources(&e) + ); + None + } + }) + .collect(); + Inner { + task_kinds, + rate_limiter: Arc::new( + leaky_bucket::RateLimiter::builder() + .initial(*initial) + .interval(*refill_interval) + .refill(refill_amount.get()) + .max(*max) + .fair(*fair) + .build(), + ), + config, + } + } + pub fn reconfigure(&self, config: Config) { + self.inner.store(Arc::new(Self::new_inner(config))); + } + + /// The [`Throttle`] keeps an internal flag that is true if there was ever any actual throttling. + /// This method allows retrieving & resetting that flag. + /// Useful for periodic reporting. + pub fn reset_stats(&self) -> Stats { + let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed); + let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed); + let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed); + Stats { + count_accounted, + count_throttled, + sum_throttled_usecs, + } + } + + /// See [`Config::steady_rps`]. + pub fn steady_rps(&self) -> f64 { + self.inner.load().config.steady_rps() + } + + pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) { + let inner = self.inner.load_full(); // clones the `Inner` Arc + if !inner.task_kinds.contains(ctx.task_kind()) { + return; + }; + let start = std::time::Instant::now(); + let mut did_throttle = false; + let acquire = inner.rate_limiter.acquire(key_count); + // turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate + let acquire = tokio::task::unconstrained(acquire); + let mut acquire = std::pin::pin!(acquire); + std::future::poll_fn(|cx| { + use std::future::Future; + let poll = acquire.as_mut().poll(cx); + did_throttle = did_throttle || poll.is_pending(); + poll + }) + .await; + self.count_accounted.fetch_add(1, Ordering::Relaxed); + if did_throttle { + self.count_throttled.fetch_add(1, Ordering::Relaxed); + let now = Instant::now(); + let wait_time = now - start; + self.sum_throttled_usecs + .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); + let observation = Observation { wait_time }; + self.metric.observe_throttling(&observation); + } + } +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7f7713a6c6e5..cd88327f3440 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -164,6 +164,9 @@ fn drop_wlock(rlock: tokio::sync::RwLockWriteGuard<'_, T>) { pub struct TimelineResources { pub remote_client: Option, pub deletion_queue_client: DeletionQueueClient, + pub timeline_get_throttle: Arc< + crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>, + >, } pub struct Timeline { @@ -355,6 +358,11 @@ pub struct Timeline { /// /// Timeline deletion will acquire both compaction and gc locks in whatever order. 
gc_lock: tokio::sync::Mutex<()>, + + /// Cloned from [`super::Tenant::timeline_get_throttle`] on construction. + timeline_get_throttle: Arc< + crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>, + >, } pub struct WalReceiverInfo { @@ -615,6 +623,8 @@ impl Timeline { return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN"))); } + self.timeline_get_throttle.throttle(ctx, 1).await; + // This check is debug-only because of the cost of hashing, and because it's a double-check: we // already checked the key against the shard_identity when looking up the Timeline from // page_service. @@ -714,6 +724,10 @@ impl Timeline { return Err(GetVectoredError::Oversized(key_count)); } + self.timeline_get_throttle + .throttle(ctx, key_count as usize) + .await; + let _timer = crate::metrics::GET_VECTORED_LATENCY .for_task_kind(ctx.task_kind()) .map(|t| t.start_timer()); @@ -1335,49 +1349,49 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10; // Private functions impl Timeline { pub(crate) fn get_lazy_slru_download(&self) -> bool { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .lazy_slru_download .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download) } fn get_checkpoint_distance(&self) -> u64 { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .checkpoint_distance .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance) } fn get_checkpoint_timeout(&self) -> Duration { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .checkpoint_timeout .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout) } fn get_compaction_target_size(&self) -> u64 { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .compaction_target_size .unwrap_or(self.conf.default_tenant_conf.compaction_target_size) } fn get_compaction_threshold(&self) -> usize { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .compaction_threshold .unwrap_or(self.conf.default_tenant_conf.compaction_threshold) } fn get_image_creation_threshold(&self) -> usize { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .image_creation_threshold .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } fn get_eviction_policy(&self) -> EvictionPolicy { - let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .eviction_policy .unwrap_or(self.conf.default_tenant_conf.eviction_policy) @@ -1393,7 +1407,7 @@ impl Timeline { } fn get_gc_feedback(&self) -> bool { - let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf; + let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf.clone(); tenant_conf .gc_feedback .unwrap_or(self.conf.default_tenant_conf.gc_feedback) @@ -1555,6 +1569,8 @@ impl Timeline { compaction_lock: tokio::sync::Mutex::default(), gc_lock: tokio::sync::Mutex::default(), + + timeline_get_throttle: resources.timeline_get_throttle, }; result.repartition_threshold = 
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index dc499197b0a3..d2e9eda906b9 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -419,6 +419,7 @@ impl DeleteTimelineFlow { TimelineResources { remote_client, deletion_queue_client, + timeline_get_throttle: tenant.timeline_get_throttle.clone(), }, // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here. diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 7cdc3146580b..1aaded222c9f 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -176,6 +176,14 @@ def test_fully_custom_config(positive_env: NeonEnv): "lazy_slru_download": True, "max_lsn_wal_lag": 230000, "min_resident_size_override": 23, + "timeline_get_throttle": { + "task_kinds": ["PageRequestHandler"], + "fair": True, + "initial": 0, + "refill_interval": "1s", + "refill_amount": 1000, + "max": 1000, + }, "trace_read_requests": True, "walreceiver_connect_timeout": "13m", }
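
Editor's note on the new `timeline_get_throttle` knob: the control-plane hunks above parse it with `serde_json::from_str`, and the Python test shows the accepted shape. The following is a minimal, standalone sketch of deserializing that shape into the `ThrottleConfig` added in `libs/pageserver_api/src/models.rs`; it assumes the `pageserver_api` crate from this diff plus `serde_json` are on the dependency path, and that `task_kinds` is a list of task-kind names as the test suggests. `refill_interval` accepts humantime strings because of `#[serde(with = "humantime_serde")]`.

use pageserver_api::models::ThrottleConfig;

fn main() {
    // Same shape as the `timeline_get_throttle` block in test_fully_custom_config above.
    let json = r#"{
        "task_kinds": ["PageRequestHandler"],
        "fair": true,
        "initial": 0,
        "refill_interval": "1s",
        "refill_amount": 1000,
        "max": 1000
    }"#;
    let conf: ThrottleConfig = serde_json::from_str(json).unwrap();
    assert_eq!(conf.refill_amount.get(), 1000);
}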
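
The core of `pageserver/src/tenant/throttle.rs` is a `leaky_bucket::RateLimiter` plus a trick for detecting whether `acquire()` actually had to wait: the acquire future is wrapped in `tokio::task::unconstrained`, so a `Pending` poll can only mean the limiter made the caller wait, not that the runtime preempted the task cooperatively. A standalone sketch of that mechanism follows (assumes `tokio` with the `rt`, `time`, and `macros` features and the `leaky-bucket` crate; the numeric values are arbitrary, not the pageserver defaults):

use std::future::Future;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Builder calls mirror Throttle::new_inner above.
    let limiter = leaky_bucket::RateLimiter::builder()
        .initial(0)
        .interval(Duration::from_millis(100))
        .refill(10)
        .max(10)
        .fair(true)
        .build();

    let start = Instant::now();
    let mut did_throttle = false;
    // Turn off tokio's cooperative preemption so Pending can only mean
    // "the rate limiter made us wait".
    let acquire = tokio::task::unconstrained(limiter.acquire(5));
    let mut acquire = std::pin::pin!(acquire);
    std::future::poll_fn(|cx| {
        let poll = acquire.as_mut().poll(cx);
        did_throttle = did_throttle || poll.is_pending();
        poll
    })
    .await;
    println!("did_throttle={did_throttle}, waited {:?}", start.elapsed());
}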
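
The throttle is runtime-reconfigurable because its state sits behind an `arc_swap::ArcSwap`: `Throttle::reconfigure` builds a fresh `Inner` (including a new rate limiter) and stores it, while in-flight `throttle()` calls keep the snapshot they already loaded. A minimal sketch of that pattern, with an illustrative type rather than the ones in the patch:

use std::sync::Arc;
use arc_swap::ArcSwap;

struct Limits {
    rps: u32,
}

fn main() {
    let current = ArcSwap::new(Arc::new(Limits { rps: 100 }));
    // A reader takes a snapshot (Throttle::throttle does `self.inner.load_full()`)...
    let snapshot = current.load_full();
    // ...and a reconfiguration swaps the whole Arc without blocking that reader.
    current.store(Arc::new(Limits { rps: 50 }));
    assert_eq!(snapshot.rps, 100);
    assert_eq!(current.load().rps, 50);
}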
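
On the benchmark side, the `getpage_latest_lsn.rs` rewrite drops the per-client work channels; each worker paces itself against absolute ticks of `client_start + n * period` and accounts ticks it could not keep up with as `MISSED`. A small sketch of that pacing loop under an assumed 100-requests-per-second budget, with a sleep standing in for the real getpage call:

use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let period = Duration::from_millis(10); // assumed budget: 100 requests/sec
    let client_start = Instant::now();
    let mut ticks_processed: u64 = 0;
    let mut missed: u64 = 0;
    for _ in 0..100 {
        // If we fell behind, account the skipped ticks as MISSED and catch up.
        let due =
            u64::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
        if due > ticks_processed {
            missed += due - ticks_processed;
            ticks_processed = due;
        }
        // ... one getpage request would go here ...
        tokio::time::sleep(Duration::from_millis(2)).await; // stand-in for the request
        ticks_processed += 1;
        // Sleep until the next absolute tick rather than a relative delay,
        // so timing error does not accumulate across iterations.
        let next_at = client_start + period * u32::try_from(ticks_processed).unwrap();
        tokio::time::sleep_until(next_at.into()).await;
    }
    println!("missed {missed} ticks");
}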