diff --git a/Cargo.toml b/Cargo.toml
index c983a396..7e130003 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@ readme = "README.md"
 
 [features]
 default = ["datadog"]
-datadog = ["datadog-client"]
+datadog = ["ureq"]
 
 [dependencies]
 loggerv = "0.7.2"
@@ -29,7 +29,7 @@ protobuf = "2.20.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 
-datadog-client = { version = "0.1", optional = true }
+ureq = { version = "2.0.2", features = ["json"], optional = true }
 
 [profile.release]
 lto = true
diff --git a/src/exporters/datadog.rs b/src/exporters/datadog.rs
index 3c8abb0d..245bd017 100644
--- a/src/exporters/datadog.rs
+++ b/src/exporters/datadog.rs
@@ -1,11 +1,178 @@
 use crate::exporters::*;
 use crate::sensors::{Sensor, Topology};
-use datadog_client::client::{Client, Config};
-use datadog_client::metrics::{Point, Serie, Type};
+use serde::ser::SerializeSeq;
+use serde::{Serialize, Serializer};
 use std::collections::HashMap;
 use std::thread;
 use std::time::{Duration, Instant};
 
+#[derive(Clone, Debug)]
+pub enum Type {
+    Count,
+    Gauge,
+    Rate,
+}
+
+impl Type {
+    pub fn as_str(&self) -> &str {
+        match self {
+            Self::Count => "count",
+            Self::Gauge => "gauge",
+            Self::Rate => "rate",
+        }
+    }
+}
+
+impl Serialize for Type {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(self.as_str())
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct Point {
+    timestamp: u64,
+    value: f64,
+}
+
+impl Point {
+    pub fn new(timestamp: u64, value: f64) -> Self {
+        Self { timestamp, value }
+    }
+}
+
+impl Serialize for Point {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut seq = serializer.serialize_seq(Some(2))?;
+        seq.serialize_element(&self.timestamp)?;
+        seq.serialize_element(&self.value)?;
+        seq.end()
+    }
+}
+
+/// # Examples
+///
+/// ```
+/// use scaphandre::exporters::datadog::{Point, Serie, Type};
+///
+/// let serie = Serie::new("cpu.usage", Type::Gauge)
+///     .set_host("raspberrypi")
+///     .set_interval(42)
+///     .set_points(vec![])
+///     .add_point(Point::new(123456, 12.34))
+///     .set_tags(vec![])
+///     .add_tag(String::from("whatever:tag"));
+/// ```
+#[derive(Debug, Clone, Serialize)]
+pub struct Serie {
+    // The name of the host that produced the metric.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    host: Option<String>,
+    // If the type of the metric is rate or count, define the corresponding interval.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    interval: Option<i64>,
+    // The name of the timeseries.
+    metric: String,
+    // Points relating to a metric. All points must be tuples of a timestamp and a scalar value (cannot be a string).
+    // Timestamps should be in POSIX time in seconds, and cannot be more than ten minutes in the future or more than one hour in the past.
+    points: Vec<Point>,
+    // A list of tags associated with the metric.
+    tags: Vec<String>,
+    // The type of the metric: either count, gauge, or rate.
+    #[serde(rename = "type")]
+    dtype: Type,
+}
+
+impl Serie {
+    pub fn new(metric: &str, dtype: Type) -> Self {
+        Self {
+            host: None,
+            interval: None,
+            metric: metric.to_string(),
+            points: Vec::new(),
+            tags: Vec::new(),
+            dtype,
+        }
+    }
+}
+
+impl Serie {
+    pub fn set_host(mut self, host: &str) -> Self {
+        self.host = Some(host.to_string());
+        self
+    }
+
+    pub fn set_interval(mut self, interval: i64) -> Self {
+        self.interval = Some(interval);
+        self
+    }
+
+    pub fn set_points(mut self, points: Vec<Point>) -> Self {
+        self.points = points;
+        self
+    }
+
+    pub fn add_point(mut self, point: Point) -> Self {
+        self.points.push(point);
+        self
+    }
+}
+
+impl Serie {
+    pub fn set_tags(mut self, tags: Vec<String>) -> Self {
+        self.tags = tags;
+        self
+    }
+
+    pub fn add_tag(mut self, tag: String) -> Self {
+        self.tags.push(tag);
+        self
+    }
+}
+
+struct Client {
+    host: String,
+    api_key: String,
+}
+
+impl Client {
+    pub fn new(parameters: &ArgMatches) -> Self {
+        Self {
+            host: parameters.value_of("host").unwrap().to_string(),
+            api_key: parameters.value_of("api_key").unwrap().to_string(),
+        }
+    }
+
+    pub fn send(&self, series: &[Serie]) {
+        let url = format!("{}/api/v1/series", self.host);
+        let request = ureq::post(url.as_str())
+            .set("DD-API-KEY", self.api_key.as_str())
+            .send_json(serde_json::json!({ "series": series }));
+        match request {
+            Ok(response) => {
+                if response.status() >= 400 {
+                    log::warn!(
+                        "couldn't send metrics to datadog: status {}",
+                        response.status_text()
+                    );
+                    if let Ok(body) = response.into_string() {
+                        log::warn!("response from server: {}", body);
+                    }
+                } else {
+                    log::info!("metrics sent with success");
+                }
+            }
+            Err(err) => log::warn!("error while sending metrics: {}", err),
+        };
+    }
+}
+
 fn merge<T>(first: Vec<T>, second: Vec<T>) -> Vec<T> {
     second.into_iter().fold(first, |mut res, item| {
         res.push(item);
@@ -79,15 +246,8 @@ impl DatadogExporter {
         }
     }
 
-    fn build_client(parameters: &ArgMatches) -> Client {
-        let config = Config::new(
-            parameters.value_of("host").unwrap().to_string(),
-            parameters.value_of("api_key").unwrap().to_string(),
-        );
-        Client::new(config)
-    }
-
-    fn runner(&mut self, parameters: &ArgMatches) {
+    fn runner(&mut self, parameters: &ArgMatches<'_>) {
+        let client = Client::new(parameters);
         if let Some(timeout) = parameters.value_of("timeout") {
             let now = Instant::now();
             let timeout = timeout
@@ -110,18 +270,18 @@ impl DatadogExporter {
             info!("Measurement step is: {}s", step_duration);
 
             while now.elapsed().as_secs() <= timeout {
-                self.iterate(parameters);
+                self.iterate(&client);
                 thread::sleep(Duration::new(step_duration, step_duration_nano));
             }
         } else {
-            self.iterate(parameters);
+            self.iterate(&client);
         }
     }
 
-    fn iterate(&mut self, parameters: &ArgMatches) {
+    fn iterate(&mut self, client: &Client) {
         self.topology.refresh();
-        let _series = self.collect_series();
-        let _client = Self::build_client(parameters);
+        let series = self.collect_series();
+        client.send(&series);
     }
 
     fn create_consumption_serie(&self) -> Serie {
diff --git a/src/lib.rs b/src/lib.rs
index 6d56df82..9f1e9431 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,8 +5,8 @@ pub mod exporters;
 pub mod sensors;
 use clap::ArgMatches;
 use exporters::{
-    json::JSONExporter, prometheus::PrometheusExporter, qemu::QemuExporter,
-    riemann::RiemannExporter, stdout::StdoutExporter, Exporter, ExporterOption,
+    datadog::DatadogExporter, json::JSONExporter, prometheus::PrometheusExporter,
+    qemu::QemuExporter, riemann::RiemannExporter, stdout::StdoutExporter, Exporter, ExporterOption,
 };
 use sensors::{powercap_rapl::PowercapRAPLSensor, Sensor};
 use std::collections::HashMap;
@@ -52,32 +52,44 @@ fn get_sensor(matches: &ArgMatches) -> Box<dyn Sensor> {
 
 pub fn run(matches: ArgMatches) {
     loggerv::init_with_verbosity(matches.occurrences_of("v")).unwrap();
-    let sensor_boxed = get_sensor(&matches);
-    let exporter_parameters;
-
     if let Some(stdout_exporter_parameters) = matches.subcommand_matches("stdout") {
-        exporter_parameters = stdout_exporter_parameters.clone();
-        let mut exporter = StdoutExporter::new(sensor_boxed);
+        let exporter_parameters = stdout_exporter_parameters.clone();
+        let mut exporter = StdoutExporter::new(get_sensor(&matches));
         exporter.run(exporter_parameters);
-    } else if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
-        exporter_parameters = json_exporter_parameters.clone();
-        let mut exporter = JSONExporter::new(sensor_boxed);
+        return;
+    }
+    if let Some(json_exporter_parameters) = matches.subcommand_matches("json") {
+        let exporter_parameters = json_exporter_parameters.clone();
+        let mut exporter = JSONExporter::new(get_sensor(&matches));
         exporter.run(exporter_parameters);
-    } else if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
-        exporter_parameters = riemann_exporter_parameters.clone();
-        let mut exporter = RiemannExporter::new(sensor_boxed);
+        return;
+    }
+    if let Some(riemann_exporter_parameters) = matches.subcommand_matches("riemann") {
+        let exporter_parameters = riemann_exporter_parameters.clone();
+        let mut exporter = RiemannExporter::new(get_sensor(&matches));
         exporter.run(exporter_parameters);
-    } else if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
-        exporter_parameters = prometheus_exporter_parameters.clone();
-        let mut exporter = PrometheusExporter::new(sensor_boxed);
+        return;
+    }
+    if let Some(prometheus_exporter_parameters) = matches.subcommand_matches("prometheus") {
+        let exporter_parameters = prometheus_exporter_parameters.clone();
+        let mut exporter = PrometheusExporter::new(get_sensor(&matches));
         exporter.run(exporter_parameters);
-    } else if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
-        exporter_parameters = qemu_exporter_parameters.clone();
-        let mut exporter = QemuExporter::new(sensor_boxed);
+        return;
+    }
+    if let Some(qemu_exporter_parameters) = matches.subcommand_matches("qemu") {
+        let exporter_parameters = qemu_exporter_parameters.clone();
+        let mut exporter = QemuExporter::new(get_sensor(&matches));
         exporter.run(exporter_parameters);
-    } else {
-        error!("Couldn't determine which exporter has been chosen.");
+        return;
     }
+    #[cfg(feature = "datadog")]
+    if let Some(datadog_exporter_parameters) = matches.subcommand_matches("datadog") {
+        let exporter_parameters = datadog_exporter_parameters.clone();
+        let mut exporter = DatadogExporter::new(get_sensor(&matches));
+        exporter.run(exporter_parameters);
+        return;
+    }
+    error!("Couldn't determine which exporter has been chosen.");
 }
 
 /// Returns options needed for each exporter as a HashMap.
@@ -104,6 +116,11 @@ pub fn get_exporters_options() -> HashMap
             "prometheus" => "Prometheus exporter exposes power consumption metrics on an http endpoint (/metrics is default) in prometheus accepted format",
             "riemann" => "Riemann exporter sends power consumption metrics to a Riemann server",
             "qemu" => "Qemu exporter watches all Qemu/KVM virtual machines running on the host and exposes metrics of each of them in a dedicated folder",
+            #[cfg(feature = "datadog")]
+            "datadog" => "Datadog exporter sends power consumption metrics to Datadog",
             _ => "Unknown exporter",
         }
     );
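
Reviewer note, not part of the patch: with the `datadog-client` crate gone, the wire format now depends entirely on the hand-rolled `Serialize` impls added above. A minimal sanity check of that shape is sketched below; it assumes the types remain reachable as `scaphandre::exporters::datadog::{Point, Serie, Type}`, a module path that is not visible in this diff.

```rust
// Sketch only: exercises the Serialize impls introduced by this patch.
// The `scaphandre::exporters::datadog` import path is an assumption; adjust it
// to wherever Point/Serie/Type actually end up being exported.
use scaphandre::exporters::datadog::{Point, Serie, Type};

#[test]
fn serie_serializes_to_the_datadog_series_shape() {
    let serie = Serie::new("cpu.usage", Type::Gauge)
        .set_host("raspberrypi")
        .add_point(Point::new(123456, 12.34))
        .add_tag(String::from("whatever:tag"));

    // `interval` is None, so it is skipped; each Point becomes a
    // `[timestamp, value]` pair and `dtype` is emitted as `type`.
    assert_eq!(
        serde_json::to_value(&serie).unwrap(),
        serde_json::json!({
            "host": "raspberrypi",
            "metric": "cpu.usage",
            "points": [[123456, 12.34]],
            "tags": ["whatever:tag"],
            "type": "gauge"
        })
    );
}
```

This is the object that `Client::send` wraps as `{ "series": [...] }` and posts to `{host}/api/v1/series` with the `DD-API-KEY` header, so a regression in these impls would surface here before it reaches the API.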