Skip to content

Commit

Permalink
Instrument RustyVault with Prometheus (#76)
Browse files Browse the repository at this point in the history
* Instrument RustyVault with Prometheus

* Network data stays zero all the time, so it is commented out temporarily; fix it later.

* Add test cases to verify Prometheus instrumentation.

* Since load_avg captured by sysinfo is not available on Windows, skip it on the Windows platform.
  • Loading branch information
cybershang authored Oct 9, 2024
1 parent ec2eac8 commit e22637a
Show file tree
Hide file tree
Showing 12 changed files with 1,053 additions and 112 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ dashmap = "5.5"
tokio = { version = "1.40", features = ["rt-multi-thread", "macros"] }
ctor = "0.2.8"
better_default = "1.0.5"
prometheus-client = "0.22.3"
sysinfo = "0.31.4"

# optional dependencies
openssl = { version = "0.10.64", optional = true }
Expand Down
28 changes: 23 additions & 5 deletions src/cli/command/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{
sync::{Arc, RwLock},
};

use actix_web::{middleware, web, App, HttpResponse, HttpServer};
use actix_web::{
middleware::{self, from_fn},
web, App, HttpResponse, HttpServer,
};
use anyhow::format_err;
use clap::ArgMatches;
use openssl::{
Expand All @@ -17,8 +20,9 @@ use openssl::{
use sysexits::ExitCode;

use crate::{
cli::config, core::Core, errors::RvError, http, storage, EXIT_CODE_INSUFFICIENT_PARAMS,
EXIT_CODE_LOAD_CONFIG_FAILURE, EXIT_CODE_OK,
cli::config, core::Core, errors::RvError, http, storage,
EXIT_CODE_INSUFFICIENT_PARAMS, EXIT_CODE_LOAD_CONFIG_FAILURE, EXIT_CODE_OK,
metrics::{manager::MetricsManager, middleware::metrics_midleware},
};

pub const WORK_DIR_PATH_DEFAULT: &str = "/tmp/rusty_vault";
Expand Down Expand Up @@ -109,7 +113,14 @@ pub fn main(config_path: &str) -> Result<(), RvError> {

let barrier = storage::barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend));

let core = Arc::new(RwLock::new(Core { physical: backend, barrier: Arc::new(barrier), ..Default::default() }));
let metrics_manager = Arc::new(RwLock::new(MetricsManager::new(config.collection_interval)));
let system_metrics = Arc::clone(&metrics_manager.read().unwrap().system_metrics);

let core = Arc::new(RwLock::new(Core {
physical: backend,
barrier: Arc::new(barrier),
..Default::default()
}));

{
let mut c = core.write()?;
Expand All @@ -119,7 +130,9 @@ pub fn main(config_path: &str) -> Result<(), RvError> {
let mut http_server = HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
.wrap(from_fn(metrics_midleware))
.app_data(web::Data::new(Arc::clone(&core)))
.app_data(web::Data::new(Arc::clone(&metrics_manager)))
.configure(http::init_service)
.default_service(web::to(|| HttpResponse::NotFound()))
})
Expand Down Expand Up @@ -182,7 +195,12 @@ pub fn main(config_path: &str) -> Result<(), RvError> {

log::info!("rusty_vault server starts, waiting for request...");

server.block_on(async { http_server.run().await })?;
server.block_on(async {
tokio::spawn(async {
system_metrics.start_collecting().await;
});
http_server.run().await
})?;
let _ = server.run();

Ok(())
Expand Down
6 changes: 6 additions & 0 deletions src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ pub struct Config {
pub daemon_user: String,
#[serde(default)]
pub daemon_group: String,
#[serde(default = "default_collection_interval")]
pub collection_interval: u64,
}

/// Supplies the value serde uses for `Config::collection_interval` when the
/// field is missing from the configuration.
///
/// NOTE(review): the unit is presumably seconds — confirm against the
/// consumer of `collection_interval`.
fn default_collection_interval() -> u64 {
    const DEFAULT_COLLECTION_INTERVAL: u64 = 15;
    DEFAULT_COLLECTION_INTERVAL
}

/// A struct that contains several configurable networking options
Expand Down
24 changes: 24 additions & 0 deletions src/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::sync::{Arc, RwLock};

use actix_web::{web, HttpResponse};
use prometheus_client::encoding::text::encode;
use crate::metrics::manager::MetricsManager;

/// Handles a metrics scrape: encodes every metric in the shared registry with
/// the `prometheus_client` text encoder and returns it as the response body.
///
/// Returns `200 OK` with the encoded metrics, or `500 Internal Server Error`
/// (after logging) when encoding fails.
///
/// NOTE(review): the body is produced by `prometheus_client::encoding::text::encode`;
/// confirm whether the content type should be the OpenMetrics one
/// (`application/openmetrics-text; version=1.0.0; charset=utf-8`) rather than
/// the classic Prometheus text type kept here.
pub async fn metrics_handler(metrics_manager: web::Data<Arc<RwLock<MetricsManager>>>) -> HttpResponse {
    // A poisoned lock only records that some other holder panicked. Panicking
    // here as well would take the scrape endpoint down for good, because the
    // poison flag never clears. Recover the guard instead; at worst a scrape
    // reflects whatever state the panicking holder left behind.
    let manager = metrics_manager.read().unwrap_or_else(std::sync::PoisonError::into_inner);
    let registry = manager.registry.lock().unwrap_or_else(std::sync::PoisonError::into_inner);

    let mut buffer = String::new();
    if let Err(e) = encode(&mut buffer, &registry) {
        log::error!("Failed to encode metrics: {}", e);
        return HttpResponse::InternalServerError().finish();
    }

    HttpResponse::Ok().content_type("text/plain; version=0.0.4").body(buffer)
}

/// Mounts the metrics endpoint on `cfg`: `GET /metrics` is routed to
/// `metrics_handler`.
pub fn init_metrics_service(cfg: &mut web::ServiceConfig) {
    let scrape_route = web::get().to(metrics_handler);
    let metrics_resource = web::resource("/metrics").route(scrape_route);
    cfg.service(metrics_resource);
}
2 changes: 2 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{core::Core, errors::RvError, logical::Request};

pub mod logical;
pub mod sys;
pub mod metrics;

pub const AUTH_COOKIE_NAME: &str = "token";
pub const AUTH_HEADER_NAME: &str = "X-RustyVault-Token";
Expand Down Expand Up @@ -109,6 +110,7 @@ pub fn request_on_connect_handler(conn: &dyn Any, ext: &mut Extensions) {
/// Registers all HTTP services of this module on `cfg`, in this order: the
/// `sys` services, the `logical` services, and the metrics service.
// NOTE(review): registration order may influence route matching; keep the
// call order unless that is confirmed not to matter.
pub fn init_service(cfg: &mut web::ServiceConfig) {
sys::init_sys_service(cfg);
logical::init_logical_service(cfg);
metrics::init_metrics_service(cfg);
}

impl ResponseError for RvError {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod schema;
pub mod shamir;
pub mod storage;
pub mod utils;
pub mod metrics;

#[cfg(test)]
pub mod test_utils;
Expand Down
192 changes: 192 additions & 0 deletions src/metrics/http_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
//! Define and implement HTTP metrics and corresponding methods.
use std::fmt::Write;

use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, LabelValueEncoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
use prometheus_client::registry::Registry;

/// Name under which the HTTP request counter family is registered.
pub const HTTP_REQUEST_COUNT: &str = "http_request_count";
/// Help text registered together with [`HTTP_REQUEST_COUNT`].
pub const HTTP_REQUEST_COUNT_HELP: &str = "Number of HTTP requests received, labeled by method and status";
/// Name under which the HTTP request duration histogram family is registered.
pub const HTTP_REQUEST_DURATION_SECONDS: &str = "http_request_duration_seconds";
/// Help text registered together with [`HTTP_REQUEST_DURATION_SECONDS`].
pub const HTTP_REQUEST_DURATION_SECONDS_HELP: &str = "Duration of HTTP requests, labeled by method and status";

/// Request method recorded as the `method` label on HTTP metrics.
///
/// Each variant is rendered as its lower-case name by the `EncodeLabelValue`
/// implementation for this type.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum MetricsMethod {
GET,
POST,
PUT,
DELETE,
// `LIST` requests are issued by the tests in this file against `v1/secret`.
LIST,
// NOTE(review): presumably the fallback for any method not listed above — confirm in the middleware.
OTHER,
}

impl EncodeLabelValue for MetricsMethod {
    /// Writes the lower-case name of the method into `writer` as the label value.
    fn encode(&self, writer: &mut LabelValueEncoder<'_>) -> Result<(), std::fmt::Error> {
        // Pick the textual form first so there is a single write at the end.
        let label = match *self {
            MetricsMethod::GET => "get",
            MetricsMethod::POST => "post",
            MetricsMethod::PUT => "put",
            MetricsMethod::DELETE => "delete",
            MetricsMethod::LIST => "list",
            MetricsMethod::OTHER => "other",
        };
        writer.write_str(label)
    }
}

/// Label set that keys every sample of the HTTP metric families
/// (`path`, `method`, `status`).
///
/// NOTE(review): `path` is a free-form string. If callers fill it with the raw
/// request path, the number of distinct label sets is unbounded — confirm how
/// the middleware populates it.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct HttpLabel {
// Request path; the tests in this file expect values such as `/v1/secret`.
pub path: String,
// Request method, encoded in lower case.
pub method: MetricsMethod,
// Status code of the request — presumably the HTTP response status; confirm at the call site.
pub status: u16,
}

/// Handles to the HTTP metric families that `HttpMetrics::new` registers in
/// the Prometheus registry.
#[derive(Clone)]
pub struct HttpMetrics {
// Request counter per label set, registered as `HTTP_REQUEST_COUNT`.
requests: Family<HttpLabel, Counter>,
// Request duration histogram per label set, registered as `HTTP_REQUEST_DURATION_SECONDS`.
histogram: Family<HttpLabel, Histogram>,
}

impl HttpMetrics {
    /// Builds the request counter and duration histogram families and
    /// registers both in `registry`.
    ///
    /// Each histogram is created with ten linear buckets that start at `0.1`
    /// and grow in steps of `0.1`.
    pub fn new(registry: &mut Registry) -> Self {
        let metrics = Self {
            requests: Family::<HttpLabel, Counter>::default(),
            histogram: Family::<HttpLabel, Histogram>::new_with_constructor(|| {
                Histogram::new(linear_buckets(0.1, 0.1, 10))
            }),
        };

        // The registry keeps clones of the families; the returned value holds
        // the handles used for recording.
        registry.register(HTTP_REQUEST_COUNT, HTTP_REQUEST_COUNT_HELP, metrics.requests.clone());
        registry.register(
            HTTP_REQUEST_DURATION_SECONDS,
            HTTP_REQUEST_DURATION_SECONDS_HELP,
            metrics.histogram.clone(),
        );

        metrics
    }

    /// Adds one to the request counter identified by `label`, creating the
    /// counter on first use.
    pub fn increment_request_count(&self, label: &HttpLabel) {
        let counter = self.requests.get_or_create(label);
        counter.inc();
    }

    /// Records `duration` in the histogram identified by `label`, creating the
    /// histogram on first use.
    ///
    /// NOTE(review): `duration` is presumably in seconds, matching the metric
    /// name — confirm at the call site.
    pub fn observe_duration(&self, label: &HttpLabel, duration: f64) {
        let histogram = self.histogram.get_or_create(label);
        histogram.observe(duration);
    }
}

#[cfg(test)]
mod tests {
use rand::Rng;
use regex::Regex;
use ureq::json;

use crate::test_utils::TestHttpServer;
use std::collections::HashMap;

// Names of the regex capture groups read by `parse_counter`.
const PATH: &str = "path";
const METHOD: &str = "method";

// Request method names used to drive the test server and to look up the
// parsed counters (which `parse_counter` upper-cases).
const GET: &str = "GET";
const LIST: &str = "LIST";
const POST: &str = "POST";
const PUT: &str = "PUT";
const DELETE: &str = "DELETE";

/// Extracts the HTTP request counters from encoded metrics text.
///
/// Returns a map of `path -> upper-cased method -> counter value`. Only
/// samples that follow a `# TYPE <name> counter` header and carry the
/// `path`/`method`/`status` label set are recorded; the `status` label is
/// matched but not stored, so a later sample for the same path and method
/// overwrites an earlier one.
///
/// # Panics
///
/// Panics if a counter sample's value is not a valid `u32`.
fn parse_counter(raw: &str) -> HashMap<String, HashMap<String, u32>> {
    let name_label_re =
        Regex::new(r#"\bpath="(?P<path>[^"]+)",method="(?P<method>[^"]+)",status="(?P<status>[^"]+)""#).unwrap();
    let mut counter_map: HashMap<String, HashMap<String, u32>> = HashMap::new();
    let mut lines = raw.lines().peekable();

    while let Some(line) = lines.next() {
        // Only a TYPE header opens a counter family. Checking the prefix as
        // well keeps a HELP line whose text ends in "counter" from matching.
        if !(line.starts_with("# TYPE ") && line.ends_with(" counter")) {
            continue;
        }

        // The family name is whatever precedes the label set (or the value) on
        // the first sample line. A family without samples is followed directly
        // by another `#` line (or by nothing) and is skipped.
        let metric_name = match lines.peek() {
            Some(&first) if !first.starts_with('#') => {
                first.split(|c: char| c == '{' || c == ' ').next().unwrap_or(first)
            }
            _ => continue,
        };
        if metric_name.is_empty() {
            continue;
        }

        // Consume exactly the sample lines of this family. `next_if` leaves
        // the first non-matching line in place, so the header of the next
        // family is never skipped and the input end is never overrun.
        while let Some(sample) = lines.next_if(|l| !l.starts_with('#') && l.starts_with(metric_name)) {
            // The value is the last space-separated token; splitting from the
            // right tolerates spaces inside label values.
            let Some((name_label, value)) = sample.rsplit_once(' ') else {
                continue;
            };
            let value: u32 = value.parse().expect("counter sample value should be an unsigned integer");

            if let Some(caps) = name_label_re.captures(name_label) {
                let path = caps[PATH].to_string();
                let method = caps[METHOD].to_uppercase();
                counter_map
                    .entry(path)
                    .or_insert_with_key(|new_path| {
                        println!("path:{}", new_path);
                        HashMap::new()
                    })
                    .insert(method, value);
            }
        }
    }

    counter_map
}

/// Sends a known number of requests per path and method to a test server
/// started with Prometheus enabled, then checks that the counters exposed on
/// `metrics` report exactly those numbers.
#[test]
fn test_http_request() {
    let server = TestHttpServer::new_with_prometheus("test_http_request", false);
    let root_token = &server.root_token;

    // `path[i]` receives the (method, count) pairs listed in `mock[i]`.
    let path = ["v1/secret/password-0", "v1/secret/password-1", "v1/secret/password-2", "v1/secret"];
    let mock = [
        vec![(DELETE, 2)],
        vec![(POST, 3), (GET, 5), (PUT, 7), (DELETE, 9)],
        vec![(POST, 2), (GET, 8), (PUT, 12), (DELETE, 16)],
        vec![(LIST, 1)],
    ];
    let mock_map: HashMap<&str, Vec<(&str, u32)>> = path.iter().copied().zip(mock.iter().cloned()).collect();

    // Generate the traffic.
    for (path, mock) in &mock_map {
        for &(method, count) in mock {
            for _ in 0..count {
                // Only writes carry a body; the payload value itself is irrelevant.
                let data = if matches!(method, "POST" | "PUT") {
                    let random_number: u32 = rand::thread_rng().gen_range(0..10000);
                    Some(json!({ "password": random_number }).as_object().unwrap().clone())
                } else {
                    None
                };
                let _ = server.request(method, path, data, Some(&root_token), None).unwrap();
            }
        }
    }

    // Scrape the metrics and compare them with what was sent.
    let (status, resp) = server.request_prometheus("GET", "metrics", None, Some(&root_token), None).unwrap();
    assert_eq!(status, 200);

    let counter_map = parse_counter(resp["metrics"].as_str().unwrap());
    println!("counter map len={}", counter_map.len());

    for (path, mock) in &mock_map {
        // Parsed counters are keyed by the absolute request path.
        let path = format!("/{}", path);
        for &(method, count) in mock {
            assert!(counter_map.contains_key(&path));

            let prom = counter_map.get(&path).unwrap();
            assert!(prom.contains_key(method));

            assert_eq!(count, prom[method]);
        }
    }
}
}
21 changes: 21 additions & 0 deletions src/metrics/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! `MetricsManager` holds the Prometheus registry and metrics.
use crate::metrics::http_metrics::HttpMetrics;
use crate::metrics::system_metrics::SystemMetrics;
use prometheus_client::registry::Registry;
use std::sync::{Arc, Mutex};

/// Bundles the Prometheus registry with the metric groups registered in it.
///
/// Every field is an `Arc`, so cloning a `MetricsManager` yields another
/// handle to the same registry and metrics.
#[derive(Clone)]
pub struct MetricsManager {
// Registry holding all registered metrics; locked when metrics are encoded.
pub registry: Arc<Mutex<Registry>>,
// System-level metrics, constructed with the configured collection interval.
pub system_metrics: Arc<SystemMetrics>,
// HTTP request counter and duration histogram.
pub http_metrics: Arc<HttpMetrics>,
}

impl MetricsManager {
pub fn new(collection_interval: u64) -> Self {
let registry = Arc::new(Mutex::new(Registry::default()));
let system_metrics = Arc::new(SystemMetrics::new(&mut registry.lock().unwrap(), collection_interval));
let http_metrics = Arc::new(HttpMetrics::new(&mut registry.lock().unwrap()));
MetricsManager { registry, system_metrics, http_metrics }
}
}
Loading

0 comments on commit e22637a

Please sign in to comment.