From 2abaabb180c3e0a2028b71926a46c9801581a5fe Mon Sep 17 00:00:00 2001 From: Teddy Astie Date: Fri, 29 Sep 2023 15:16:09 +0200 Subject: [PATCH] WIP Introduce xcp-metrics daemon xcp-metrics exposes the same kind of metrics that xcp-rrdd does, but using the OpenMetrics standard. It uses a reworked version of the v2 protocol currently in use by xcp-rrdd, proposed as the v3 protocol in xapi-project/xapi-project.github.io#278 Signed-off-by: Teddy Astie Reviewed-by: Yann Dirson --- Cargo.toml | 2 + xcp-metrics-tools/Cargo.toml | 25 ++ xcp-metrics-tools/README.md | 17 + xcp-metrics-tools/src/bin/xcp-metrics-dump.rs | 73 ++++ .../src/bin/xcp-metrics-get-metrics.rs | 77 +++++ .../src/bin/xcp-metrics-openmetrics-proxy.rs | 63 ++++ xcp-metrics/Cargo.toml | 40 +++ xcp-metrics/README.md | 26 ++ xcp-metrics/src/forwarded/mod.rs | 94 +++++ xcp-metrics/src/forwarded/request.rs | 89 +++++ xcp-metrics/src/forwarded/response.rs | 46 +++ xcp-metrics/src/forwarded/routes.rs | 98 ++++++ xcp-metrics/src/hub.rs | 210 ++++++++++++ xcp-metrics/src/main.rs | 172 ++++++++++ xcp-metrics/src/mappings.rs | 34 ++ xcp-metrics/src/providers/mod.rs | 11 + xcp-metrics/src/providers/protocol_v2.rs | 254 ++++++++++++++ xcp-metrics/src/providers/protocol_v3.rs | 132 +++++++ xcp-metrics/src/publishers/mod.rs | 3 + xcp-metrics/src/publishers/openmetrics.rs | 83 +++++ xcp-metrics/src/publishers/rrdd/entry.rs | 60 ++++ xcp-metrics/src/publishers/rrdd/mod.rs | 88 +++++ .../src/publishers/rrdd/round_robin.rs | 184 ++++++++++ xcp-metrics/src/publishers/rrdd/server.rs | 324 ++++++++++++++++++ xcp-metrics/src/rpc/daemon.rs | 47 +++ xcp-metrics/src/rpc/mod.rs | 45 +++ xcp-metrics/src/rpc/routes/deregister.rs | 45 +++ xcp-metrics/src/rpc/routes/get_formats.rs | 40 +++ xcp-metrics/src/rpc/routes/mod.rs | 98 ++++++ xcp-metrics/src/rpc/routes/next_reading.rs | 28 ++ xcp-metrics/src/rpc/routes/register.rs | 78 +++++ xcp-metrics/src/rpc/routes/register_v3.rs | 71 ++++ 32 files changed, 2657 insertions(+) create mode 
100644 xcp-metrics-tools/Cargo.toml create mode 100644 xcp-metrics-tools/README.md create mode 100644 xcp-metrics-tools/src/bin/xcp-metrics-dump.rs create mode 100644 xcp-metrics-tools/src/bin/xcp-metrics-get-metrics.rs create mode 100644 xcp-metrics-tools/src/bin/xcp-metrics-openmetrics-proxy.rs create mode 100644 xcp-metrics/Cargo.toml create mode 100644 xcp-metrics/README.md create mode 100644 xcp-metrics/src/forwarded/mod.rs create mode 100644 xcp-metrics/src/forwarded/request.rs create mode 100644 xcp-metrics/src/forwarded/response.rs create mode 100644 xcp-metrics/src/forwarded/routes.rs create mode 100644 xcp-metrics/src/hub.rs create mode 100644 xcp-metrics/src/main.rs create mode 100644 xcp-metrics/src/mappings.rs create mode 100644 xcp-metrics/src/providers/mod.rs create mode 100644 xcp-metrics/src/providers/protocol_v2.rs create mode 100644 xcp-metrics/src/providers/protocol_v3.rs create mode 100644 xcp-metrics/src/publishers/mod.rs create mode 100644 xcp-metrics/src/publishers/openmetrics.rs create mode 100644 xcp-metrics/src/publishers/rrdd/entry.rs create mode 100644 xcp-metrics/src/publishers/rrdd/mod.rs create mode 100644 xcp-metrics/src/publishers/rrdd/round_robin.rs create mode 100644 xcp-metrics/src/publishers/rrdd/server.rs create mode 100644 xcp-metrics/src/rpc/daemon.rs create mode 100644 xcp-metrics/src/rpc/mod.rs create mode 100644 xcp-metrics/src/rpc/routes/deregister.rs create mode 100644 xcp-metrics/src/rpc/routes/get_formats.rs create mode 100644 xcp-metrics/src/rpc/routes/mod.rs create mode 100644 xcp-metrics/src/rpc/routes/next_reading.rs create mode 100644 xcp-metrics/src/rpc/routes/register.rs create mode 100644 xcp-metrics/src/rpc/routes/register_v3.rs diff --git a/Cargo.toml b/Cargo.toml index b4e1f25..3f2b02f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,10 +3,12 @@ package.version = "0.1.0" package.repository = "https://github.com/xcp-ng/xcp-metrics" package.categories = ["virtualization"] members = [ + "xcp-metrics", 
"xcp-metrics-common", "xapi-rs", "plugins/xcp-metrics-plugin-common", "plugins/xcp-metrics-plugin-squeezed", + "xcp-metrics-tools", ] [profile.release] lto = true \ No newline at end of file diff --git a/xcp-metrics-tools/Cargo.toml b/xcp-metrics-tools/Cargo.toml new file mode 100644 index 0000000..5522577 --- /dev/null +++ b/xcp-metrics-tools/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "xcp-metrics-tools" +description = "Various xcp-metrics utilities" +version.workspace = true +license = "AGPL-3.0-only" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +xcp-metrics-common = { path = "../xcp-metrics-common" } +xapi = { path = "../xapi-rs" } + +anyhow = "1.0" +serde_json = "1.0" + +tokio = { version = "1", features = ["full"] } + +[dependencies.clap] +version = "4.3" +features = ["derive"] + +[dependencies.hyper] +version = "0.14" +features = ["full"] diff --git a/xcp-metrics-tools/README.md b/xcp-metrics-tools/README.md new file mode 100644 index 0000000..660d83a --- /dev/null +++ b/xcp-metrics-tools/README.md @@ -0,0 +1,17 @@ +# xcp-metrics tools + +Set of various xcp-metrics utilities. + +## Project binaries + +### xcp-metrics-dump + +Utility to dump plugin protocol v2 or protocol v3 files. + +### xcp-metrics-get-metrics + +Tool that fetches current OpenMetrics from xcp-metrics daemon using either the protobuf or text format. + +### xcp-metrics-openmetrics-proxy + +Small program that redirects HTTP requests into OpenMetrics RPC route, meant to be used with Prometheus or similar projects. 
\ No newline at end of file diff --git a/xcp-metrics-tools/src/bin/xcp-metrics-dump.rs b/xcp-metrics-tools/src/bin/xcp-metrics-dump.rs new file mode 100644 index 0000000..a316d37 --- /dev/null +++ b/xcp-metrics-tools/src/bin/xcp-metrics-dump.rs @@ -0,0 +1,73 @@ +use anyhow::Result; +use std::{ + fs, + io::{Read, Seek}, +}; +use xcp_metrics_common::{ + self, protocol_v3, + rrdd::protocol_v2::{self, RrddMessageHeader, RrddMetadata, RrddMetadataRaw}, +}; + +fn read_protocol_v2(mut file: fs::File) -> Result<(), anyhow::Error> { + let header = RrddMessageHeader::parse_from(&mut file)?; + + println!("{:#?}", &header); + + let mut buffer = vec![0u8; header.metadata_length as usize]; + file.read_exact(&mut buffer)?; + + let metadata_raw: RrddMetadataRaw = serde_json::from_slice(&buffer)?; + let metadata = RrddMetadata::try_from(metadata_raw); + + println!("{metadata:#?}"); + + Ok(()) +} + +fn read_protocol_v3(mut file: fs::File) -> Result<(), anyhow::Error> { + match protocol_v3::parse_v3(&mut file) { + Ok((header, metrics)) => { + println!("{header:#?}"); + println!("{metrics:#?}"); + } + Err(e) => { + println!("Read failure ({e}), try reading header, skipping crc checks."); + + file.rewind()?; + println!( + "{:#?}", + protocol_v3::ProtocolV3Header::parse_from(&mut file) + ); + } + } + + Ok(()) +} + +fn main() -> Result<()> { + let args: Vec = std::env::args().collect(); + + if let Some(path) = args.get(1) { + println!("Trying to read message header..."); + let mut file = fs::File::open(path)?; + + let mut file_header = [0u8; 12]; + file.read_exact(&mut file_header)?; + + file.rewind()?; + + if file_header == *protocol_v3::PROTOCOL_V3_HEADER { + println!("Detected protocol v3"); + read_protocol_v3(file)?; + } else if file_header[..11] == *protocol_v2::PROTOCOL_V2_HEADER { + println!("Detected protocol v2"); + read_protocol_v2(file)?; + } else { + println!("Unknown file header"); + } + } else { + println!("Usage: xcp-metrics-dump /dev/shm/metrics/"); + } + + Ok(()) +} diff 
--git a/xcp-metrics-tools/src/bin/xcp-metrics-get-metrics.rs b/xcp-metrics-tools/src/bin/xcp-metrics-get-metrics.rs new file mode 100644 index 0000000..36172a5 --- /dev/null +++ b/xcp-metrics-tools/src/bin/xcp-metrics-get-metrics.rs @@ -0,0 +1,77 @@ +use std::path::PathBuf; + +use clap::Parser; +use tokio::io::{stdout, AsyncWriteExt}; +use xapi::{ + hyper::{self, body, Body}, + hyperlocal, + rpc::{ + message::RpcKind, methods::OpenMetricsMethod, write_method_jsonrpc, write_method_xmlrpc, + }, +}; + +/// Tool to get metrics from xcp-metrics in OpenMetrics format. +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Path to the xcp-metrics daemon socket to fetch metrics from. + #[arg(short, long)] + daemon_path: Option, + + /// RPC format to use + #[arg(long, default_value_t = RpcKind::JsonRpc)] + rpc_format: RpcKind, + + /// Whether to use protocol buffers binary format. + #[arg(short, long, default_value_t = false)] + binary: bool, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let daemon_path = args + .daemon_path + .unwrap_or_else(|| xapi::get_module_path("xcp-metrics")); + + let module_uri = hyperlocal::Uri::new(daemon_path, "/"); + + let mut rpc_buffer = vec![]; + let method = OpenMetricsMethod { + protobuf: args.binary, + }; + + match args.rpc_format { + RpcKind::JsonRpc => write_method_jsonrpc(&mut rpc_buffer, &method).unwrap(), + RpcKind::XmlRpc => write_method_xmlrpc(&mut rpc_buffer, &method).unwrap(), + }; + + let content_type = match args.rpc_format { + RpcKind::JsonRpc => "application/json-rpc", + RpcKind::XmlRpc => "application/xml", + }; + + eprintln!("Sent: {}", String::from_utf8_lossy(&rpc_buffer)); + + let request = hyper::Request::builder() + .uri(hyper::Uri::from(module_uri)) + .method("POST") + .header("User-agent", "xcp-metrics-get-metrics") + .header("content-length", rpc_buffer.len()) + .header("content-type", content_type) + .header("host", "localhost") + 
.body(Body::from(rpc_buffer)) + .unwrap(); + + let response = hyper::Client::builder() + .build(hyperlocal::UnixConnector) + .request(request) + .await; + + eprintln!("{response:#?}"); + + let response = response.unwrap(); + let data = body::to_bytes(response.into_body()).await.unwrap(); + + stdout().write_all(&data).await.unwrap(); +} diff --git a/xcp-metrics-tools/src/bin/xcp-metrics-openmetrics-proxy.rs b/xcp-metrics-tools/src/bin/xcp-metrics-openmetrics-proxy.rs new file mode 100644 index 0000000..87b2b0e --- /dev/null +++ b/xcp-metrics-tools/src/bin/xcp-metrics-openmetrics-proxy.rs @@ -0,0 +1,63 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::{Path, PathBuf}, +}; + +use clap::{command, Parser}; +use hyper::{ + server::{conn::AddrStream, Server}, + service::{make_service_fn, service_fn}, + Body, Request, Response, +}; + +use xapi::rpc::methods::OpenMetricsMethod; + +/// OpenMetrics http proxy, used to provide metrics for collectors such as Prometheus. +#[derive(Clone, Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Adress to bind the HTTP server to. + #[arg(short, long, default_value_t = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 8080))] + addr: SocketAddr, + + /// Path to the xcp-metrics daemon socket to fetch metrics from. 
+ #[arg(short, long)] + daemon_path: Option, +} + +async fn redirect_openmetrics( + request: Request, + daemon_path: &Path, +) -> anyhow::Result> { + xapi::send_jsonrpc_to( + daemon_path, + "POST", + &OpenMetricsMethod::default(), + "xcp-metrics-openmetrics-proxy", + ) + .await +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + let daemon_path = args + .daemon_path + .unwrap_or_else(|| xapi::get_module_path("xcp-metrics")); + + let service_fn = make_service_fn(|addr: &AddrStream| { + println!("Handling request {:?}", addr); + let daemon_path = daemon_path.clone(); + + async { + anyhow::Ok(service_fn(move |request| { + let daemon_path = daemon_path.clone(); + async move { redirect_openmetrics(request, &daemon_path).await } + })) + } + }); + + let server = Server::bind(&args.addr).serve(service_fn); + + server.await.expect("Proxy server failure"); +} diff --git a/xcp-metrics/Cargo.toml b/xcp-metrics/Cargo.toml new file mode 100644 index 0000000..bac3723 --- /dev/null +++ b/xcp-metrics/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "xcp-metrics" +description = "Main xcp-metrics daemon" +version.workspace = true +license = "AGPL-3.0-only" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +xcp-metrics-common = { path = "../xcp-metrics-common" } +xapi = { path = "../xapi-rs" } + +serde_json = "1.0" +anyhow = "1.0" +futures = "0.3" +dashmap = "5.4.0" +tracing = "0.1" +tracing-subscriber = "0.3" +maplit = "1.0.2" + +[dependencies.tokio] +version = "1" +features = ["full"] + +[dependencies.serde] +version = "1.0" +features = ["std", "derive"] + +[dependencies.uuid] +version = "1.4" +features = ["std", "serde", "v4", "fast-rng"] + +[dependencies.sendfd] +version = "0.4.3" +features = ["tokio"] + +[dependencies.clap] +version = "4.3" +features = ["derive"] diff --git a/xcp-metrics/README.md b/xcp-metrics/README.md new file mode 100644 index 0000000..ef12983 --- /dev/null 
+++ b/xcp-metrics/README.md @@ -0,0 +1,26 @@ +# xcp-metrics main daemon + +This is the main xcp-metrics daemon, which exposes an RPC interface similar to that of xcp-rrdd (and of some other XAPI projects). +In addition to XML-RPC, it also supports JSON-RPC. + +## Main modules + +### forwarded + +Forwarded implementation and routes (e.g. `rrd_updates`) that manage the forwarded socket (e.g. `xcp-rrdd.forwarded`). + +### hub + +Small module that aggregates metrics. + +### providers + +Metrics provider implementations (e.g. protocol v2 and v3) that push metrics to the hub. + +### publishers + +Modules that pull metrics from the hub and distribute them (using RPC, a forwarded route or something else). + +### rpc + +RPC server implementation and routes. \ No newline at end of file diff --git a/xcp-metrics/src/forwarded/mod.rs b/xcp-metrics/src/forwarded/mod.rs new file mode 100644 index 0000000..2c7e52c --- /dev/null +++ b/xcp-metrics/src/forwarded/mod.rs @@ -0,0 +1,94 @@ +//! Forwarded request support. 
+mod request; +mod response; +mod routes; + +use std::{ + os::fd::{FromRawFd, RawFd}, + path::Path, + slice, + sync::Arc, +}; + +use sendfd::RecvWithFd; +use tokio::{ + io::AsyncWriteExt, + net::{TcpStream, UnixListener, UnixStream}, + task::{self, JoinHandle}, +}; + +use crate::{ + forwarded::{request::ForwardedRequest, routes::route_forwarded}, + XcpMetricsShared, +}; + +async fn forwarded_handler( + stream: UnixStream, + shared: Arc, +) -> anyhow::Result<()> { + let (buffer, fd) = task::spawn_blocking(move || { + let mut buffer = vec![0u8; 10240]; + let mut fd: RawFd = RawFd::default(); + + let std_stream = stream + .into_std() + .expect("Unable to convert tokio stream to std stream."); + let (readed, fds_count) = std_stream + .recv_with_fd(&mut buffer, slice::from_mut(&mut fd)) + .expect("recv_with_fd failure"); + + assert_eq!(fds_count, 1, "fds_count is not 1"); + + buffer.resize(readed, 0); + + (buffer.into_boxed_slice(), fd) + }) + .await?; + + // Get the fd from the forwarded response. + let std_destination = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + std_destination.set_nonblocking(true)?; + + let mut destination = TcpStream::from_std(std_destination)?; + + let request: ForwardedRequest = serde_json::from_slice(&buffer)?; + tracing::info!("Captured request: {request:?}"); + + let response = route_forwarded(shared.clone(), request).await; + + let mut string: Vec = Vec::new(); + response::write_response(&mut string, response?) 
+ .await + .unwrap(); + + tracing::trace!( + "Sending request to socket:\n{}", + String::from_utf8_lossy(&string) + ); + + destination.write_all(&string).await.unwrap(); + + Ok(()) +} + +pub async fn start_forwarded_socket( + daemon_path: &Path, + shared: Arc, +) -> anyhow::Result> { + let daemon_path = daemon_path.to_path_buf(); + let listener = UnixListener::bind(daemon_path)?; + + tracing::info!("Starting forwarded"); + + Ok(task::spawn(async move { + while let Ok((stream, addr)) = listener.accept().await { + tracing::info!("Forwarded request from {addr:?}"); + + let shared = shared.clone(); + + if let Err(e) = forwarded_handler(stream, shared).await { + tracing::error!("Forwarded handler failure {e}"); + } + } + })) +} diff --git a/xcp-metrics/src/forwarded/request.rs b/xcp-metrics/src/forwarded/request.rs new file mode 100644 index 0000000..1578aaf --- /dev/null +++ b/xcp-metrics/src/forwarded/request.rs @@ -0,0 +1,89 @@ +//! [ForwardedRequest] implementation. +use std::{collections::HashMap, str::FromStr}; + +use serde::Deserialize; +use xapi::hyper::{ + header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE, HOST, TRANSFER_ENCODING, USER_AGENT}, + http::uri::PathAndQuery, + Body, Request, Version, +}; + +/// xapi-project/xen-api/blob/master/ocaml/libs/http-lib/http.ml for reference +#[derive(Clone, Debug, Deserialize)] +pub struct ForwardedRequest { + pub m: Box, + pub uri: Box, + pub query: HashMap, Box>, + pub version: Box, + pub frame: bool, + pub transfer_encoding: Option>, + pub accept: Option>, + pub content_length: Option, + pub auth: Option]>>, + pub cookie: HashMap, Box>, + pub task: Option>, + pub subtask_of: Option>, + pub content_type: Option>, + pub host: Option>, + pub user_agent: Option>, + pub close: bool, + pub additional_headers: HashMap, Box>, + pub body: Option>, + pub traceparent: Option>, +} + +impl TryFrom for Request { + type Error = anyhow::Error; + + fn try_from(request: ForwardedRequest) -> Result { + let mut builder = Request::builder(); + 
+ if let Some(version) = match request.version.as_ref() { + "HTTP/0.9" => Some(Version::HTTP_09), + "HTTP/1.0" => Some(Version::HTTP_10), + "HTTP/1.1" => Some(Version::HTTP_11), + "HTTP/2.0" => Some(Version::HTTP_2), + "HTTP/3.0" => Some(Version::HTTP_3), + _ => None, + } { + builder = builder.version(version); + } + + builder = builder.method(request.m.as_ref()); + + builder = builder.uri(PathAndQuery::from_str(&request.uri)?); + + if let Some(value) = request.content_length { + builder = builder.header(CONTENT_LENGTH, value); + } + + if let Some(transfer_encoding) = request.transfer_encoding { + builder = builder.header(TRANSFER_ENCODING, transfer_encoding.as_ref()); + } + + if let Some(accept) = request.accept { + builder = builder.header(ACCEPT, accept.as_ref()); + } + + if let Some(content_type) = request.content_type { + builder = builder.header(CONTENT_TYPE, content_type.as_ref()); + } + + if let Some(host) = request.host { + builder = builder.header(HOST, host.as_ref()); + } + + if let Some(user_agent) = request.user_agent { + builder = builder.header(USER_AGENT, user_agent.as_ref()); + } + + for (name, value) in request.additional_headers.iter() { + builder = builder.header(name.as_ref(), value.as_ref()); + } + + Ok(builder.body(match request.body { + Some(content) => Body::from(content.as_bytes().to_vec()), + None => Body::empty(), + })?) + } +} diff --git a/xcp-metrics/src/forwarded/response.rs b/xcp-metrics/src/forwarded/response.rs new file mode 100644 index 0000000..904b8ea --- /dev/null +++ b/xcp-metrics/src/forwarded/response.rs @@ -0,0 +1,46 @@ +//! [write_response] implementation +use std::{fmt::Debug, io::Write}; + +use xapi::hyper::{body, http::Response, Body}; + +/// Write the HTTP response into some writer. 
+pub async fn write_response( + writer: &mut W, + mut response: Response, +) -> Result<(), anyhow::Error> +where + W: Write + Debug, +{ + tracing::trace!("Sending HTTP response {response:?} to {writer:?}"); + + write!( + writer, + "HTTP/1.1 {} {}\r\n", + response.status().as_u16(), + response.status().canonical_reason().unwrap_or_default() + )?; + + let body = body::to_bytes(response.body_mut()).await?; + + // Add content-length if not defined + if !response.headers().contains_key("content-length") { + let body_length = body.len(); + response + .headers_mut() + .insert("content-length", body_length.into()); + } + + for (name, value) in response.headers() { + write!( + writer, + "{}: {}\r\n", + name.as_str(), + String::from_utf8_lossy(value.as_bytes()) + )?; + } + + write!(writer, "\r\n")?; + writer.write_all(&body)?; + + Ok(()) +} diff --git a/xcp-metrics/src/forwarded/routes.rs b/xcp-metrics/src/forwarded/routes.rs new file mode 100644 index 0000000..48d65f1 --- /dev/null +++ b/xcp-metrics/src/forwarded/routes.rs @@ -0,0 +1,98 @@ +//! 
Forwarded routes +use std::{ + sync::Arc, + time::{Duration, SystemTime}, +}; + +use tokio::sync::mpsc; + +use xapi::hyper::{Body, Response}; + +use crate::{ + publishers::rrdd::{server::RrddServerMessage, RrdXportFilter, RrdXportParameters}, + rpc, XcpMetricsShared, +}; + +use super::request::ForwardedRequest; + +pub(super) async fn route_forwarded( + shared: Arc, + request: ForwardedRequest, +) -> anyhow::Result> { + match request.uri.as_ref() { + "/rrd_updates" => rrd_update_handler(shared, request).await, + "/" => rpc::entrypoint(shared, request.try_into()?).await, + _ => Response::builder() + .status(404) + .body("Invalid request".into()) + .map_err(|err| anyhow::anyhow!(err)), + } +} + +async fn rrd_update_handler( + shared: Arc, + request: ForwardedRequest, +) -> anyhow::Result> { + let (tx, mut rx) = mpsc::channel(1); + + let with_host = request + .query + .get("host") + .map(|v| v.as_ref() == "true") + .unwrap_or(false); + + let use_json = request + .query + .get("json") + .map(|v| v.as_ref() == "true") + .unwrap_or(false); + + let filter = if with_host { + RrdXportFilter::All + } else { + RrdXportFilter::AllNoHost + }; + + let start = if let Some(value) = request.query.get("start") { + let since_epoch = value.parse()?; + + SystemTime::UNIX_EPOCH + Duration::from_secs(since_epoch) + } else { + SystemTime::now() + }; + + let interval = if let Some(value) = request.query.get("interval") { + value.parse()? 
+ } else { + 1 + }; + + shared + .rrdd_channel + .send(RrddServerMessage::RequestRrdUpdates( + RrdXportParameters { + start, + interval, + filter, + }, + tx, + ))?; + + let response = rx + .recv() + .await + .ok_or(anyhow::anyhow!("No value received from channel"))??; + + let mut bytes = Vec::with_capacity(1024); + + if use_json { + response.write_json5(&mut bytes)?; + } else { + response.write_xml(&mut bytes)?; + }; + + Response::builder() + .status(200) + .body(Body::from(bytes)) + .map_err(|err| anyhow::anyhow!(err)) +} diff --git a/xcp-metrics/src/hub.rs b/xcp-metrics/src/hub.rs new file mode 100644 index 0000000..a47f171 --- /dev/null +++ b/xcp-metrics/src/hub.rs @@ -0,0 +1,210 @@ +/*! +# Metrics Hub + +This is the part that centralize the metrics from the main daemon. +It communicates metrics with [crate::publishers] (pull) and [crate::providers] (push) using a [`mpsc::UnboundedSender`]. + +All metrics are uniquely identified using a [uuid::Uuid] to ease updating, this identifier must be generated by the provider (using [uuid::Uuid::new_v4]). + +## TODO + +Metrics families should be able to be merged if needed. +*/ +use std::sync::Arc; + +use tokio::{ + sync::mpsc, + task::{self, JoinHandle}, +}; +use xcp_metrics_common::metrics::{Metric, MetricFamily, MetricPoint, MetricSet, MetricType}; + +/// Register a new metric family to the hub. +#[derive(Debug, Clone)] +pub struct CreateFamily { + pub name: Box, + pub metric_type: MetricType, + pub unit: Box, + pub help: Box, +} + +/// Register a new metric to the hub. +#[derive(Debug, Clone)] +pub struct RegisterMetrics { + pub family: Box, + pub metrics: Metric, + pub uuid: uuid::Uuid, +} + +/// Remove a metric from the hub. +#[derive(Debug, Clone)] +pub struct UnregisterMetrics { + pub uuid: uuid::Uuid, +} + +/// Replace the values of a metric. 
+#[derive(Debug, Clone)] +pub struct UpdateMetrics { + pub uuid: uuid::Uuid, + pub new_values: Box<[MetricPoint]>, +} + +/// Fetch metrics, receiving them in a provided [`mpsc::UnboundedSender`]. +#[derive(Debug, Clone)] +pub struct PullMetrics(pub mpsc::UnboundedSender); + +/// A message that can be sent to the hub. +#[derive(Debug, Clone)] +pub enum HubPushMessage { + CreateFamily(CreateFamily), + RegisterMetrics(RegisterMetrics), + UnregisterMetrics(UnregisterMetrics), + UpdateMetrics(UpdateMetrics), + PullMetrics(PullMetrics), +} + +/// A hub response. +#[derive(Debug, Clone)] +pub enum HubPullResponse { + Metrics(Arc), +} + +/// Metrics Hub +#[derive(Debug, Clone, Default)] +pub struct MetricsHub { + metrics: Arc, +} + +impl MetricsHub { + /// Starts the Metrics Hub in a new [tokio::task], giving the associated [JoinHandle] and hub channel ([`mpsc::UnboundedSender`]). + pub async fn start(self) -> (JoinHandle<()>, mpsc::UnboundedSender) { + let (sender, receiver) = mpsc::unbounded_channel(); + let mut rendez_vous = mpsc::channel(1); + + let handle = task::spawn(async move { self.run(receiver, rendez_vous.0).await }); + + rendez_vous.1.recv().await; + + tracing::info!("Hub ready"); + + (handle, sender) + } + + async fn run( + mut self, + mut receiver: mpsc::UnboundedReceiver, + rendez_vous: mpsc::Sender<()>, + ) { + rendez_vous.send(()).await.unwrap(); + + while let Some(msg) = receiver.recv().await { + match msg { + HubPushMessage::CreateFamily(message) => self.create_family(message).await, + HubPushMessage::RegisterMetrics(message) => self.register(message).await, + HubPushMessage::UnregisterMetrics(message) => self.unregister(message).await, + HubPushMessage::UpdateMetrics(message) => self.update(message).await, + HubPushMessage::PullMetrics(message) => self.pull_metrics(message).await, + } + } + } + + #[tracing::instrument(skip(self))] + async fn create_family( + &mut self, + CreateFamily { + name, + metric_type, + unit, + help, + }: CreateFamily, + ) { + 
let metrics = Arc::make_mut(&mut self.metrics); + + if let Some(old_family) = metrics.families.insert( + name, + MetricFamily { + metric_type, + unit, + help, + metrics: Default::default(), + }, + ) { + tracing::warn!("Overriden previous family: {old_family:?}"); + } + + tracing::debug!("Inserted family"); + } + + #[tracing::instrument(skip(self))] + async fn register(&mut self, message: RegisterMetrics) { + let metrics = Arc::make_mut(&mut self.metrics); + + let family = match metrics.families.get_mut(&message.family) { + Some(f) => f, + None => { + tracing::warn!("Missing family, creating default one"); + + metrics + .families + .insert(message.family.clone(), Default::default()); + metrics.families.get_mut(&message.family).unwrap() + } + }; + + if let Some(old) = family.metrics.insert(message.uuid, message.metrics) { + tracing::warn!("Overriden {old:?}"); + } + } + + #[tracing::instrument(skip(self))] + async fn unregister(&mut self, message: UnregisterMetrics) { + let metrics = Arc::make_mut(&mut self.metrics); + let mut deprecated_family = None; + + for (family_name, family) in metrics.families.iter_mut() { + if family.metrics.remove(&message.uuid).is_some() { + tracing::info!("Unregistered {}", message.uuid); + + // Remove metric family if now empty. + if family.metrics.is_empty() { + deprecated_family.replace(family_name.clone()); + } + + break; + } + } + + if let Some(name) = &deprecated_family { + tracing::info!("Unregistered empty metric family {name}"); + metrics.families.remove(name); + } + } + + #[tracing::instrument(skip(self))] + async fn update(&mut self, mut message: UpdateMetrics) { + let metrics = Arc::make_mut(&mut self.metrics); + + // TODO: Do some checks. 
+ + for (_, family) in metrics.families.iter_mut() { + if let Some(metrics) = family.metrics.get_mut(&message.uuid) { + tracing::debug!("Metric {} properly updated", message.uuid); + + /* Rust wizardry */ + std::mem::swap(&mut metrics.metrics_point, &mut message.new_values); + return; + } + } + + tracing::error!("Metric not found"); + } + + #[tracing::instrument(skip(self))] + async fn pull_metrics(&mut self, message: PullMetrics) { + let sender = message.0; + tracing::debug!("Pulling metrics"); + + if let Err(e) = sender.send(HubPullResponse::Metrics(Arc::clone(&self.metrics))) { + tracing::error!("Error occured while sending metrics {e}"); + } + } +} diff --git a/xcp-metrics/src/main.rs b/xcp-metrics/src/main.rs new file mode 100644 index 0000000..9154397 --- /dev/null +++ b/xcp-metrics/src/main.rs @@ -0,0 +1,172 @@ +pub mod forwarded; +pub mod hub; +mod mappings; +pub mod providers; +pub mod publishers; +pub mod rpc; + +use std::{ + collections::HashMap, + fs, + path::{Path, PathBuf}, + sync::Arc, +}; + +use clap::{command, Parser}; +use dashmap::DashMap; +use rpc::routes::RpcRoutes; +use tokio::{net::UnixStream, select, sync::mpsc, task::JoinHandle}; + +use publishers::rrdd::server::{RrddServer, RrddServerMessage}; + +use xcp_metrics_common::utils::mapping::CustomMapping; + +/// Shared xcp-metrics structure. +#[derive(Debug)] +pub struct XcpMetricsShared { + /// Handles of the tasks associated to each [providers] + pub plugins: DashMap, JoinHandle<()>>, + + /// Channel to communicate with hub. + pub hub_channel: mpsc::UnboundedSender, + + /// Channel to communicate with the rrdd compatibility server. 
+    pub rrdd_channel: mpsc::UnboundedSender,
+
+    /// List of RPC routes
+    pub rpc_routes: RpcRoutes,
+}
+
+/// xcp-metrics main daemon
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+    /// Logging level
+    #[arg(short, long, default_value_t = tracing::Level::INFO)]
+    log_level: tracing::Level,
+
+    /// xcp-metrics socket path
+    #[arg(long)]
+    daemon_path: Option<std::path::PathBuf>,
+
+    /// Custom RrddServer v3-to-v2 mapping file.
+    #[arg(short, long)]
+    mapping_file: Option<std::path::PathBuf>,
+}
+
+/// Check if the XAPI socket is active and unlink it if it isn't.
+///
+/// Returns true if the socket is active.
+async fn check_unix_socket(socket_path: &Path) -> anyhow::Result<bool> {
+    if !tokio::fs::try_exists(&socket_path).await? {
+        // Socket doesn't exist.
+        return Ok(false);
+    }
+
+    match UnixStream::connect(&socket_path).await {
+        Ok(_) => Ok(true),
+        Err(e) => {
+            if matches!(e.kind(), std::io::ErrorKind::ConnectionRefused) {
+                // The file exists but nobody is listening: remove the stale socket.
+                tracing::warn!(
+                    socket = socket_path.to_str(),
+                    "Unlinking inactive XAPI socket"
+                );
+                tokio::fs::remove_file(socket_path).await?; // async variant: do not block the runtime
+                Ok(false)
+            } else {
+                tracing::error!(
+                    socket = socket_path.to_str(),
+                    "Unable to check XAPI socket status: {e}"
+                );
+                Err(e.into())
+            }
+        }
+    }
+}
+
+/// Load the mappings from file or use default ones (for now).
+async fn get_mappings(
+    mapping_path: Option<&Path>,
+) -> anyhow::Result<HashMap<Box<str>, CustomMapping>> {
+    if let Some(path) = mapping_path {
+        Ok(serde_json::from_str(&fs::read_to_string(path)?)?)
+    } else {
+        Ok(mappings::default_mappings())
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    let args = Args::parse();
+
+    let text_subscriber = tracing_subscriber::fmt()
+        .with_ansi(true)
+        .with_max_level(args.log_level)
+        .compact()
+        .finish();
+
+    tracing::subscriber::set_global_default(text_subscriber).unwrap();
+
+    // Use xcp-rrdd socket path if arg0 is xcp-rrdd and not specified in Args.
+    let socket_path = args.daemon_path.unwrap_or_else(|| {
+        let Some(arg0) = std::env::args().next() else {
+            return xapi::get_module_path("xcp-metrics");
+        };
+
+        let arg0_pathname = Path::new(&arg0)
+            .file_name()
+            .unwrap_or_default()
+            .to_string_lossy();
+
+        if arg0_pathname == "xcp-rrdd" {
+            tracing::info!("Program name is xcp-rrdd, use xcp-rrdd socket path by default");
+            return xapi::get_module_path("xcp-rrdd");
+        }
+
+        xapi::get_module_path("xcp-metrics")
+    });
+
+    let forwarded_path = format!("{}.forwarded", socket_path.to_string_lossy());
+
+    if check_unix_socket(Path::new(&socket_path)).await.unwrap() {
+        tracing::error!("Unable to start: xcp-metrics socket is active");
+        panic!("Unable to start: is xcp-metrics already running ?");
+    }
+
+    if check_unix_socket(Path::new(&forwarded_path)).await.unwrap() {
+        tracing::error!("Unable to start: xcp-metrics.forwarded socket is active");
+        panic!("Unable to start: is xcp-metrics already running ?");
+    }
+
+    let (hub, hub_channel) = hub::MetricsHub::default().start().await;
+    let (rrdd_server, rrdd_channel) =
+        RrddServer::new(get_mappings(args.mapping_file.as_deref()).await.unwrap());
+
+    let shared = Arc::new(XcpMetricsShared {
+        hub_channel,
+        plugins: Default::default(),
+        rrdd_channel,
+        rpc_routes: Default::default(),
+    });
+
+    let socket = rpc::daemon::start_daemon(&socket_path, shared.clone())
+        .await
+        .unwrap();
+
+    let socket_forwarded =
+        forwarded::start_forwarded_socket(Path::new(&forwarded_path), shared.clone())
+            .await
+            .unwrap();
+
+    let rrdd = rrdd_server.start(shared.hub_channel.clone());
+
+    select! {
+        res = hub => tracing::warn!("Hub returned: {res:?}"),
+        res = socket => tracing::warn!("RPC Socket returned: {res:?}"),
+        res = socket_forwarded => tracing::warn!("RPC Forwarded Socket returned {res:?}"),
+        res = rrdd => tracing::warn!("RRDD server returned {res:?}")
+    };
+
+    tracing::info!("Stopping");
+}
diff --git a/xcp-metrics/src/mappings.rs b/xcp-metrics/src/mappings.rs
new file mode 100644
index 0000000..70c9c50
--- /dev/null
+++ b/xcp-metrics/src/mappings.rs
@@ -0,0 +1,34 @@
+//! Protocol v2 to protocol v3 predefined [CustomMapping]s.
+use std::collections::HashMap;
+
+use maplit::hashmap;
+use xcp_metrics_common::utils::mapping::CustomMapping;
+
+pub fn default_mappings() -> HashMap<Box<str>, CustomMapping> {
+    hashmap! {
+        "cpu-cstate".into() => CustomMapping {
+            pattern: "cpu{id}-C{state}".into(),
+            min: 0.0,
+            max: f32::INFINITY,
+            default: true,
+        },
+        "cpu-pstate".into() => CustomMapping {
+            pattern: "cpu{id}-P{state}".into(),
+            min: 0.0,
+            max: f32::INFINITY,
+            default: true,
+        },
+        "cpu".into() => CustomMapping {
+            pattern: "cpu{id}".into(),
+            min: 0.0,
+            max: 1.0,
+            default: true,
+        },
+        "cpu-freq".into() => CustomMapping {
+            pattern: "CPU{id}-avg-freq".into(),
+            min: 0.0,
+            max: f32::INFINITY,
+            default: true
+        },
+    }
+}
diff --git a/xcp-metrics/src/providers/mod.rs b/xcp-metrics/src/providers/mod.rs
new file mode 100644
index 0000000..e932c6d
--- /dev/null
+++ b/xcp-metrics/src/providers/mod.rs
@@ -0,0 +1,11 @@
+//! Metrics providers.
+use tokio::{sync::mpsc, task::JoinHandle};
+
+use crate::hub::HubPushMessage;
+
+pub mod protocol_v2;
+pub mod protocol_v3;
+
+pub trait Provider {
+    fn start_provider(self, hub_channel: mpsc::UnboundedSender<HubPushMessage>) -> JoinHandle<()>;
+}
diff --git a/xcp-metrics/src/providers/protocol_v2.rs b/xcp-metrics/src/providers/protocol_v2.rs
new file mode 100644
index 0000000..0fa84de
--- /dev/null
+++ b/xcp-metrics/src/providers/protocol_v2.rs
@@ -0,0 +1,254 @@
+//!
Protocol v2 plugin metrics provider
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+    time::{Duration, SystemTime},
+};
+
+use tokio::{
+    fs::File,
+    io::AsyncReadExt,
+    sync::mpsc,
+    task::{self, JoinHandle},
+    time,
+};
+use xcp_metrics_common::{
+    metrics::{Metric, MetricPoint},
+    rrdd::{
+        protocol_common::DataSourceValue,
+        protocol_v2::{RrddMessageHeader, RrddMetadata, RrddMetadataRaw},
+    },
+};
+
+use super::Provider;
+use crate::hub::{CreateFamily, HubPushMessage, RegisterMetrics, UnregisterMetrics, UpdateMetrics};
+
+const METRICS_SHM_PATH: &str = "/dev/shm/metrics/";
+
+#[derive(Debug, Clone)]
+struct PluginData {
+    metadata: RrddMetadata,
+    values: Box<[DataSourceValue]>,
+    metadata_checksum: u32,
+    timestamp: SystemTime,
+}
+
+#[derive(Debug, Clone)]
+pub struct ProtocolV2Provider {
+    name: Box<str>,
+    path: PathBuf,
+    state: Option<PluginData>,
+    registered_metrics: HashMap<Box<str>, uuid::Uuid>,
+    hub_channel: Option<mpsc::UnboundedSender<HubPushMessage>>,
+    last_reset: SystemTime,
+}
+
+impl ProtocolV2Provider {
+    pub fn new(plugin_name: &str) -> Self {
+        Self {
+            name: plugin_name.into(),
+            path: Path::new(METRICS_SHM_PATH).join(plugin_name),
+            state: None,
+            registered_metrics: HashMap::new(),
+            hub_channel: None,
+            last_reset: SystemTime::now(),
+        }
+    }
+
+    async fn collect_plugin_metrics(&mut self) -> anyhow::Result<bool> {
+        let mut file = File::open(&self.path).await?;
+        let header = RrddMessageHeader::parse_async(&mut file).await;
+        let mut updated_metadata = false;
+
+        tracing::debug!("Readed {header:?}");
+
+        if let Ok(header) = header {
+            // Reuse the cached PluginData when possible, otherwise rebuild it from the file.
+            let mut data = match self.state.as_ref() {
+                /* matching checksums, no need to update metadata */
+                Some(
+                    data @ &PluginData {
+                        metadata_checksum, ..
+                    },
+                ) if metadata_checksum == header.metadata_checksum => (*data).clone(),
+
+                /* No cached state or checksum mismatch: regenerate data */
+                _ => {
+                    updated_metadata = true;
+                    self.last_reset = SystemTime::now();
+
+                    // Read the JSON metadata that follows the header.
+                    let mut metadata_string = vec![0u8; header.metadata_length as usize];
+                    file.read_exact(&mut metadata_string).await?;
+                    let metadata: RrddMetadata =
+                        serde_json::from_slice::<RrddMetadataRaw>(&metadata_string)?.try_into()?;
+
+                    PluginData {
+                        values: vec![DataSourceValue::Undefined; metadata.datasources.len()]
+                            .into_boxed_slice(),
+                        metadata,
+                        metadata_checksum: header.metadata_checksum,
+                        timestamp: header.timestamp,
+                    }
+                }
+            };
+
+            // Update data value slice using raw values in header along with metadata.
+            data.values
+                .iter_mut()
+                .zip(data.metadata.datasources.values())
+                .zip(header.values.iter())
+                .for_each(|((dest, meta), &raw)| {
+                    *dest = match meta.value {
+                        DataSourceValue::Int64(_) => {
+                            DataSourceValue::Int64(i64::from_be_bytes(raw))
+                        }
+                        DataSourceValue::Float(_) => {
+                            DataSourceValue::Float(f64::from_be_bytes(raw))
+                        }
+                        DataSourceValue::Undefined => DataSourceValue::Undefined,
+                    }
+                });
+
+            data.timestamp = header.timestamp;
+
+            self.state.replace(data);
+        }
+
+        Ok(updated_metadata)
+    }
+
+    /// Send metrics to hub, registering them if they are not.
+    async fn send_values(&mut self, hub_channel: &mpsc::UnboundedSender<HubPushMessage>) {
+        let Some(state) = self.state.as_ref() else {
+            return;
+        };
+
+        std::iter::zip(state.metadata.datasources.iter(), state.values.iter()).for_each(
+            |((name, metadata), &value)| {
+                // Wrap value into its appropriate MetricPoint.
+                let metric_point = MetricPoint::from_protocol_v2(
+                    metadata,
+                    value,
+                    state.timestamp,
+                    Some(self.last_reset),
+                );
+
+                match self.registered_metrics.get(name) {
+                    Some(&uuid) => {
+                        // Already registered: only push the new value.
+                        let new_values = vec![metric_point].into_boxed_slice();
+
+                        hub_channel
+                            .send(HubPushMessage::UpdateMetrics(UpdateMetrics {
+                                new_values,
+                                uuid,
+                            }))
+                            .unwrap();
+                    }
+                    None => {
+                        // Not yet registered, register it.
+                        let metric_uuid = uuid::Uuid::new_v4();
+
+                        // Register family
+                        hub_channel
+                            .send(HubPushMessage::CreateFamily(CreateFamily {
+                                name: name.clone(),
+                                metric_type: metric_point.value.get_type(),
+                                unit: metadata.units.clone(),
+                                help: metadata.description.clone(),
+                            }))
+                            .unwrap();
+
+                        // Register metric
+                        hub_channel
+                            .send(HubPushMessage::RegisterMetrics(RegisterMetrics {
+                                family: name.clone(),
+                                uuid: metric_uuid,
+                                metrics: Metric::from_protocol_v2(
+                                    metadata,
+                                    value,
+                                    state.timestamp,
+                                    Some(self.last_reset),
+                                ),
+                            }))
+                            .unwrap();
+
+                        self.registered_metrics.insert(name.clone(), metric_uuid);
+                    }
+                };
+            },
+        );
+    }
+
+    fn check_metrics(&mut self, hub_channel: &mpsc::UnboundedSender<HubPushMessage>) {
+        let Some(state) = &self.state else { return };
+
+        self.registered_metrics.retain(|key, &mut uuid| {
+            // Check if the key exists in the new metadata.
+            if !state.metadata.datasources.contains_key(key) {
+                // missing: unregister
+                hub_channel
+                    .send(HubPushMessage::UnregisterMetrics(UnregisterMetrics {
+                        uuid,
+                    }))
+                    .ok();
+
+                false
+            } else {
+                true
+            }
+        })
+    }
+}
+
+impl Provider for ProtocolV2Provider {
+    fn start_provider(
+        mut self,
+        hub_channel: mpsc::UnboundedSender<HubPushMessage>,
+    ) -> JoinHandle<()> {
+        self.hub_channel.replace(hub_channel.clone());
+
+        task::spawn(async move {
+            tracing::trace_span!("plugin (v2) {}", self.name); // NOTE(review): span is never entered
+
+            loop {
+                let updated_metadata = self.collect_plugin_metrics().await;
+
+                if let Ok(true) = updated_metadata {
+                    tracing::info!("Updated metadata");
+                }
+
+                tracing::debug!("New state: {:?}", self.state);
+
+                match updated_metadata {
+                    // Check for removed metrics
+                    Ok(true) => self.check_metrics(&hub_channel),
+                    Ok(false) => (),
+                    Err(e) => tracing::error!("{e}"),
+                }
+
+                self.send_values(&hub_channel).await;
+
+                time::sleep(Duration::from_secs(5)).await
+            }
+        })
+    }
+}
+
+impl Drop for ProtocolV2Provider {
+    fn drop(&mut self) {
+        // Unregister every metric this provider registered on the hub.
+        if let Some(hub_channel) = &self.hub_channel {
+            self.registered_metrics.iter().for_each(|(name, &uuid)| {
+                tracing::info!("Unregistering {name}");
+
+                hub_channel
+                    .send(HubPushMessage::UnregisterMetrics(UnregisterMetrics {
+                        uuid,
+                    }))
+                    .ok(); // ignore failure (destroyed hub ?)
+            });
+        }
+    }
+}
diff --git a/xcp-metrics/src/providers/protocol_v3.rs b/xcp-metrics/src/providers/protocol_v3.rs
new file mode 100644
index 0000000..0bf2592
--- /dev/null
+++ b/xcp-metrics/src/providers/protocol_v3.rs
@@ -0,0 +1,132 @@
+//!
Protocol v3 plugin metrics provider
+
+use std::{
+    path::{Path, PathBuf},
+    time::{Duration, SystemTime},
+};
+
+use tokio::{fs::File, sync::mpsc, task::JoinHandle, time};
+use xcp_metrics_common::{metrics::MetricSet, protocol_v3, utils::delta::MetricSetModel};
+
+use crate::hub::{CreateFamily, HubPushMessage, RegisterMetrics, UnregisterMetrics, UpdateMetrics};
+
+use super::Provider;
+
+const METRICS_SHM_PATH: &str = "/dev/shm/metrics/";
+
+#[derive(Debug, Clone)]
+pub struct ProtocolV3Provider {
+    name: Box<str>,
+    path: PathBuf,
+    last_timestamp: SystemTime,
+
+    model: MetricSetModel,
+}
+
+impl ProtocolV3Provider {
+    pub fn new(plugin_name: &str) -> Self {
+        Self {
+            name: plugin_name.into(),
+            path: Path::new(METRICS_SHM_PATH).join(plugin_name),
+            last_timestamp: SystemTime::now(),
+            model: MetricSetModel::default(),
+        }
+    }
+
+    async fn fetch_protocol_v3(&mut self) -> anyhow::Result<Option<MetricSet>> {
+        let mut file = File::open(&self.path).await?;
+
+        // Read the header and the metrics set from the plugin file.
+        let (header, metrics) = protocol_v3::parse_v3_async(&mut file).await?;
+
+        if header.timestamp == self.last_timestamp {
+            tracing::debug!("Metrics have not been updated");
+            return Ok(None);
+        }
+        self.last_timestamp = header.timestamp; // remember it, otherwise the check above never matches
+        Ok(Some(metrics))
+    }
+}
+
+impl Provider for ProtocolV3Provider {
+    fn start_provider(
+        mut self,
+        hub_channel: mpsc::UnboundedSender<HubPushMessage>,
+    ) -> JoinHandle<()> {
+        tokio::task::spawn(async move {
+            tracing::debug_span!("Plugin {}", self.name); // NOTE(review): span is never entered
+
+            loop {
+                match self.fetch_protocol_v3().await {
+                    Ok(Some(new_metrics)) => {
+                        let delta = self.model.compute_delta(&new_metrics);
+
+                        // Update model
+                        self.model.apply_delta(&delta);
+
+                        // Remove metrics
+                        delta.removed_metrics.into_iter().for_each(|uuid| {
+                            if let Err(e) = hub_channel.send(HubPushMessage::UnregisterMetrics(
+                                UnregisterMetrics { uuid },
+                            )) {
+                                tracing::error!("Unregister error {e}");
+                            }
+                        });
+
+                        // Add new families
+                        delta.added_families.into_iter().for_each(|(name, family)| {
+                            if let Err(e) =
+                                hub_channel.send(HubPushMessage::CreateFamily(CreateFamily {
+                                    name: name.into(),
+                                    metric_type: family.metric_type,
+                                    unit: family.unit.clone(),
+                                    help: family.help.clone(),
+                                }))
+                            {
+                                tracing::error!("Register error {e}");
+                            }
+                        });
+
+                        // Add new metrics
+                        delta
+                            .added_metrics
+                            .into_iter()
+                            .for_each(|(family, metrics, uuid)| {
+                                if let Err(e) = hub_channel.send(HubPushMessage::RegisterMetrics(
+                                    RegisterMetrics {
+                                        family: family.into(),
+                                        metrics: (*metrics).clone(),
+                                        uuid,
+                                    },
+                                )) {
+                                    tracing::error!("Register error {e}");
+                                }
+                            });
+
+                        // Update all metrics
+                        new_metrics.families.into_iter().for_each(|(name, family)| {
+                            family.metrics.into_iter().for_each(|(_, metric)| {
+                                let uuid = self.model.metrics_map[&(name.clone(), metric.labels)];
+
+                                if let Err(e) =
+                                    hub_channel.send(HubPushMessage::UpdateMetrics(UpdateMetrics {
+                                        uuid,
+                                        new_values: metric.metrics_point.clone(),
+                                    }))
+                                {
+                                    tracing::error!("Update error {e}");
+                                }
+                            });
+                        });
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        tracing::warn!("Unable to fetch metrics: {e}")
+                    }
+                }
+
+                time::sleep(Duration::from_secs(5)).await
+            }
+        })
+    }
+}
diff --git a/xcp-metrics/src/publishers/mod.rs b/xcp-metrics/src/publishers/mod.rs
new file mode 100644
index 0000000..c6c7f9c
--- /dev/null
+++ b/xcp-metrics/src/publishers/mod.rs
@@ -0,0 +1,3 @@
+//! Metrics publishers
+pub mod openmetrics;
+pub mod rrdd;
diff --git a/xcp-metrics/src/publishers/openmetrics.rs b/xcp-metrics/src/publishers/openmetrics.rs
new file mode 100644
index 0000000..56b2b40
--- /dev/null
+++ b/xcp-metrics/src/publishers/openmetrics.rs
@@ -0,0 +1,83 @@
+//!
OpenMetrics based metrics publisher
+use std::sync::Arc;
+
+use futures::future::BoxFuture;
+use tokio::sync::mpsc;
+use xapi::{
+    hyper::{Body, Response},
+    rpc::{message::RpcRequest, methods::OpenMetricsMethod, XcpRpcMethodNamed},
+};
+use xcp_metrics_common::{
+    metrics::MetricSet,
+    openmetrics::{self, prost::Message, text},
+};
+
+use crate::{
+    hub::{HubPullResponse, HubPushMessage, PullMetrics},
+    rpc::routes::XcpRpcRoute,
+    XcpMetricsShared,
+};
+
+fn generate_openmetrics_message(metrics: MetricSet) -> Vec<u8> {
+    openmetrics::MetricSet::from(metrics).encode_to_vec()
+}
+
+fn generate_openmetrics_text_message(metrics: MetricSet) -> Vec<u8> {
+    let mut output = String::new();
+
+    text::write_metrics_set_text(&mut output, &metrics).unwrap();
+
+    output.into_bytes()
+}
+
+const OPENMETRICS_TEXT_CONTENT_TYPE: &str =
+    "application/openmetrics-text; version=1.0.0; charset=utf-8";
+const OPENMETRICS_PROTOBUF_CONTENT_TYPE: &str = "application/openmetrics-protobuf; version=1.0.0";
+
+#[derive(Copy, Clone, Default)]
+pub struct OpenMetricsRoute;
+
+impl XcpRpcRoute for OpenMetricsRoute {
+    fn run(
+        &self,
+        shared: Arc<XcpMetricsShared>,
+        message: RpcRequest,
+    ) -> BoxFuture<'static, anyhow::Result<Response<Body>>> {
+        tracing::info_span!("Open Metrics query"); // NOTE(review): span is never entered
+        tracing::debug!("Preparing query");
+
+        Box::pin(async move {
+            let use_protobuf = message
+                .try_into_method::<OpenMetricsMethod>()
+                .map_or(false, |method| method.protobuf);
+
+            let (sender, mut receiver) = mpsc::unbounded_channel();
+
+            shared
+                .hub_channel
+                .send(HubPushMessage::PullMetrics(PullMetrics(sender)))?;
+
+            let Some(HubPullResponse::Metrics(metrics)) = receiver.recv().await else {
+                anyhow::bail!("Unable to fetch metrics from hub")
+            };
+
+            if use_protobuf {
+                let message = generate_openmetrics_message((*metrics).clone());
+
+                Ok(Response::builder()
+                    .header("content-type", OPENMETRICS_PROTOBUF_CONTENT_TYPE)
+                    .body(message.into())?)
+            } else {
+                let message = generate_openmetrics_text_message((*metrics).clone());
+
+                Ok(Response::builder()
+                    .header("content-type", OPENMETRICS_TEXT_CONTENT_TYPE)
+                    .body(message.into())?)
+            }
+        })
+    }
+
+    fn get_name(&self) -> &'static str {
+        OpenMetricsMethod::get_method_name()
+    }
+}
diff --git a/xcp-metrics/src/publishers/rrdd/entry.rs b/xcp-metrics/src/publishers/rrdd/entry.rs
new file mode 100644
index 0000000..a0c7a3c
--- /dev/null
+++ b/xcp-metrics/src/publishers/rrdd/entry.rs
@@ -0,0 +1,60 @@
+//! [RrdEntry] definition
+use serde::{Deserialize, Serialize};
+use xcp_metrics_common::rrdd::protocol_common::DataSourceMetadata;
+
+use super::{round_robin::RoundRobinBuffer, Granuality};
+
+/// A xcp-rrdd metric entry.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RrdEntry {
+    /// Full entry name (KIND:owner:uuid:metric_name)
+    pub name: Box<str>,
+
+    /// Protocol v2 metadata
+    pub metadata: DataSourceMetadata,
+
+    /// Metrics per five seconds (for the past ten minutes)
+    pub five_seconds: RoundRobinBuffer<f64>,
+
+    /// Metrics per minute (for the past two hours)
+    pub minute: RoundRobinBuffer<f64>,
+
+    /// Metrics per hour (for the past week).
+    pub hour: RoundRobinBuffer<f64>,
+
+    /// Metrics per day (for the past year).
+    pub day: RoundRobinBuffer<f64>,
+}
+
+impl RrdEntry {
+    pub fn new(name: Box<str>, metadata: DataSourceMetadata) -> Self {
+        Self {
+            name,
+            metadata,
+
+            // Per five seconds, past ten minutes.
+            five_seconds: RoundRobinBuffer::new(
+                Granuality::FiveSeconds.get_buffer_size(),
+                f64::NAN,
+            ),
+
+            // Per minute, past two hours.
+            minute: RoundRobinBuffer::new(Granuality::Minute.get_buffer_size(), f64::NAN),
+
+            // Per hour, past week.
+            hour: RoundRobinBuffer::new(Granuality::Hour.get_buffer_size(), f64::NAN),
+
+            // Per day, past year.
+            day: RoundRobinBuffer::new(Granuality::Day.get_buffer_size(), f64::NAN),
+        }
+    }
+
+    pub(super) fn get_buffer(&self, granuality: Granuality) -> &RoundRobinBuffer<f64> {
+        match granuality {
+            Granuality::FiveSeconds => &self.five_seconds,
+            Granuality::Minute => &self.minute,
+            Granuality::Hour => &self.hour,
+            Granuality::Day => &self.day,
+        }
+    }
+}
diff --git a/xcp-metrics/src/publishers/rrdd/mod.rs b/xcp-metrics/src/publishers/rrdd/mod.rs
new file mode 100644
index 0000000..6ddb614
--- /dev/null
+++ b/xcp-metrics/src/publishers/rrdd/mod.rs
@@ -0,0 +1,88 @@
+/*!
+xcp-rrdd compatible publisher
+
+This module is meant to be a compatibility layer on top of [crate::hub::MetricsHub], instead of being a full blown xcp-rrdd server.
+
+It is exposed mostly through [server::RrddServer], which contains a limited implementation of xcp-rrdd that communicates
+periodically with an external [crate::hub::MetricsHub] to fetch latest metrics.
+As xcp-rrdd uses a protocol-v2-alike metrics representation (instead of [xcp_metrics_common::metrics]), all metrics are passed through a
+[xcp_metrics_common::utils::mapping::MetadataMapping] specified on [server::RrddServer] creation.
+
+All requests to this server use [server::RrddServerMessage], sent in a pull fashion over the channel provided on [server::RrddServer] creation.
+Using the [server::RrddServerMessage::RequestRrdUpdates] message, it is possible to get [xcp_metrics_common::rrdd::rrd_updates::RrdXport] exports,
+that can be used to implement `/rrd_updates`.
+*/
+mod entry;
+pub mod round_robin;
+pub mod server;
+
+use std::time::{Duration, SystemTime};
+
+/// Rrdd Xport metrics filter.
+#[derive(Debug, Clone, Copy)]
+pub enum RrdXportFilter {
+    All,
+    AllNoHost,
+    VM(uuid::Uuid),
+    SR(uuid::Uuid),
+}
+
+/// Rrdd Xport parameters.
+#[derive(Debug, Clone)]
+pub struct RrdXportParameters {
+    pub start: SystemTime,
+    pub interval: u32,
+    pub filter: RrdXportFilter,
+}
+
+#[derive(Copy, Clone)]
+enum Granuality {
+    FiveSeconds,
+    Minute,
+    Hour,
+    Day,
+}
+
+impl Granuality {
+    /// Get the duration covered by the sample buffer of this granuality.
+    pub const fn get_covered_duration(self) -> Duration {
+        match self {
+            // Duration that can cover the five_seconds buffer (10 minutes).
+            Self::FiveSeconds => Duration::from_secs(10 * 60),
+            // Duration that can cover the minute buffer (2 hours).
+            Self::Minute => Duration::from_secs(2 * 3600),
+            // Duration that can cover the hour buffer (1 week).
+            Self::Hour => Duration::from_secs(24 * 7 * 3600),
+            // Duration that can cover the day buffer (1 year).
+            Self::Day => Duration::from_secs(24 * 3600 * 365),
+        }
+    }
+
+    pub const fn get_buffer_size(self) -> usize {
+        match self {
+            // Size of the per five seconds samples buffer (10 minutes of samples).
+            Self::FiveSeconds => 10 * 60 / 5,
+            // Size of the per minute samples buffer (2 hours of samples).
+            Self::Minute => 2 * 60,
+            // Size of the per hour samples buffer (1 week of samples).
+            Self::Hour => 7 * 24,
+            // Size of the per day samples buffer (1 year of samples).
+            Self::Day => 365,
+        }
+    }
+
+    /// Number of five-second ticks between two samples at this granuality.
+    pub const fn get_five_seconds_interval(self) -> u32 {
+        match self {
+            Self::FiveSeconds => 1,
+            Self::Minute => 60 / 5,
+            Self::Hour => 3600 / 5,
+            Self::Day => 3600 * 24 / 5,
+        }
+    }
+
+    /// Time between two samples at this granuality (five seconds times the tick count).
+    pub const fn get_interval(self) -> Duration {
+        Duration::from_secs(5 * self.get_five_seconds_interval() as u64)
+    }
+}
diff --git a/xcp-metrics/src/publishers/rrdd/round_robin.rs b/xcp-metrics/src/publishers/rrdd/round_robin.rs
new file mode 100644
index 0000000..3268c26
--- /dev/null
+++ b/xcp-metrics/src/publishers/rrdd/round_robin.rs
@@ -0,0 +1,184 @@
+use serde::{Deserialize, Serialize};
+
+// TODO: Redesign parts of it with const generic arrays when serde supports it.
+// https://github.com/serde-rs/serde/issues/1937
+
+/**
+Round-robin buffer.
+
+# Note
+
+Overwrite old data after writing `size` items.
+
+# Design
+
+```plain
+------------------> Items order
+
++-----------------+
+|....P=>..........|
++-----------------+
+
+P: Round Robin position
+=> : Next position after push()
+```
+
+New items are written at `P`, then the position is incremented (wrapping to 0 when it reaches `size`).
+
+*/
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RoundRobinBuffer<T> {
+    pos: usize,
+    size: usize,
+    buffer: Box<[T]>,
+}
+
+impl<T> RoundRobinBuffer<T>
+where
+    T: Sized + Default + Copy,
+{
+    pub fn new(size: usize, default: T) -> Self {
+        Self {
+            pos: 0,
+            size,
+            buffer: vec![default; size].into_boxed_slice(),
+        }
+    }
+
+    pub fn push(&mut self, value: T) {
+        self.buffer[self.pos] = value;
+        self.pos = (self.pos + 1) % self.size;
+    }
+
+    pub fn iter(&self) -> RoundRobinIterator<'_, T> {
+        RoundRobinIterator {
+            rrb: self,
+            pos: self.pos,
+            done: false,
+        }
+    }
+}
+
+/**
+Iterator for Round-Robin buffer.
+
+# Design
+
+```plain
+------------------> Items order
+
++-----------------+
+|...........PS=>..|
++-----------------+
+
+S: Initial iterator position
+P: Iterator position
+```
+
+Iterate over the buffer, wrapping once until coming back to `S`, where
+`done` is set, and the iterator ends at the following `next()`.
+
+*/
+#[derive(Debug, Clone)]
+pub struct RoundRobinIterator<'a, T: Sized> {
+    /// Round-robin buffer being iterated.
+    rrb: &'a RoundRobinBuffer<T>,
+
+    /// Position of the iterator in the buffer.
+    pos: usize,
+
+    /// Indicate if the iterator has completed.
+    /// We need this as self.rrb.pos contains valid information so we can't rely on `self.pos == self.rrb.pos`
+    /// to report end; and we don't want to discard this value.
+    done: bool,
+}
+
+impl<'a, T: Sized> Iterator for RoundRobinIterator<'a, T> {
+    type Item = &'a T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.done {
+            return None;
+        }
+
+        let value = &self.rrb.buffer[self.pos];
+
+        self.pos = (self.pos + 1) % self.rrb.size;
+
+        if self.pos == self.rrb.pos {
+            self.done = true;
+        }
+
+        Some(value)
+    }
+
+    /**
+    # Design
+
+    ```plain
+    * I < B
+    +-----------------+
+    |.....I.....B.....|
+    +-----------------+
+
+    'I' already wrapped around, so B - I elements remain (the element at B was yielded first).
+
+    * I > B
+    +-----------------+
+    |.....B.....I.....|
+    +-----------------+
+
+    'I' hasn't wrapped around yet, so (size - I) + B elements remain.
+
+    * I = B
+
+    0 if done, size otherwise (iteration has not started yet)
+
+    I: Iterator position
+    B: Buffer position
+    ```
+    */
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let remaining = if self.pos < self.rrb.pos {
+            self.rrb.pos - self.pos
+        } else if self.pos > self.rrb.pos {
+            self.rrb.size - self.pos + self.rrb.pos
+        } else if self.done {
+            0
+        } else {
+            self.rrb.size
+        };
+
+        (remaining, Some(remaining))
+    }
+}
+
+#[test]
+fn round_robin_test_insert() {
+    let mut buffer = RoundRobinBuffer::new(32, f64::NAN);
+    assert!(f64::is_nan(*buffer.iter().next().unwrap()));
+
+    // Add 32 elements into the buffer.
+    (0..32).for_each(|i| buffer.push(i as f64));
+
+    // Elements should come in the same order (we filled the buffer).
+    (0..32).zip(buffer.iter()).for_each(|(reference, val)| {
+        assert_eq!(reference, *val as i32);
+    });
+
+    // Overwrite all elements.
+    (32..64).for_each(|i| buffer.push(i as f64));
+
+    (32..64).zip(buffer.iter()).for_each(|(reference, val)| {
+        assert_eq!(reference, *val as i32);
+    });
+}
+
+#[test]
+fn round_robin_test_iter_count() {
+    let buffer = RoundRobinBuffer::new(32, f64::NAN);
+    assert_eq!(buffer.iter().count(), 32);
+    assert_eq!(buffer.iter().size_hint(), (32, Some(32)));
+    let buffer = RoundRobinBuffer::new(1, f64::NAN);
+    assert_eq!(buffer.iter().count(), 1);
+}
diff --git a/xcp-metrics/src/publishers/rrdd/server.rs b/xcp-metrics/src/publishers/rrdd/server.rs
new file mode 100644
index 0000000..981562e
--- /dev/null
+++ b/xcp-metrics/src/publishers/rrdd/server.rs
@@ -0,0 +1,324 @@
+//! [RrddServer] implementation
+use std::{
+    collections::HashMap,
+    iter,
+    sync::Arc,
+    time::{Duration, SystemTime},
+};
+use tokio::{self, select, sync::mpsc, task::JoinHandle};
+
+use xcp_metrics_common::{
+    metrics::{Metric, MetricFamily, MetricSet, MetricType, MetricValue, NumberValue},
+    rrdd::{
+        protocol_common::{DataSourceMetadata, DataSourceOwner},
+        rrd_updates::RrdXport,
+    },
+    utils::mapping::{CustomMapping, DefaultMapping, MetadataMapping},
+};
+
+use super::{entry::RrdEntry, Granuality, RrdXportFilter, RrdXportParameters};
+
+use crate::hub::{HubPullResponse, HubPushMessage, PullMetrics};
+
+/// Types of message to communicate with [RrddServer].
+#[derive(Debug)]
+pub enum RrddServerMessage {
+    RequestRrdUpdates(RrdXportParameters, mpsc::Sender<anyhow::Result<RrdXport>>),
+}
+
+/// xcp-rrdd partially compatible server that stores the state of metrics over time.
+///
+/// See [super] for more information.
+#[derive(Debug)]
+pub struct RrddServer {
+    receiver: mpsc::UnboundedReceiver<RrddServerMessage>,
+    host_uuid: uuid::Uuid,
+    mappings: HashMap<Box<str>, CustomMapping>,
+
+    /// Map a UUID from hub's MetricSet with a RrdEntry.
+    entry_database: HashMap<uuid::Uuid, RrdEntry>,
+
+    latest_update: SystemTime,
+
+    minute_update_counter: u32,
+    latest_minute_update: SystemTime,
+
+    hour_update_counter: u32,
+    latest_hour_update: SystemTime,
+
+    day_update_counter: u32,
+    latest_day_update: SystemTime,
+}
+
+/// Get the owner part of the rrd name (e.g. `vm:UUID`).
+fn format_owner_part(metadata: &DataSourceMetadata, host_uuid: uuid::Uuid) -> String {
+    match metadata.owner {
+        DataSourceOwner::Host => format!("host:{host_uuid}"),
+        DataSourceOwner::VM(uuid) => format!("vm:{uuid}"),
+        DataSourceOwner::SR(uuid) => format!("sr:{uuid}"),
+    }
+}
+
+impl RrddServer {
+    pub fn new(
+        mappings: HashMap<Box<str>, CustomMapping>,
+    ) -> (Self, mpsc::UnboundedSender<RrddServerMessage>) {
+        let (sender, receiver) = mpsc::unbounded_channel();
+
+        (
+            Self {
+                receiver,
+                host_uuid: uuid::Uuid::new_v4(),
+                mappings,
+
+                entry_database: HashMap::new(),
+
+                latest_update: SystemTime::now(),
+
+                day_update_counter: 0,
+                latest_day_update: SystemTime::now(),
+
+                hour_update_counter: 0,
+                latest_hour_update: SystemTime::now(),
+
+                minute_update_counter: 0,
+                latest_minute_update: SystemTime::now(),
+            },
+            sender,
+        )
+    }
+
+    fn to_name_v2(
+        mappings: &HashMap<Box<str>, CustomMapping>,
+        metric: &Metric,
+        family: &MetricFamily,
+        family_name: &str,
+    ) -> Option<(Box<str>, DataSourceMetadata)> {
+        if let Some(custom_mapping) = mappings.get(family_name) {
+            custom_mapping.convert(family_name, family, metric)
+        } else {
+            DefaultMapping.convert(family_name, family, metric)
+        }
+    }
+
+    /// Pull metrics from hub
+    pub async fn pull_metrics(
+        &mut self,
+        hub_channel: &mpsc::UnboundedSender<HubPushMessage>,
+    ) -> anyhow::Result<Arc<MetricSet>> {
+        let (tx, mut rx) = mpsc::unbounded_channel();
+        hub_channel.send(HubPushMessage::PullMetrics(PullMetrics(tx)))?;
+
+        let response = rx.recv().await.ok_or_else(|| anyhow::anyhow!("No response"))?;
+
+        match response {
+            HubPullResponse::Metrics(metrics) => Ok(metrics), //r => tracing::error!("Unsupported hub response: {r:?}"),
+        }
+    }
+
+    /// Update all metrics
+    pub fn update_metrics(&mut self, metrics: &MetricSet) {
+        let do_update_minute = {
+            self.minute_update_counter =
+                (self.minute_update_counter + 1) % Granuality::Minute.get_five_seconds_interval();
+            self.minute_update_counter == 0
+        };
+
+        let do_update_hour = {
+            self.hour_update_counter =
+                (self.hour_update_counter + 1) % Granuality::Hour.get_five_seconds_interval();
+            self.hour_update_counter == 0
+        };
+
+        let do_update_day = {
+            self.day_update_counter =
+                (self.day_update_counter + 1) % Granuality::Day.get_five_seconds_interval();
+            self.day_update_counter == 0
+        };
+
+        // TODO: Take in account removed metrics, as they will no longer exist in latest MetricSet.
+        // Maybe use a MetricSetModel to track added metrics, and rely on entry_database to iterate ?
+        metrics
+            .families
+            .iter()
+            .filter(|(_, family)| {
+                // Only consider gauge and counter metrics.
+                matches!(family.metric_type, MetricType::Gauge | MetricType::Counter)
+            })
+            .flat_map(|(name, family)| {
+                iter::zip(iter::repeat((name, family)), family.metrics.iter())
+            })
+            .for_each(|((family_name, family), (&uuid, metric))| {
+                self.do_update_metric(
+                    uuid,
+                    metric,
+                    family,
+                    family_name,
+                    do_update_minute,
+                    do_update_hour,
+                    do_update_day,
+                )
+            });
+    }
+
+    /// Update the metric in the entry database.
+    fn do_update_metric(
+        &mut self,
+        uuid: uuid::Uuid,
+        metric: &Metric,
+        family: &MetricFamily,
+        family_name: &str,
+        do_update_minute: bool,
+        do_update_hour: bool,
+        do_update_day: bool,
+    ) {
+        // Get (or create) the entry.
+        let entry = self.entry_database.entry(uuid).or_insert_with(|| {
+            tracing::debug!("New entry {uuid}");
+            let (v2_name, metadata) = Self::to_name_v2(&self.mappings, metric, family, family_name)
+                .expect("Unexpected to_name_v2 failure");
+            let owner_part = format_owner_part(&metadata, self.host_uuid);
+
+            // Consider only AVERAGE metrics for now.
+            RrdEntry::new(
+                format!("AVERAGE:{owner_part}:{v2_name}").into_boxed_str(),
+                metadata,
+            )
+        });
+
+        // Take the first metric point.
+        let first_metric = metric
+            .metrics_point
+            .first()
+            .map(|metric_point| &metric_point.value);
+
+        // Get the value as f64, use NaN if nothing is available.
+        let value = first_metric.map_or(f64::NAN, |metric| match metric {
+            MetricValue::Gauge(value) | MetricValue::Counter { total: value, .. } => match *value {
+                NumberValue::Double(val) => val,
+                NumberValue::Int64(val) => val as _,
+                NumberValue::Undefined => f64::NAN,
+            },
+            _ => f64::NAN,
+        });
+
+        entry.five_seconds.push(value);
+
+        if do_update_minute {
+            self.latest_minute_update = SystemTime::now();
+            entry.minute.push(value);
+        }
+
+        if do_update_hour {
+            self.latest_hour_update = SystemTime::now();
+            entry.hour.push(value);
+        }
+
+        if do_update_day {
+            self.latest_day_update = SystemTime::now();
+            entry.day.push(value);
+        }
+    }
+
+    pub async fn process_message(&self, message: RrddServerMessage) {
+        match message {
+            RrddServerMessage::RequestRrdUpdates(info, sender) => {
+                tracing::info!("Processing RrdUpdate request");
+
+                // TODO: Use interval.
+
+                let granuality = {
+                    let distance_from_now = info.start.elapsed().unwrap_or(
+                        Duration::ZERO, /* if start is in the future, consider now */
+                    );
+
+                    if distance_from_now < Granuality::FiveSeconds.get_covered_duration() {
+                        Granuality::FiveSeconds
+                    } else if distance_from_now < Granuality::Minute.get_covered_duration() {
+                        Granuality::Minute
+                    } else if distance_from_now < Granuality::Hour.get_covered_duration() {
+                        Granuality::Hour
+                    } else {
+                        Granuality::Day
+                    }
+                };
+
+                let (legend, mut data_iterators): (Vec<_>, Vec<_>) = self
+                    .entry_database
+                    .values()
+                    .filter(|entry| {
+                        // Apply filter
+                        match info.filter {
+                            RrdXportFilter::All => true,
+                            RrdXportFilter::AllNoHost => !matches!(entry.metadata.owner, DataSourceOwner::Host),
+                            RrdXportFilter::VM(uuid) => matches!(entry.metadata.owner, DataSourceOwner::VM(entry_uuid) if uuid == entry_uuid),
+                            RrdXportFilter::SR(uuid) => matches!(entry.metadata.owner, DataSourceOwner::SR(entry_uuid) if uuid == entry_uuid),
+                        }
+                    })
+                    .map(|entry| (entry.name.clone(), entry.get_buffer(granuality).iter()))
+                    .unzip();
+
+                let (start, end) = {
+                    let start = match granuality {
+                        Granuality::FiveSeconds => self.latest_update, // NOTE(review): only assigned in new() - confirm
+                        Granuality::Minute => self.latest_minute_update,
+                        Granuality::Hour => self.latest_hour_update,
+                        Granuality::Day => self.latest_day_update,
+                    };
+
+                    (start, start + granuality.get_covered_duration())
+                };
+
+                let data = (0..granuality.get_buffer_size())
+                    .map(|i| {
+                        (
+                            start + (i as u32) * granuality.get_interval(),
+                            data_iterators
+                                .iter_mut()
+                                .map(|iter| iter.next().unwrap_or(&f64::NAN))
+                                .cloned()
+                                .collect(),
+                        )
+                    })
+                    .collect();
+
+                sender
+                    .send(Ok(RrdXport {
+                        start,
+                        end,
+                        step_secs: granuality.get_five_seconds_interval() * 5,
+                        legend,
+                        data,
+                    }))
+                    .await
+                    .ok(); // the requester may have dropped its receiver; not fatal for the server
+            }
+        }
+    }
+
+    #[tracing::instrument]
+    pub fn start(mut self, hub_channel: mpsc::UnboundedSender<HubPushMessage>) -> JoinHandle<()> {
+        tokio::task::spawn(async move {
+            let mut timer = tokio::time::interval(Duration::from_secs(5));
+
+            loop {
+                select! {
+                    _ = timer.tick() => {
+                        tracing::debug!("Pulling metrics");
+                        match self.pull_metrics(&hub_channel).await {
+                            Ok(metrics) => self.update_metrics(&metrics),
+                            Err(e) => tracing::error!("Unable to pull metrics {e}")
+                        }
+                    },
+                    msg = self.receiver.recv() => {
+                        let Some(msg) = msg else {
+                            tracing::error!("Unable to read channel message");
+                            break; // every sender is gone: recv() would return None forever
+                        };
+                        self.process_message(msg).await
+                    }
+                }
+            }
+        })
+    }
+}
diff --git a/xcp-metrics/src/rpc/daemon.rs b/xcp-metrics/src/rpc/daemon.rs
new file mode 100644
index 0000000..d70cd42
--- /dev/null
+++ b/xcp-metrics/src/rpc/daemon.rs
@@ -0,0 +1,47 @@
+//! RPC daemon procedures.
+use std::{path::Path, sync::Arc};
+
+use tokio::{
+    net::UnixStream,
+    task::{self, JoinHandle},
+};
+use xapi::{
+    hyper::{
+        self,
+        service::{make_service_fn, service_fn},
+        Body,
+    },
+    hyperlocal::UnixServerExt,
+};
+
+use crate::{rpc, XcpMetricsShared};
+
+pub async fn start_daemon(
+    daemon_path: &Path,
+    shared: Arc<XcpMetricsShared>,
+) -> anyhow::Result<JoinHandle<()>> {
+    let daemon_path = daemon_path.to_path_buf();
+
+    let make_service = make_service_fn(move |socket: &UnixStream| {
+        let shared = shared.clone();
+        tracing::debug!("Accepted unix stream {socket:?}");
+
+        async move {
+            anyhow::Ok(service_fn(move |request: hyper::Request<Body>| {
+                rpc::entrypoint(shared.clone(), request)
+            }))
+        }
+    });
+
+    tracing::info!("Starting");
+
+    let server_task = task::spawn(async move {
+        hyper::Server::bind_unix(daemon_path)
+            .expect("Unable to bind to socket")
+            .serve(make_service)
+            .await
+            .unwrap();
+    });
+
+    Ok(server_task)
+}
diff --git a/xcp-metrics/src/rpc/mod.rs b/xcp-metrics/src/rpc/mod.rs
new file mode 100644
index 0000000..247c27a
--- /dev/null
+++ b/xcp-metrics/src/rpc/mod.rs
@@ -0,0 +1,45 @@
+//! RPC routes and entrypoint.
+pub mod daemon; +pub mod routes; + +use std::sync::Arc; + +use xapi::{ + hyper::{Body, Request, Response}, + rpc::message::{RpcError, RpcRequest}, +}; + +use crate::XcpMetricsShared; + +#[tracing::instrument(skip_all)] +pub async fn route( + shared: Arc, + request: RpcRequest, +) -> anyhow::Result> { + tracing::info!("RPC: Message: {request}"); + + if let Some(route) = shared.clone().rpc_routes.get(request.get_name()) { + route.run(shared, request).await + } else { + tracing::error!("RPC: Method not found: {request}"); + RpcError::respond_to::<()>(Some(&request), -32601, "Method not found", None) + } +} + +#[tracing::instrument(skip_all)] +pub async fn entrypoint( + shared: Arc, + request: Request, +) -> anyhow::Result> { + tracing::debug!("RPC: {request:#?}"); + + let request = RpcRequest::from_http(request).await; + + match request { + Ok(request) => route(shared, request).await, + Err(err) => { + tracing::error!("RPC: Parse error: {err}"); + RpcError::respond_to(None, -32700, "Parse error", Some(err.to_string())) + } + } +} diff --git a/xcp-metrics/src/rpc/routes/deregister.rs b/xcp-metrics/src/rpc/routes/deregister.rs new file mode 100644 index 0000000..565de99 --- /dev/null +++ b/xcp-metrics/src/rpc/routes/deregister.rs @@ -0,0 +1,45 @@ +//! RPC route for `Plugin.Local.deregister`. 
+use std::sync::Arc; + +use futures::future::BoxFuture; +use xapi::{ + hyper::{Body, Response}, + rpc::{ + message::{RpcRequest, RpcResponse}, + methods::PluginLocalDeregister, + XcpRpcMethodNamed, + }, +}; + +use super::XcpRpcRoute; +use crate::XcpMetricsShared; + +#[derive(Clone, Copy, Default)] +pub struct PluginLocalDeregisterRoute; + +impl XcpRpcRoute for PluginLocalDeregisterRoute { + fn run( + &self, + shared: Arc, + request: RpcRequest, + ) -> BoxFuture<'static, anyhow::Result>> { + Box::pin(async move { + let deregister_rpc: PluginLocalDeregister = request + .clone() + .try_into_method() + .ok_or_else(|| anyhow::anyhow!("No value provided"))?; + + if let Some((name, handle)) = shared.plugins.remove(deregister_rpc.uid.as_str()) { + tracing::info!("RPC: Unregistered {name}"); + + handle.abort(); + } + + RpcResponse::respond_to(&request, "Done") + }) + } + + fn get_name(&self) -> &'static str { + PluginLocalDeregister::get_method_name() + } +} diff --git a/xcp-metrics/src/rpc/routes/get_formats.rs b/xcp-metrics/src/rpc/routes/get_formats.rs new file mode 100644 index 0000000..edfba1c --- /dev/null +++ b/xcp-metrics/src/rpc/routes/get_formats.rs @@ -0,0 +1,40 @@ +//! RPC route for `Plugin.Metrics.get_formats`. 
+use std::sync::Arc; + +use futures::future::BoxFuture; +use xapi::{ + hyper::{Body, Response}, + rpc::{ + message::{RpcRequest, RpcResponse}, + methods::PluginMetricsGetVersions, + response::PluginMetricsVersionsResponse, + XcpRpcMethodNamed, + }, +}; + +use super::XcpRpcRoute; +use crate::XcpMetricsShared; + +#[derive(Default)] +pub struct PluginMetricsGetVersionsRoute; + +impl XcpRpcRoute for PluginMetricsGetVersionsRoute { + fn run( + &self, + _shared: Arc, + request: RpcRequest, + ) -> BoxFuture<'static, anyhow::Result>> { + Box::pin(async move { + RpcResponse::respond_to( + &request, + PluginMetricsVersionsResponse { + versions: vec!["OpenMetrics 1.0.0".to_string()], + }, + ) + }) + } + + fn get_name(&self) -> &'static str { + PluginMetricsGetVersions::get_method_name() + } +} diff --git a/xcp-metrics/src/rpc/routes/mod.rs b/xcp-metrics/src/rpc/routes/mod.rs new file mode 100644 index 0000000..b255a4c --- /dev/null +++ b/xcp-metrics/src/rpc/routes/mod.rs @@ -0,0 +1,98 @@ +//! RPC routes +mod deregister; +mod get_formats; +mod next_reading; +mod register; +mod register_v3; + +use futures::future::BoxFuture; +use std::{collections::HashMap, sync::Arc}; + +use xapi::{ + hyper::{Body, Response}, + rpc::{ + message::RpcRequest, + methods::{ + PluginLocalDeregister, PluginLocalNextReading, PluginLocalRegister, + PluginMetricsDeregister, PluginMetricsGetVersions, PluginMetricsRegister, + }, + XcpRpcMethodNamed, + }, +}; + +use self::{ + deregister::PluginLocalDeregisterRoute, get_formats::PluginMetricsGetVersionsRoute, + next_reading::PluginLocalNextReadingRoute, register::PluginLocalRegisterRoute, + register_v3::PluginMetricsRegisterRoute, +}; +use crate::{publishers::openmetrics::OpenMetricsRoute, XcpMetricsShared}; + +pub trait XcpRpcRoute: 'static + Sync + Send { + fn run( + &self, + shared: Arc, + request: RpcRequest, + ) -> BoxFuture<'static, anyhow::Result>>; + + fn make_route() -> Box + where + Self: Default, + { + Box::::default() + } + + fn get_name(&self) 
-> &'static str { + "(Unammed)" + } +} + +impl std::fmt::Debug for dyn XcpRpcRoute { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.get_name()) + } +} + +#[derive(Debug)] +pub struct RpcRoutes(HashMap<&'static str, Box>); + +impl Default for RpcRoutes { + fn default() -> Self { + Self( + [ + ("OpenMetrics", OpenMetricsRoute::make_route()), + ( + PluginLocalRegister::get_method_name(), + PluginLocalRegisterRoute::make_route(), + ), + ( + PluginLocalDeregister::get_method_name(), + PluginLocalDeregisterRoute::make_route(), + ), + ( + PluginLocalNextReading::get_method_name(), + PluginLocalNextReadingRoute::make_route(), + ), + ( + PluginMetricsGetVersions::get_method_name(), + PluginMetricsGetVersionsRoute::make_route(), + ), + ( + PluginMetricsRegister::get_method_name(), + PluginMetricsRegisterRoute::make_route(), + ), + ( + PluginMetricsDeregister::get_method_name(), + PluginLocalDeregisterRoute::make_route(), + ), + ] + .into_iter() + .collect(), + ) + } +} + +impl RpcRoutes { + pub fn get(&self, name: &str) -> Option<&dyn XcpRpcRoute> { + self.0.get(name).map(|r| r.as_ref()) + } +} diff --git a/xcp-metrics/src/rpc/routes/next_reading.rs b/xcp-metrics/src/rpc/routes/next_reading.rs new file mode 100644 index 0000000..2098d23 --- /dev/null +++ b/xcp-metrics/src/rpc/routes/next_reading.rs @@ -0,0 +1,28 @@ +//! RPC route for `Plugin.Local.next_reading`. 
+use std::sync::Arc; + +use futures::future::BoxFuture; +use xapi::{ + hyper::{Body, Response}, + rpc::message::{RpcRequest, RpcResponse}, +}; + +use super::XcpRpcRoute; +use crate::XcpMetricsShared; + +#[derive(Clone, Copy, Default)] +pub struct PluginLocalNextReadingRoute; + +impl XcpRpcRoute for PluginLocalNextReadingRoute { + fn run( + &self, + _shared: Arc, + request: RpcRequest, + ) -> BoxFuture<'static, anyhow::Result>> { + Box::pin(async move { + RpcResponse::respond_to( + &request, /* next_reading: */ 5.0, /* Same as register */ + ) + }) + } +} diff --git a/xcp-metrics/src/rpc/routes/register.rs b/xcp-metrics/src/rpc/routes/register.rs new file mode 100644 index 0000000..528ecc8 --- /dev/null +++ b/xcp-metrics/src/rpc/routes/register.rs @@ -0,0 +1,78 @@ +//! RPC route for `Plugin.Local.register`. +use std::sync::Arc; + +use futures::future::BoxFuture; +use xapi::{ + hyper::{Body, Response}, + rpc::{ + message::{RpcRequest, RpcResponse}, + methods::PluginLocalRegister, + XcpRpcMethodNamed, + }, +}; + +use super::XcpRpcRoute; +use crate::{ + providers::{protocol_v2::ProtocolV2Provider, protocol_v3::ProtocolV3Provider, Provider}, + XcpMetricsShared, +}; + +#[derive(Clone, Copy, Default)] +pub struct PluginLocalRegisterRoute; + +impl XcpRpcRoute for PluginLocalRegisterRoute { + fn run( + &self, + shared: Arc, + request: RpcRequest, + ) -> BoxFuture<'static, anyhow::Result>> { + Box::pin(async move { + let register_rpc: PluginLocalRegister = request + .clone() + .try_into_method() + .ok_or_else(|| anyhow::anyhow!("No value provided"))?; + + if shared // check if plugin exists and is active + .plugins + .get(register_rpc.uid.as_str()) + .map(|handle| !handle.is_finished()) + .is_none() + { + let plugin_handle = match register_rpc.protocol.as_str() { + "V2" => { + tracing::info!(uid = register_rpc.uid, "Starting protocol v2 provider"); + ProtocolV2Provider::new(®ister_rpc.uid) + .start_provider(shared.hub_channel.clone()) + } + "V3" => { + tracing::info!(uid = 
register_rpc.uid, "Starting protocol v3 provider"); + ProtocolV3Provider::new(®ister_rpc.uid) + .start_provider(shared.hub_channel.clone()) + } + _ => { + anyhow::bail!("Unknown or unsupported protocol {}", register_rpc.protocol); + } + }; + + shared + .plugins + .insert(register_rpc.uid.into(), plugin_handle); + } else { + tracing::warn!( + "Attempted to register an already registered plugin {}", + register_rpc.uid + ); + } + + RpcResponse::respond_to( + &request, + /* next_reading: */ + 5.0, /* all provider readings are independant, thus this is always 5 */ + ) + }) + } + + fn get_name(&self) -> &'static str { + PluginLocalRegister::get_method_name() + } +} diff --git a/xcp-metrics/src/rpc/routes/register_v3.rs b/xcp-metrics/src/rpc/routes/register_v3.rs new file mode 100644 index 0000000..f78b1a6 --- /dev/null +++ b/xcp-metrics/src/rpc/routes/register_v3.rs @@ -0,0 +1,71 @@ +//! RPC route for `Plugin.Metrics.register_v3`. +use std::sync::Arc; + +use futures::future::BoxFuture; +use xapi::{ + hyper::{Body, Response}, + rpc::{ + message::{RpcError, RpcRequest, RpcResponse}, + methods::{PluginLocalRegister, PluginMetricsRegister}, + XcpRpcMethodNamed, + }, +}; + +use super::XcpRpcRoute; +use crate::{ + providers::{protocol_v3::ProtocolV3Provider, Provider}, + XcpMetricsShared, +}; + +#[derive(Clone, Copy, Default)] +pub struct PluginMetricsRegisterRoute; + +impl XcpRpcRoute for PluginMetricsRegisterRoute { + fn run( + &self, + shared: Arc, + request: RpcRequest, + ) -> BoxFuture<'static, anyhow::Result>> { + Box::pin(async move { + let register_rpc: PluginMetricsRegister = request + .clone() + .try_into_method() + .ok_or_else(|| anyhow::anyhow!("No value provided"))?; + + if register_rpc.version != "OpenMetrics 1.0.0" { + return RpcError::respond_to( + Some(&request), + -32000, + "Unsupported OpenMetrics version", + Some(register_rpc.version), + ); + } + + if shared // check if plugin exists and is active + .plugins + .get(register_rpc.uid.as_str()) + .map(|handle| 
!handle.is_finished()) + .is_none() + { + tracing::info!(uid = register_rpc.uid, "Starting protocol v3 provider"); + let plugin_handle = ProtocolV3Provider::new(®ister_rpc.uid) + .start_provider(shared.hub_channel.clone()); + + shared + .plugins + .insert(register_rpc.uid.into(), plugin_handle); + } else { + tracing::warn!( + "Attempted to register an already registered plugin {}", + register_rpc.uid + ); + } + + RpcResponse::respond_to(&request, "OK") + }) + } + + fn get_name(&self) -> &'static str { + PluginLocalRegister::get_method_name() + } +}