From 3af3d2da5757384496d3e995f58a0c57dc535fe9 Mon Sep 17 00:00:00 2001 From: Adam Wegrzynek Date: Thu, 3 Aug 2017 11:34:46 +0200 Subject: [PATCH] Sending multiple metrics (values) in a single measurement (#15) --- CMakeLists.txt | 1 + examples/5-Benchmark.cxx | 22 +++++++++++++---- examples/8-Multiple.cxx | 17 ++++++++++++++ include/Monitoring/Collector.h | 8 ++++++- include/Monitoring/Metric.h | 4 ++++ src/Backends/InfluxDB.cxx | 43 +++++++++++++++++++++++++++------- src/Backends/InfluxDB.h | 10 ++++++++ src/Collector.cxx | 31 ++++++++++++++++++++++-- src/Metric.cxx | 5 ++++ 9 files changed, 124 insertions(+), 17 deletions(-) create mode 100644 examples/8-Multiple.cxx diff --git a/CMakeLists.txt b/CMakeLists.txt index 06db6ceb2..177a2c965 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -164,6 +164,7 @@ set(EXAMPLES examples/5-Benchmark.cxx examples/6-DedicatedInstance.cxx examples/7-Latency.cxx + examples/8-Multiple.cxx ) foreach (example ${EXAMPLES}) diff --git a/examples/5-Benchmark.cxx b/examples/5-Benchmark.cxx index 0b06c279b..796a27f4a 100644 --- a/examples/5-Benchmark.cxx +++ b/examples/5-Benchmark.cxx @@ -25,6 +25,7 @@ int main(int argc, char *argv[]) { ("config", boost::program_options::value()->required(), "Config file path") ("id", boost::program_options::value(), "Instance ID") ("count", boost::program_options::value(), "Number of metric bunches (x3)") + ("multiple", boost::program_options::bool_switch()->default_value(false), "Sends multiple metrics per measurement") ; boost::program_options::variables_map vm; @@ -52,10 +53,21 @@ int main(int argc, char *argv[]) { add = 1; } - for (int i = 0; i <= count; i += add) { - Monitoring::Get().send({"string" + std::to_string(intDist(mt)), "stringMetric"}); - Monitoring::Get().send({doubleDist(mt), "doubleMetric"}); - Monitoring::Get().send({intDist(mt), "intMetric"}); - std::this_thread::sleep_for(std::chrono::microseconds(sleep)); + if (!vm["multiple"].as()) { + for (int i = 0; i <= count; i += add) { + Monitoring::Get().send({"string" + std::to_string(intDist(mt)), "stringMetric"}); + Monitoring::Get().send({doubleDist(mt), "doubleMetric"}); + Monitoring::Get().send({intDist(mt), "intMetric"}); + std::this_thread::sleep_for(std::chrono::microseconds(sleep)); + } + } else { + for (int i = 0; i <= count; i += add) { + Monitoring::Get().send("benchmarkMeasurement",{ + {"string" + std::to_string(intDist(mt)), "stringMetric"}, + {doubleDist(mt), "doubleMetric"}, + {intDist(mt), "intMetric"} + }); + std::this_thread::sleep_for(std::chrono::microseconds(sleep)); + } } } diff --git a/examples/8-Multiple.cxx b/examples/8-Multiple.cxx new file mode 100644 index 000000000..763c1221c --- /dev/null +++ b/examples/8-Multiple.cxx @@ -0,0 +1,17 @@ +/// +/// \file 8-Multiple.cxx +/// \author Adam Wegrzynek +/// + +#include "ExampleBoilerplate.cxx" +#include "Monitoring/MonitoringFactory.h" + +using Monitoring = AliceO2::Monitoring::MonitoringFactory; + +int main(int argc, char *argv[]) { + + // configure monitoring (once per process), pass configuration path as parameter + Monitoring::Configure("file://" + GetConfigFromCmdLine(argc, argv)); + + Monitoring::Get().send("measurementName", {{10, "myMetricInt"}, {10.10, "myMetricFloat"}}); +} diff --git a/include/Monitoring/Collector.h b/include/Monitoring/Collector.h index d73319899..cadb3f362 100644 --- a/include/Monitoring/Collector.h +++ b/include/Monitoring/Collector.h @@ -56,7 +56,13 @@ class Collector /// Sends a metric to all avaliabes backends /// If metric has been added to DerivedMetric the derived metric is calculated (see addDerivedMetric method) /// \param metric r-value to metric object - void send(Metric&& metric); + void send(Metric&& metric, std::size_t skipBackend = -1); + + /// Sends multiple metrics to as a single measurement + /// If it's not supported by backend it fallbacks into sending multiple metrics + /// \param name measurement name + /// \param metrics list of metrics + void send(std::string name, std::vector&& metrics); /// Sends a metric with tagset to all avaliabes backends /// If metric has been added to DerivedMetric the derived metric is calculated (see addDerivedMetric method) diff --git a/include/Monitoring/Metric.h b/include/Monitoring/Metric.h index 372d8b77e..38a974c4c 100644 --- a/include/Monitoring/Metric.h +++ b/include/Monitoring/Metric.h @@ -61,6 +61,10 @@ class Metric /// \return metric name std::string getName() const; + /// Name setter + /// \param new name of the metric + void setName(std::string name); + /// Timestamp getter /// \return metric timestamp std::chrono::time_point getTimestamp() const; diff --git a/src/Backends/InfluxDB.cxx b/src/Backends/InfluxDB.cxx index d22f1bcf8..cba44a673 100644 --- a/src/Backends/InfluxDB.cxx +++ b/src/Backends/InfluxDB.cxx @@ -49,6 +49,26 @@ void InfluxDB::escape(std::string& escaped) boost::replace_all(escaped, " ", "\\ "); } +void InfluxDB::sendMultiple(std::string name, std::vector&& metrics) +{ + escape(name); + std::stringstream convert; + convert << name << "," << tagSet << " "; + + for (const auto& metric : metrics) { + std::string value = boost::lexical_cast(metric.getValue()); + prepareValue(value, metric.getType()); + convert << metric.getName() << "=" << value << ","; + } + convert.seekp(-1, std::ios_base::end); + convert << " " << convertTimestamp(metrics.back().getTimestamp()); + + try { + transport->send(convert.str()); + } catch (MonitoringInternalException&) { + } +} + void InfluxDB::send(const Metric& metric) { std::string metricTags{}; @@ -57,15 +77,7 @@ void InfluxDB::send(const Metric& metric) } std::string value = boost::lexical_cast(metric.getValue()); - if (metric.getType() == MetricType::STRING) { - escape(value); - value.insert(value.begin(), '"'); - value.insert(value.end(), '"'); - } - - if (metric.getType() == MetricType::INT) { - value.insert(value.end(), 'i'); - } + prepareValue(value, metric.getType()); std::string name = metric.getName(); escape(name); @@ -78,6 +90,19 @@ void InfluxDB::send(const Metric& metric) } } +void InfluxDB::prepareValue(std::string& value, int type) +{ + if (type == MetricType::STRING) { + escape(value); + value.insert(value.begin(), '"'); + value.insert(value.end(), '"'); + } + + if (type == MetricType::INT) { + value.insert(value.end(), 'i'); + } +} + void InfluxDB::addGlobalTag(std::string name, std::string value) { escape(name); escape(value); diff --git a/src/Backends/InfluxDB.h b/src/Backends/InfluxDB.h index b1ac8c169..7f536d4f6 100644 --- a/src/Backends/InfluxDB.h +++ b/src/Backends/InfluxDB.h @@ -50,6 +50,11 @@ class InfluxDB final : public Backend /// \param metric reference to metric object void send(const Metric& metric) override; + /// Sends multiple values in single measurement + /// \param name measurement name + /// \param metrics list of metrics + void sendMultiple(std::string name, std::vector&& metrics); + /// Adds tag /// \param name tag name /// \param value tag value @@ -62,6 +67,11 @@ class InfluxDB final : public Backend /// Escapes " ", "," and "=" characters /// \param escaped string rerference to escape characters from void escape(std::string& escaped); + + /// Modifies values to Influx Line Protocol format + /// \param value reference to value + /// \param type type of the metric + void prepareValue(std::string& value, int type); }; } // namespace Backends diff --git a/src/Collector.cxx b/src/Collector.cxx index 8e95e527e..9fd631ee7 100644 --- a/src/Collector.cxx +++ b/src/Collector.cxx @@ -79,7 +79,7 @@ Collector::Collector(const std::string& configPath) MonLogger::Get() << "InfluxDB/HTTP backend disabled" << MonLogger::End(); } #endif - + if (configFile->get("Flume/enable").value_or(0) == 1) { mBackends.emplace_back(std::make_unique( configFile->get("Flume/hostname").value(), @@ -154,9 +154,36 @@ void Collector::addDerivedMetric(std::string name, DerivedMetricMode mode) { mDerivedHandler->registerMetric(name, mode); } -void Collector::send(Metric&& metric) +void Collector::send(std::string measurement, std::vector&& metrics) +{ + // find InfluxDB index + size_t influxIndex = -1; + for (auto& b: mBackends) { + if (dynamic_cast(b.get())) { + influxIndex = &b-&mBackends[0]; + } + } + // send single metric to InfluxDB + dynamic_cast( + mBackends[influxIndex].get())->sendMultiple(measurement, std::move(metrics) + ); + + // send multiple metric to all other backends (prepend metric name with measurement name) + for (auto& m : metrics) { + std::string tempName = m.getName(); + m.setName(measurement + "-" + m.getName()); + send(std::move(m), influxIndex); + m.setName(tempName); + } +} + +void Collector::send(Metric&& metric, std::size_t skipBackend) { + std::size_t index = 0; for (auto& b: mBackends) { + if (index++ == skipBackend) { + continue; + } b->send(metric); } if (mDerivedHandler->isRegistered(metric.getName())) { diff --git a/src/Metric.cxx b/src/Metric.cxx index cc3199893..ede20a0d2 100644 --- a/src/Metric.cxx +++ b/src/Metric.cxx @@ -30,6 +30,11 @@ std::string Metric::getName() const return mName; } +void Metric::setName(std::string name) +{ + mName = name; +} + Metric::Metric(int value, const std::string& name, std::chrono::time_point timestamp) : mValue(value), mName(name), mTimestamp(timestamp) {}