Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add counter group metadata #397

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ tower = { version = "0.5.0", features = ["tokio"] }
tower-http = { version = "0.5.2", features = ["compression-full", "decompression-full"] }
thiserror = "1.0.63"
walkdir = "2.5.0"
plain = "0.2.3"

[target.'cfg(target_os = "linux")'.dependencies]
libbpf-rs = { version = "0.24.2" }
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ Rust >= 1.70.0

#### Linux

A minimum kernel version of 5.5 is required. The following distributions should
A minimum kernel version of 5.8 is required. The following distributions should
work:

* Debian: Bullseye and newer
* Ubuntu: 20.10 and newer
* Red Hat: RHEL 9 and newer
* Amazon Linux: AL2 w/ 5.10 or newer, AL2023
* Debian: Bullseye and newer (5.10+)
* Ubuntu: 20.10 and newer (5.8+)
* Red Hat: RHEL 9 and newer (5.14+)
* Amazon Linux: AL2 w/ 5.10 or newer, AL2023 (6.1+)
* Any rolling-release distro: Arch, Gentoo, ...

In addition to the base dependencies, the following are needed:
Expand Down
26 changes: 25 additions & 1 deletion src/common/bpf/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::common::*;

use libbpf_rs::skel::{OpenSkel, Skel, SkelBuilder};
use libbpf_rs::{MapCore, MapFlags, OpenObject};
use libbpf_rs::{MapCore, MapFlags, OpenObject, RingBuffer, RingBufferBuilder};
use metriken::{LazyCounter, RwLockHistogram};
use perf_event::ReadFormat;

Expand Down Expand Up @@ -63,6 +63,7 @@
)>,
perf_events: Vec<(&'static str, PerfEvent)>,
packed_counters: Vec<(&'static str, &'static CounterGroup)>,
ringbuf_handler: Vec<(&'static str, fn(&[u8]) -> i32)>,
Dismissed Show dismissed Hide dismissed
Dismissed Show dismissed Hide dismissed
}

impl<T: 'static> Builder<T>
Expand All @@ -80,6 +81,7 @@
cpu_counters: Vec::new(),
perf_events: Vec::new(),
packed_counters: Vec::new(),
ringbuf_handler: Vec::new(),
}
}

Expand Down Expand Up @@ -174,6 +176,18 @@
})
.collect();

let ringbuffer: Option<RingBuffer> = if self.ringbuf_handler.is_empty() {
None
} else {
let mut builder = RingBufferBuilder::new();

for (name, handler) in self.ringbuf_handler.into_iter() {
let _ = builder.add(skel.map(name), handler);
}

Some(builder.build().expect("failed to initialize ringbuffer"))
};

debug!(
"initialized perf events for: {} hardware counters",
perf_events.len()
Expand Down Expand Up @@ -217,6 +231,11 @@
// blocking wait until we are notified to start, no cpu consumed
sync.wait_trigger();

// consume all data from ringbuffers
if let Some(ref rb) = ringbuffer {
let _ = rb.consume();
}

// refresh all the metrics

for v in &mut counters {
Expand Down Expand Up @@ -316,4 +335,9 @@
self.packed_counters.push((name, counters));
self
}

pub fn ringbuf_handler(mut self, name: &'static str, handler: fn(&[u8]) -> i32) -> Self {
self.ringbuf_handler.push((name, handler));
self
}
}
11 changes: 11 additions & 0 deletions src/common/bpf/cgroup_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#ifndef CGROUP_INFO_H
#define CGROUP_INFO_H

#define CGROUP_NAME_LEN 256

struct cgroup_info {
int id;
u8 name[CGROUP_NAME_LEN];
};

#endif //CGROUP_INFO_H
35 changes: 31 additions & 4 deletions src/common/counters/group.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use metriken::Metric;
use metriken::Value;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::OnceLock;
use thiserror::Error;

type OnceLockVec<T> = OnceLock<RwLock<Vec<T>>>;

#[derive(Error, Debug, PartialEq)]
pub enum CounterGroupError {
#[error("the index is higher than the counter group size")]
Expand All @@ -12,7 +15,8 @@ pub enum CounterGroupError {

/// A group of counters that's protected by a reader-writer lock.
pub struct CounterGroup {
inner: OnceLock<RwLock<Vec<u64>>>,
values: OnceLockVec<u64>,
metadata: OnceLockVec<HashMap<String, String>>,
entries: usize,
}

Expand All @@ -30,7 +34,8 @@ impl CounterGroup {
/// Create a new counter group
pub const fn new(entries: usize) -> Self {
Self {
inner: OnceLock::new(),
values: OnceLock::new(),
metadata: OnceLock::new(),
entries,
}
}
Expand All @@ -50,14 +55,36 @@ impl CounterGroup {

/// Load the counter values
pub fn load(&self) -> Option<Vec<u64>> {
self.inner.get().map(|v| v.read().clone())
self.values.get().map(|v| v.read().clone())
}

pub fn len(&self) -> usize {
self.entries
}

fn get_or_init(&self) -> &RwLock<Vec<u64>> {
self.inner.get_or_init(|| vec![0; self.entries].into())
self.values.get_or_init(|| vec![0; self.entries].into())
}

pub fn load_metadata(&self, idx: usize) -> Option<HashMap<String, String>> {
match self.metadata.get() {
Some(metadata) => metadata.read().get(idx).cloned(),
None => None,
}
}

pub fn clear_metadata(&self, idx: usize) {
if let Some(metadata) = self.metadata.get() {
let _ = metadata.write().get(idx).cloned();
}
}

pub fn insert_metadata(&self, idx: usize, key: String, value: String) {
let metadata = self
.metadata
.get_or_init(|| vec![HashMap::new(); self.entries].into());
if let Some(metadata) = metadata.write().get_mut(idx) {
metadata.insert(key, value);
}
}
}
36 changes: 30 additions & 6 deletions src/exposition/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async fn prometheus(State(state): State<Arc<AppState>>) -> String {
}
}
} else if let Some(counters) = any.downcast_ref::<CounterGroup>() {
if let Some(counters) = counters.load() {
if let Some(c) = counters.load() {
let mut entry = format!("# TYPE {name} counter");

let metadata: Vec<String> = metric
Expand All @@ -205,17 +205,34 @@ async fn prometheus(State(state): State<Arc<AppState>>) -> String {

let metadata = metadata.join(", ");

for (id, value) in counters.iter().enumerate() {
for (id, value) in c.iter().enumerate() {
if *value == 0 {
continue;
}

if metadata.is_empty() {
let counter_metadata: Vec<String> =
if let Some(md) = counters.load_metadata(id) {
md.iter().map(|(k, v)| format!("{k}=\"{v}\"")).collect()
} else {
Vec::new()
};

let counter_metadata = counter_metadata.join(", ");

if metadata.is_empty() && counter_metadata.is_empty() {
entry += &format!("\n{name}{{id=\"{id}\"}} {value} {timestamp}");
} else {
} else if counter_metadata.is_empty() {
entry += &format!(
"\n{name}{{{metadata}, id=\"{id}\"}} {value} {timestamp}"
);
} else if metadata.is_empty() {
entry += &format!(
"\n{name}{{{counter_metadata}, id=\"{id}\"}} {value} {timestamp}"
);
} else {
entry += &format!(
"\n{name}{{{metadata}, {counter_metadata}, id=\"{id}\"}} {value} {timestamp}"
);
}
}

Expand Down Expand Up @@ -273,12 +290,19 @@ fn simple_stats(quoted: bool) -> Vec<String> {
}
Some(Value::Other(any)) => {
if let Some(counters) = any.downcast_ref::<CounterGroup>() {
if let Some(counters) = counters.load() {
for (id, value) in counters.iter().enumerate() {
if let Some(c) = counters.load() {
for (id, value) in c.iter().enumerate() {
if *value == 0 {
continue;
}

if let Some(metadata) = counters.load_metadata(id) {
if let Some(name) = metadata.get("name") {
data.push(format!("{q}{simple_name}/{name}{q}: {value}"));
continue;
}
}

data.push(format!("{q}{simple_name}/{id}{q}: {value}"));
}
}
Expand Down
50 changes: 50 additions & 0 deletions src/samplers/cpu/linux/frequency/mod.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) 2023 The Rezolus Authors

#include <vmlinux.h>
#include "../../../common/bpf/cgroup_info.h"
#include "../../../common/bpf/helpers.h"
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_helpers.h>
Expand All @@ -10,6 +11,7 @@
#define COUNTER_GROUP_WIDTH 8
#define MAX_CPUS 1024
#define MAX_CGROUPS 4096
#define RINGBUF_CAPACITY 32768

#define TASK_RUNNING 0

Expand All @@ -18,6 +20,26 @@
#define MPERF 1
#define TSC 2

// dummy instance for skeleton to generate definition
struct cgroup_info _cgroup_info = {};

// ringbuf to pass cgroup info
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(key_size, 0);
__uint(value_size, 0);
__uint(max_entries, RINGBUF_CAPACITY);
} cgroup_info SEC(".maps");

// holds known cgroup serial numbers to help determine new or changed groups
struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
__uint(map_flags, BPF_F_MMAPABLE);
__type(key, u32);
__type(value, u64);
__uint(max_entries, MAX_CGROUPS);
} cgroup_serial_numbers SEC(".maps");

// counters (see constants defined at top)
struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
Expand Down Expand Up @@ -173,8 +195,36 @@ int handle__sched_switch(u64 *ctx)

if (bpf_core_field_exists(prev->sched_task_group)) {
int cgroup_id = prev->sched_task_group->css.id;
u64 serial_nr = prev->sched_task_group->css.serial_nr;

if (cgroup_id && cgroup_id < MAX_CGROUPS) {

// we check to see if this is a new cgroup by checking the serial number

elem = bpf_map_lookup_elem(&cgroup_serial_numbers, &cgroup_id);

if (elem && *elem != serial_nr) {
// zero the counters, they will not be exported until they are non-zero
u64 zero = 0;
bpf_map_update_elem(&cgroup_aperf, &cgroup_id, &zero, BPF_ANY);
bpf_map_update_elem(&cgroup_mperf, &cgroup_id, &zero, BPF_ANY);
bpf_map_update_elem(&cgroup_tsc, &cgroup_id, &zero, BPF_ANY);

// initialize the cgroup info
struct cgroup_info cginfo = {
.id = cgroup_id,
};

// read the cgroup name
bpf_probe_read_kernel_str(&cginfo.name, CGROUP_NAME_LEN, prev->sched_task_group->css.cgroup->kn->name);

// push the cgroup info into the ringbuf
bpf_ringbuf_output(&cgroup_info, &cginfo, sizeof(cginfo), 0);

// update the serial number in the local map
bpf_map_update_elem(&cgroup_serial_numbers, &cgroup_id, &serial_nr, BPF_ANY);
}

// update cgroup aperf

elem = bpf_map_lookup_elem(&aperf_prev, &processor_id);
Expand Down
24 changes: 24 additions & 0 deletions src/samplers/cpu/linux/frequency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ use crate::*;

use std::sync::Arc;

unsafe impl plain::Plain for bpf::types::cgroup_info {}

fn handle_event(data: &[u8]) -> i32 {
let mut cgroup_info = bpf::types::cgroup_info::default();

if plain::copy_from_bytes(&mut cgroup_info, data).is_ok() {
let name = std::str::from_utf8(&cgroup_info.name)
.unwrap()
.trim_end_matches(char::from(0));

let id = cgroup_info.id;

if !name.is_empty() {
CGROUP_CPU_APERF.insert_metadata(id as usize, "name".to_string(), name.to_string());
CGROUP_CPU_MPERF.insert_metadata(id as usize, "name".to_string(), name.to_string());
CGROUP_CPU_TSC.insert_metadata(id as usize, "name".to_string(), name.to_string());
}
}

0
}

#[distributed_slice(SAMPLERS)]
fn init(config: Arc<Config>) -> SamplerResult {
if !config.enabled(NAME) {
Expand All @@ -43,6 +65,7 @@ fn init(config: Arc<Config>) -> SamplerResult {
.packed_counters("cgroup_aperf", &CGROUP_CPU_APERF)
.packed_counters("cgroup_mperf", &CGROUP_CPU_MPERF)
.packed_counters("cgroup_tsc", &CGROUP_CPU_TSC)
.ringbuf_handler("cgroup_info", handle_event)
.build()?;

Ok(Some(Box::new(bpf)))
Expand All @@ -52,6 +75,7 @@ impl SkelExt for ModSkel<'_> {
fn map(&self, name: &str) -> &libbpf_rs::Map {
match name {
"cgroup_aperf" => &self.maps.cgroup_aperf,
"cgroup_info" => &self.maps.cgroup_info,
"cgroup_mperf" => &self.maps.cgroup_mperf,
"cgroup_tsc" => &self.maps.cgroup_tsc,
"counters" => &self.maps.counters,
Expand Down
Loading
Loading