Skip to content

Commit

Permalink
add a counter group metric type
Browse files Browse the repository at this point in the history
Adds a new metric type designed to more efficiently represent the
per-core metrics by reducing the number of metric entries.
  • Loading branch information
brayniac committed Dec 13, 2024
1 parent 3de5e45 commit 595aa2f
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 155 deletions.
4 changes: 2 additions & 2 deletions src/common/bpf/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct Builder<T: 'static + SkelBuilder<'static>> {
counters: Vec<(&'static str, Vec<&'static LazyCounter>)>,
histograms: Vec<(&'static str, &'static RwLockHistogram)>,
maps: Vec<(&'static str, Vec<u64>)>,
cpu_counters: Vec<(&'static str, Vec<&'static LazyCounter>, ScopedCounters)>,
cpu_counters: Vec<(&'static str, Vec<&'static LazyCounter>, Vec<&'static CounterGroup>)>,
perf_events: Vec<(&'static str, PerfEvent)>,
}

Expand Down Expand Up @@ -280,7 +280,7 @@ where
mut self,
name: &'static str,
totals: Vec<&'static LazyCounter>,
individual: ScopedCounters,
individual: Vec<&'static CounterGroup>,
) -> Self {
self.cpu_counters.push((name, totals, individual));
self
Expand Down
8 changes: 4 additions & 4 deletions src/common/bpf/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl<'a> Counters<'a> {
pub struct CpuCounters<'a> {
counter_map: CounterMap<'a>,
totals: Vec<&'static LazyCounter>,
individual: ScopedCounters,
individual: Vec<&'static CounterGroup>,
values: Vec<u64>,
}

Expand All @@ -138,7 +138,7 @@ impl<'a> CpuCounters<'a> {
pub fn new(
map: &'a Map,
totals: Vec<&'static LazyCounter>,
individual: ScopedCounters,
individual: Vec<&'static CounterGroup>,
) -> Self {
// we need temporary buffer so we can total up the per-CPU values
let values = vec![0; totals.len()];
Expand Down Expand Up @@ -167,14 +167,14 @@ impl<'a> CpuCounters<'a> {

// iterate through and increment our local value for each cpu counter
for cpu in 0..MAX_CPUS {
for idx in 0..self.totals.len() {
for idx in 0..self.individual.len() {
let value = counters[idx + cpu * bank_width];

// add this CPU's counter to the combined value for this counter
self.values[idx] = self.values[idx].wrapping_add(value);

// set this CPU's counter to the new value
let _ = self.individual.set(cpu, idx, value);
let _ = self.individual[idx].set(cpu, value);
}
}

Expand Down
63 changes: 63 additions & 0 deletions src/common/counters/group.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use metriken::Metric;
use parking_lot::RwLock;
use std::sync::OnceLock;
use thiserror::Error;
use metriken::Value;

/// Errors that can be returned by operations on a `CounterGroup`.
#[derive(Error, Debug, PartialEq)]
pub enum CounterGroupError {
/// The requested index is not less than the number of entries in the
/// group, so there is no counter at that position.
#[error("the index is higher than the counter group size")]
InvalidIndex,
}

/// A group of counters that's protected by a reader-writer lock.
///
/// The backing storage is created lazily: it is only allocated by the first
/// successful call to `set`. Until then `load` returns `None`.
pub struct CounterGroup {
// lazily-initialized storage holding one `u64` value per entry
inner: OnceLock<RwLock<Vec<u64>>>,
// number of counters in the group, fixed at construction time
entries: usize,
}

impl Metric for CounterGroup {
fn as_any(&self) -> std::option::Option<&(dyn std::any::Any + 'static)> {
Some(self)
}

fn value(&self) -> std::option::Option<metriken::Value<'_>> {
Some(Value::Other(self))
}
}

impl CounterGroup {
    /// Create a new counter group holding `entries` counters.
    ///
    /// This is a `const fn`, so it can be used in constant contexts. No
    /// storage is allocated here; see [`set`](Self::set).
    pub const fn new(entries: usize) -> Self {
        Self {
            inner: OnceLock::new(),
            entries,
        }
    }

    /// Sets the counter at a given index to the provided value.
    ///
    /// The first successful call allocates the backing storage with every
    /// counter initialized to zero.
    ///
    /// # Errors
    ///
    /// Returns [`CounterGroupError::InvalidIndex`] if `idx` is not less than
    /// [`len`](Self::len). In that case nothing is modified and the backing
    /// storage is not initialized.
    pub fn set(&self, idx: usize, value: u64) -> Result<(), CounterGroupError> {
        // validate before touching storage so that an out-of-range write does
        // not trigger the lazy allocation
        if idx >= self.entries {
            return Err(CounterGroupError::InvalidIndex);
        }

        // the vector always has exactly `self.entries` elements, so the index
        // is in bounds after the check above
        let mut counters = self.get_or_init().write();
        counters[idx] = value;

        Ok(())
    }

    /// Load the counter values.
    ///
    /// Returns a copy of all counters taken under the read lock, or `None`
    /// if the backing storage has not been initialized yet (no successful
    /// [`set`](Self::set) has happened).
    pub fn load(&self) -> Option<Vec<u64>> {
        self.inner.get().map(|v| v.read().clone())
    }

    /// Returns the number of counters in the group, as given to
    /// [`new`](Self::new).
    pub fn len(&self) -> usize {
        self.entries
    }

    /// Returns `true` if the group was created with zero counters.
    pub fn is_empty(&self) -> bool {
        self.entries == 0
    }

    /// Returns the backing storage, allocating a zeroed vector of
    /// `self.entries` counters on first use.
    fn get_or_init(&self) -> &RwLock<Vec<u64>> {
        self.inner.get_or_init(|| vec![0; self.entries].into())
    }
}
2 changes: 2 additions & 0 deletions src/common/counters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#![allow(unused_imports)]

mod dynamic;
mod group;
mod scoped;

pub use dynamic::{DynamicCounter, DynamicCounterBuilder};
pub use group::{CounterGroup, CounterGroupError};
pub use scoped::ScopedCounters;
155 changes: 93 additions & 62 deletions src/exposition/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::HISTOGRAM_GROUPING_POWER;
use crate::common::{HISTOGRAM_GROUPING_POWER, CounterGroup};
use crate::{Arc, Config, Sampler};
use axum::extract::State;
use axum::routing::get;
Expand Down Expand Up @@ -136,76 +136,94 @@ async fn prometheus(State(state): State<Arc<AppState>>) -> String {
data.push(format!(
"# TYPE {name} counter\n{name_with_metadata} {value} {timestamp}"
));
continue;
}
Some(Value::Gauge(value)) => {
data.push(format!(
"# TYPE {name} gauge\n{name_with_metadata} {value} {timestamp}"
));
continue;
}
Some(_) => {}
None => {
continue;
}
}

let any = match metric.as_any() {
Some(any) => any,
None => {
continue;
}
};

if let Some(histogram) = any.downcast_ref::<RwLockHistogram>() {
if state.config.prometheus().histograms() {
if let Some(histogram) = histogram.load() {
let current = HISTOGRAM_GROUPING_POWER;
let target = state.config.prometheus().histogram_grouping_power();

// downsample the histogram if necessary
let downsampled: Option<histogram::Histogram> = if current == target {
// the powers matched, we don't need to downsample
None
} else {
Some(histogram.downsample(target).unwrap())
};

// reassign to either use the downsampled histogram or the original
let histogram = if let Some(histogram) = downsampled.as_ref() {
histogram
} else {
&histogram
};

// we need to export a total count (free-running)
let mut count = 0;
// we also need to export a total sum of all observations
// which is also free-running
let mut sum = 0;

let mut entry = format!("# TYPE {name}_distribution histogram\n");
for bucket in histogram {
// add this bucket's sum of observations
sum += bucket.count() * bucket.end();

// add the count to the aggregate
count += bucket.count();

entry += &format!(
"{name}_distribution_bucket{{le=\"{}\"}} {count} {timestamp}\n",
bucket.end()
);
Some(Value::Other(any)) => {
if let Some(histogram) = any.downcast_ref::<RwLockHistogram>() {
if state.config.prometheus().histograms() {
if let Some(histogram) = histogram.load() {
let current = HISTOGRAM_GROUPING_POWER;
let target = state.config.prometheus().histogram_grouping_power();

// downsample the histogram if necessary
let downsampled: Option<histogram::Histogram> = if current == target {
// the powers matched, we don't need to downsample
None
} else {
Some(histogram.downsample(target).unwrap())
};

// reassign to either use the downsampled histogram or the original
let histogram = if let Some(histogram) = downsampled.as_ref() {
histogram
} else {
&histogram
};

// we need to export a total count (free-running)
let mut count = 0;
// we also need to export a total sum of all observations
// which is also free-running
let mut sum = 0;

let mut entry = format!("# TYPE {name}_distribution histogram\n");
for bucket in histogram {
// add this bucket's sum of observations
sum += bucket.count() * bucket.end();

// add the count to the aggregate
count += bucket.count();

entry += &format!(
"{name}_distribution_bucket{{le=\"{}\"}} {count} {timestamp}\n",
bucket.end()
);
}

entry += &format!(
"{name}_distribution_bucket{{le=\"+Inf\"}} {count} {timestamp}\n"
);
entry += &format!("{name}_distribution_count {count} {timestamp}\n");
entry += &format!("{name}_distribution_sum {sum} {timestamp}");

data.push(entry);
}
}
} else if let Some(counters) = any.downcast_ref::<CounterGroup>() {
if let Some(counters) = counters.load() {
let mut entry = format!("# TYPE {name} counter");

let metadata: Vec<String> = metric
.metadata()
.iter()
.map(|(key, value)| format!("{key}=\"{value}\""))
.collect();

let metadata = metadata.join(", ");

for (id, value) in counters.iter().enumerate() {
if *value == 0 {
continue;
}

if metadata.is_empty() {
entry += &format!("\n{name}{{id=\"{id}\"}} {value} {timestamp}");
} else {
entry += &format!(
"\n{name}{{{metadata}, id=\"{id}\"}} {value} {timestamp}"
);
}
}

data.push(entry);
}

entry +=
&format!("{name}_distribution_bucket{{le=\"+Inf\"}} {count} {timestamp}\n");
entry += &format!("{name}_distribution_count {count} {timestamp}\n");
entry += &format!("{name}_distribution_sum {sum} {timestamp}");

data.push(entry);
}
}
_ => {}
}
}

Expand Down Expand Up @@ -253,6 +271,19 @@ fn simple_stats(quoted: bool) -> Vec<String> {
Some(Value::Gauge(value)) => {
data.push(format!("{q}{simple_name}{q}: {value}"));
}
Some(Value::Other(any)) => {
if let Some(counters) = any.downcast_ref::<CounterGroup>() {
if let Some(counters) = counters.load() {
for (id, value) in counters.iter().enumerate() {
if *value == 0 {
continue;
}

data.push(format!("{q}{simple_name}/{id}{q}: {value}"));
}
}
}
}
Some(_) | None => {}
}
}
Expand Down
21 changes: 1 addition & 20 deletions src/samplers/cpu/linux/frequency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,8 @@ fn init(config: Arc<Config>) -> SamplerResult {
return Ok(None);
}

let cpus = crate::common::linux::cpus()?;

let totals = vec![&CPU_APERF, &CPU_MPERF, &CPU_TSC];

let metrics = ["cpu/aperf", "cpu/mperf", "cpu/tsc"];

let mut individual = ScopedCounters::new();

for cpu in cpus {
for metric in metrics {
individual.push(
cpu,
DynamicCounterBuilder::new(metric)
.metadata("id", format!("{}", cpu))
.formatter(cpu_metric_percore_formatter)
.build(),
);
}
}

println!("initializing bpf for: {NAME}");
let individual = vec![&CPU_APERF_PERCORE, &CPU_MPERF_PERCORE, &CPU_TSC_PERCORE];

let bpf = BpfBuilder::new(ModSkelBuilder::default)
.perf_event("aperf", PerfEvent::msr(MsrId::APERF)?)
Expand Down
21 changes: 1 addition & 20 deletions src/samplers/cpu/linux/perf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,8 @@ fn init(config: Arc<Config>) -> SamplerResult {
return Ok(None);
}

let cpus = crate::common::linux::cpus()?;

let totals = vec![&CPU_CYCLES, &CPU_INSTRUCTIONS];

let metrics = ["cpu/cycles", "cpu/instructions"];

let mut individual = ScopedCounters::new();

for cpu in cpus {
for metric in metrics {
individual.push(
cpu,
DynamicCounterBuilder::new(metric)
.metadata("id", format!("{}", cpu))
.formatter(cpu_metric_percore_formatter)
.build(),
);
}
}

println!("initializing bpf for: {NAME}");
let individual = vec![&CPU_CYCLES_PERCORE, &CPU_INSTRUCTIONS_PERCORE];

let bpf = BpfBuilder::new(ModSkelBuilder::default)
.perf_event("cycles", PerfEvent::cpu_cycles())
Expand Down
Loading

0 comments on commit 595aa2f

Please sign in to comment.