Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

page_service: don't count time spent in Batcher towards smgr latency metrics #10075

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 122 additions & 48 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use postgres_backend::{is_expected_io_error, QueryError};
use pq_proto::framed::ConnectionError;
use strum::{EnumCount, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use tracing::warn;
use utils::id::TimelineId;

/// Prometheus histogram buckets (in seconds) for operations in the critical
Expand Down Expand Up @@ -1225,32 +1224,58 @@ pub(crate) mod virtual_file_io_engine {

pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
pub(crate) struct SmgrOpTimerInner {
global_latency_histo: Histogram,
global_execution_latency_histo: Histogram,
per_timeline_execution_latency_histo: Option<Histogram>,

// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>,
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,

global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,

start: Instant,
throttled: Duration,
op: SmgrQueryType,
timings: SmgrOpTimerState,
}

#[derive(Debug)]
enum SmgrOpTimerState {
Received {
received_at: Instant,
},
ThrottleDoneExecutionStarting {
received_at: Instant,
throttle_started_at: Instant,
started_execution_at: Instant,
},
}

pub(crate) struct SmgrOpFlushInProgress {
base: Instant,
flush_started_at: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}

impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else {
return;
};
pub(crate) fn observe_throttle_done_execution_starting(&mut self, throttle: &ThrottleResult) {
let inner = self.0.as_mut().expect("other public methods consume self");
inner.throttled += *throttle;
match (&mut inner.timings, throttle) {
(SmgrOpTimerState::Received { received_at }, throttle) => match throttle {
ThrottleResult::NotThrottled { start } => {
inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
received_at: *received_at,
throttle_started_at: *start,
started_execution_at: *start,
};
}
ThrottleResult::Throttled { start, end } => {
inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
received_at: *start,
throttle_started_at: *start,
started_execution_at: *end,
};
}
},
(x, _) => panic!("called in unexpected state: {x:?}"),
}
}

pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
Expand All @@ -1263,7 +1288,7 @@ impl SmgrOpTimer {
..
} = inner;
SmgrOpFlushInProgress {
base: flush_start,
flush_started_at: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
Expand All @@ -1274,32 +1299,42 @@ impl SmgrOpTimer {
let inner = self.0.take()?;

let now = Instant::now();
let elapsed = now - inner.start;

let elapsed = match elapsed.checked_sub(inner.throttled) {
Some(elapsed) => elapsed,
None => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[inner.op];
rate_limit.call(|| {
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
});
elapsed // un-throttled time, more info than just saturating to 0

let batch;
let execution;
let throttle;
match inner.timings {
SmgrOpTimerState::Received { received_at } => {
batch = (now - received_at).as_secs_f64();
// TODO: use label for dropped requests.
// This is quite rare in practice, only during tenant/pageservers shutdown.
throttle = Duration::ZERO;
execution = Duration::ZERO.as_secs_f64();
}
};
SmgrOpTimerState::ThrottleDoneExecutionStarting {
received_at,
throttle_started_at,
started_execution_at,
} => {
batch = (throttle_started_at - received_at).as_secs_f64();
throttle = started_execution_at - throttle_started_at;
execution = (now - started_execution_at).as_secs_f64();
}
}

// update time spent in batching
inner.global_batch_wait_time.observe(batch);
inner.per_timeline_batch_wait_time.observe(batch);

let elapsed = elapsed.as_secs_f64();
// time spent in throttle metric is updated by throttle impl
let _ = throttle;

inner.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
// update metrics for execution latency
inner.global_execution_latency_histo.observe(execution);
if let Some(per_timeline_execution_latency_histo) =
&inner.per_timeline_execution_latency_histo
{
per_timeline_execution_latency_histo.observe(execution);
}

Some((now, inner))
Expand All @@ -1325,12 +1360,12 @@ impl SmgrOpFlushInProgress {
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|| {
let elapsed = now - self.base;
let elapsed = now - self.flush_started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.base = now;
self.flush_started_at = now;
},
|mut observe| {
observe();
Expand Down Expand Up @@ -1377,6 +1412,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_batch_size: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,
}

static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
Expand All @@ -1399,12 +1436,15 @@ static SMGR_QUERY_STARTED_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|
.expect("failed to define a metric")
});

// Alias so all histograms recording per-timeline smgr timings use the same buckets.
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS: &[f64] = CRITICAL_OP_BUCKETS;

static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_smgr_query_seconds",
"Time spent on smgr query handling, aggegated by query type and tenant/timeline.",
"Time spent _executing_ smgr query handling, excluding batch and throttle delays.",
&["smgr_query_type", "tenant_id", "shard_id", "timeline_id"],
CRITICAL_OP_BUCKETS.into(),
SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS.into(),
)
.expect("failed to define a metric")
});
Expand Down Expand Up @@ -1462,7 +1502,7 @@ static SMGR_QUERY_TIME_GLOBAL_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
static SMGR_QUERY_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_smgr_query_seconds_global",
"Time spent on smgr query handling, aggregated by query type.",
"Like pageserver_smgr_query_seconds, but aggregated to instance level.",
&["smgr_query_type"],
SMGR_QUERY_TIME_GLOBAL_BUCKETS.clone(),
)
Expand Down Expand Up @@ -1559,6 +1599,25 @@ static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy
.expect("failed to define a metric")
});

static PAGE_SERVICE_SMGR_BATCH_WAIT_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_page_service_pagestream_batch_wait_time_seconds",
"Time a request spent waiting in its batch until the batch moved to throttle&execution.",
&["tenant_id", "shard_id", "timeline_id"],
SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS.into(),
)
.expect("failed to define a metric")
});

static PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_service_pagestream_batch_wait_time_seconds_global",
"Like pageserver_page_service_pagestream_batch_wait_time_seconds, but aggregated to instance level.",
SMGR_QUERY_TIME_GLOBAL_BUCKETS.to_vec(),
)
.expect("failed to define a metric")
});

impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
Expand Down Expand Up @@ -1599,6 +1658,11 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();

let global_batch_wait_time = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL.clone();
let per_timeline_batch_wait_time = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();

let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
Expand All @@ -1614,9 +1678,11 @@ impl SmgrQueryTimePerTimeline {
per_timeline_batch_size,
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
global_batch_wait_time,
per_timeline_batch_wait_time,
}
}
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, received_at: Instant) -> SmgrOpTimer {
self.global_started[op as usize].inc();

let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
Expand All @@ -1627,15 +1693,15 @@ impl SmgrQueryTimePerTimeline {
};

SmgrOpTimer(Some(SmgrOpTimerInner {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start: started_at,
op,
throttled: Duration::ZERO,
global_execution_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_execution_latency_histo: per_timeline_latency_histo,
timings: SmgrOpTimerState::Received { received_at },
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
global_batch_wait_time: self.global_batch_wait_time.clone(),
per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(),
}))
}

Expand Down Expand Up @@ -2889,6 +2955,11 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
let _ = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}

Expand Down Expand Up @@ -2919,6 +2990,7 @@ use crate::context::{PageContentKind, RequestContext};
use crate::task_mgr::TaskKind;
use crate::tenant::mgr::TenantSlot;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::throttle::ThrottleResult;
use crate::tenant::Timeline;

/// Maintain a per timeline gauge in addition to the global gauge.
Expand Down Expand Up @@ -3773,6 +3845,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
&REMOTE_ONDEMAND_DOWNLOADED_BYTES,
&CIRCUIT_BREAKERS_BROKEN,
&CIRCUIT_BREAKERS_UNBROKEN,
&PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL,
]
.into_iter()
.for_each(|c| {
Expand Down Expand Up @@ -3820,6 +3893,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
&WAL_REDO_BYTES_HISTOGRAM,
&WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
&PAGE_SERVICE_BATCH_SIZE_GLOBAL,
&PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL,
]
.into_iter()
.for_each(|h| {
Expand Down
13 changes: 9 additions & 4 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,10 @@ enum BatchedFeMessage {
}

impl BatchedFeMessage {
async fn throttle(&mut self, cancel: &CancellationToken) -> Result<(), QueryError> {
async fn throttle_and_record_start_processing(
&mut self,
cancel: &CancellationToken,
) -> Result<(), QueryError> {
let (shard, tokens, timers) = match self {
BatchedFeMessage::Exists { shard, timer, .. }
| BatchedFeMessage::Nblocks { shard, timer, .. }
Expand Down Expand Up @@ -603,7 +606,7 @@ impl BatchedFeMessage {
}
};
for timer in timers {
timer.deduct_throttle(&throttled);
timer.observe_throttle_done_execution_starting(&throttled);
}
Ok(())
}
Expand Down Expand Up @@ -1230,7 +1233,7 @@ impl PageServerHandler {
}
};

if let Err(cancelled) = msg.throttle(&self.cancel).await {
if let Err(cancelled) = msg.throttle_and_record_start_processing(&self.cancel).await {
break cancelled;
}

Expand Down Expand Up @@ -1397,7 +1400,9 @@ impl PageServerHandler {
return Err(e);
}
};
batch.throttle(&self.cancel).await?;
batch
.throttle_and_record_start_processing(&self.cancel)
.await?;
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.await?;
}
Expand Down
17 changes: 11 additions & 6 deletions pageserver/src/tenant/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ pub struct Stats {
pub sum_throttled_usecs: u64,
}

pub enum ThrottleResult {
NotThrottled { start: Instant },
problame marked this conversation as resolved.
Show resolved Hide resolved
Throttled { start: Instant, end: Instant },
}

impl<M> Throttle<M>
where
M: Metric,
Expand Down Expand Up @@ -122,15 +127,15 @@ where
self.inner.load().rate_limiter.steady_rps()
}

pub async fn throttle(&self, key_count: usize) -> Option<Duration> {
pub async fn throttle(&self, key_count: usize) -> ThrottleResult {
let inner = self.inner.load_full(); // clones the `Inner` Arc

let start = std::time::Instant::now();

if !inner.enabled {
return None;
return ThrottleResult::NotThrottled { start };
}

let start = std::time::Instant::now();

self.metric.accounting_start();
self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
let did_throttle = inner.rate_limiter.acquire(key_count).await;
Expand All @@ -145,9 +150,9 @@ where
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
Some(wait_time)
ThrottleResult::Throttled { start, end: now }
} else {
None
ThrottleResult::NotThrottled { start }
}
}
}
Loading
Loading