Skip to content

Commit

Permalink
Add prometheus-cpp, drop legacy telemetry API
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed May 4, 2024
1 parent 48660b7 commit 2433a72
Show file tree
Hide file tree
Showing 46 changed files with 367 additions and 4,822 deletions.
46 changes: 46 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
cmake_minimum_required(VERSION 3.15 FATAL_ERROR)
project(broker C CXX)
include(CMakePackageConfigHelpers)
include(FetchContent)
include(GNUInstallDirs)
include(cmake/CommonCMakeConfig.cmake)

Expand Down Expand Up @@ -208,6 +209,51 @@ function(add_bundled_caf)
add_subdirectory(caf EXCLUDE_FROM_ALL)
endfunction()

FetchContent_Declare(
dl_prometheus_cpp
GIT_REPOSITORY https://github.com/jupp0r/prometheus-cpp.git
GIT_TAG v1.2.4)

# Bundle prometheus-cpp setup as an interface library.
add_library(broker-prometheus-cpp INTERFACE)

function(add_prometheus_cpp)
# Skip if already configured.
# Get prometheus-cpp if the target is not already available.
if(NOT TARGET prometheus-cpp::core)
set(BUILD_SHARED_LIBS OFF) # bundle prometheus-cpp as static library
set(ENABLE_PUSH OFF)
set(ENABLE_TESTING OFF)
set(GENERATE_PKGCONFIG OFF)
set(CIVETWEB_ENABLE_DEBUG_TOOLS OFF)
set(OVERRIDE_CXX_STANDARD_FLAGS OFF)
if(NOT dl_prometheus_cpp_POPULATED)
FetchContent_Populate(dl_prometheus_cpp)
add_subdirectory(${dl_prometheus_cpp_SOURCE_DIR}
${dl_prometheus_cpp_BINARY_DIR}
EXCLUDE_FROM_ALL)
endif()
# Tell CMake to look for the headers in the build tree.
target_include_directories(
broker-prometheus-cpp
INTERFACE
$<BUILD_INTERFACE:${dl_prometheus_cpp_SOURCE_DIR}/core/include>
$<BUILD_INTERFACE:${dl_prometheus_cpp_BINARY_DIR}/core/include>
$<BUILD_INTERFACE:${dl_prometheus_cpp_SOURCE_DIR}/pull/include>
$<BUILD_INTERFACE:${dl_prometheus_cpp_BINARY_DIR}/pull/include>)
endif()
# Link against prometheus-cpp.
target_link_libraries(
broker-prometheus-cpp
INTERFACE
prometheus-cpp::core
prometheus-cpp::pull)
# Prometheus-cpp sets cxx_std_11, but we need cxx_std_17.
target_compile_features(broker-prometheus-cpp INTERFACE cxx_std_17)
endfunction()

add_prometheus_cpp()

# NOTE: building and linking against an external CAF version is NOT supported!
# This variable is FOR DEVELOPMENT ONLY. The only officially supported CAF
# version is the bundled version!
Expand Down
41 changes: 23 additions & 18 deletions libbroker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,10 @@ set(BROKER_SRC
broker/internal/json_type_mapper.cc
broker/internal/master_actor.cc
broker/internal/master_resolver.cc
broker/internal/metric_collector.cc
broker/internal/metric_exporter.cc
broker/internal/metric_factory.cc
broker/internal/metric_scraper.cc
broker/internal/metric_view.cc
broker/internal/peering.cc
broker/internal/pending_connection.cc
broker/internal/println.cc
broker/internal/prometheus.cc
broker/internal/store_actor.cc
broker/internal/web_socket.cc
broker/internal/wire_format.cc
Expand All @@ -83,12 +78,6 @@ set(BROKER_SRC
broker/store_event.cc
broker/subnet.cc
broker/subscriber.cc
broker/telemetry/counter.cc
broker/telemetry/gauge.cc
broker/telemetry/histogram.cc
broker/telemetry/metric_family.cc
broker/telemetry/metric_registry.cc
broker/telemetry/metric_registry_impl.cc
broker/time.cc
broker/topic.cc
broker/variant.cc
Expand All @@ -110,7 +99,13 @@ if (ENABLE_SHARED)
MACOSX_RPATH true
OUTPUT_NAME broker)
target_link_libraries(broker PUBLIC ${LINK_LIBS})
target_link_libraries(broker PRIVATE CAF::core CAF::io CAF::net)
target_link_libraries(
broker
PRIVATE
CAF::core
CAF::io
CAF::net
broker-prometheus-cpp)
install(TARGETS broker
EXPORT BrokerTargets
DESTINATION ${CMAKE_INSTALL_LIBDIR})
Expand All @@ -127,7 +122,13 @@ if (ENABLE_STATIC)
set_target_properties(broker_static PROPERTIES POSITION_INDEPENDENT_CODE ON)
endif()
target_link_libraries(broker_static PUBLIC ${LINK_LIBS})
target_link_libraries(broker_static PRIVATE CAF::core CAF::io CAF::net)
target_link_libraries(
broker_static
PRIVATE
CAF::core
CAF::io
CAF::net
broker-prometheus-cpp)
install(TARGETS broker_static
EXPORT BrokerTargets
DESTINATION ${CMAKE_INSTALL_LIBDIR})
Expand Down Expand Up @@ -172,8 +173,6 @@ set(BROKER_TEST_SRC
broker/internal/channel.test.cc
broker/internal/core_actor.test.cc
broker/internal/json.test.cc
broker/internal/metric_collector.test.cc
broker/internal/metric_exporter.test.cc
broker/internal/wire_format.test.cc
broker/master.test.cc
broker/peering.test.cc
Expand All @@ -185,7 +184,6 @@ set(BROKER_TEST_SRC
broker/store.test.cc
broker/store_event.test.cc
broker/subscriber.test.cc
broker/telemetry/histogram.test.cc
broker/topic.test.cc
broker/variant.test.cc
broker/zeek.test.cc
Expand All @@ -199,8 +197,15 @@ set(BROKER_TEST_SRC
# endif()

add_executable(broker-test ${BROKER_TEST_SRC})
target_link_libraries(broker-test PRIVATE
${main_lib_target} CAF::core CAF::io CAF::net CAF::test)
target_link_libraries(
broker-test
PRIVATE
${main_lib_target}
CAF::core
CAF::io
CAF::net
CAF::test
broker-prometheus-cpp)

foreach(file_path ${BROKER_TEST_SRC})
get_filename_component(test_dir ${file_path} DIRECTORY)
Expand Down
183 changes: 24 additions & 159 deletions libbroker/broker/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "broker/internal/json_client.hh"
#include "broker/internal/json_type_mapper.hh"
#include "broker/internal/logger.hh"
#include "broker/internal/metric_exporter.hh"
#include "broker/internal/prometheus.hh"
#include "broker/internal/type_id.hh"
#include "broker/internal/web_socket.hh"
#include "broker/port.hh"
Expand All @@ -27,6 +25,7 @@
#include <caf/config.hpp>
#include <caf/cow_string.hpp>
#include <caf/error.hpp>
#include <caf/event_based_actor.hpp>
#include <caf/exit_reason.hpp>
#include <caf/flow/observable.hpp>
#include <caf/io/network/default_multiplexer.hpp>
Expand All @@ -49,8 +48,6 @@
#include "broker/internal/connector.hh"
#include "broker/internal/core_actor.hh"
#include "broker/internal/logger.hh"
#include "broker/internal/metric_exporter.hh"
#include "broker/internal/prometheus.hh"
#include "broker/publisher.hh"
#include "broker/status_subscriber.hh"
#include "broker/subscriber.hh"
Expand All @@ -60,6 +57,9 @@
#include <memory>
#include <thread>

#include <prometheus/exposer.h>
#include <prometheus/registry.h>

#ifdef BROKER_WINDOWS
# include "Winsock2.h"
#endif
Expand Down Expand Up @@ -270,130 +270,6 @@ endpoint::background_task::~background_task() {
// nop
}

namespace {

class prometheus_http_task : public endpoint::background_task {
public:
prometheus_http_task(caf::actor_system& sys) : mpx_(&sys) {
// nop
}

template <class T>
bool has(caf::string_view name) {
auto res = caf::get_as<T>(mpx_.system().config(), name);
return static_cast<bool>(res);
}

expected<uint16_t> start(uint16_t port, caf::actor core,
const char* in = nullptr, bool reuse = true) {
caf::io::doorman_ptr dptr;
if (auto maybe_dptr = mpx_.new_tcp_doorman(port, in, reuse)) {
dptr = std::move(*maybe_dptr);
} else {
return facade(maybe_dptr.error());
}
auto actual_port = dptr->port();
using impl = internal::prometheus_actor;
mpx_supervisor_ = mpx_.make_supervisor();
caf::actor_config cfg{&mpx_};
worker_ = mpx_.system().spawn_impl<impl, caf::hidden>(cfg, std::move(dptr),
std::move(core));
struct { // TODO: replace with std::latch when available.
std::mutex mx;
std::condition_variable cv;
bool lit = false;
void ignite() {
std::unique_lock<std::mutex> guard{mx};
lit = true;
cv.notify_all();
}
void wait() {
std::unique_lock<std::mutex> guard{mx};
while (!lit)
cv.wait(guard);
}
} beacon;
auto run_mpx = [this, &beacon] {
BROKER_TRACE("");
mpx_.thread_id(std::this_thread::get_id());
beacon.ignite();
mpx_.run();
};
thread_ = mpx_.system().launch_thread("broker.prom", run_mpx);
beacon.wait();
return actual_port;
}

~prometheus_http_task() override {
if (mpx_supervisor_) {
mpx_.dispatch([=] {
auto base_ptr = caf::actor_cast<caf::abstract_actor*>(worker_);
auto ptr = static_cast<caf::io::broker*>(base_ptr);
if (!ptr->getf(caf::abstract_actor::is_terminated_flag)) {
ptr->context(&mpx_);
ptr->quit();
ptr->finalize();
}
});
mpx_supervisor_.reset();
thread_.join();
}
}

caf::actor telemetry_exporter() {
return worker_;
}

private:
caf::io::network::default_multiplexer mpx_;
caf::io::network::multiplexer::supervisor_ptr mpx_supervisor_;
caf::actor worker_;
std::thread thread_;
};

} // namespace

// --- metrics_exporter_t::endpoint class --------------------------------------

using string_list = std::vector<std::string>;

void endpoint::metrics_exporter_t::set_interval(caf::timespan new_interval) {
if (new_interval.count() > 0)
caf::anon_send(native(parent_->telemetry_exporter_), atom::put_v,
new_interval);
}

void endpoint::metrics_exporter_t::set_target(topic new_target) {
if (!new_target.empty())
caf::anon_send(native(parent_->telemetry_exporter_), atom::put_v,
std::move(new_target));
}

void endpoint::metrics_exporter_t::set_id(std::string new_id) {
if (!new_id.empty())
caf::anon_send(native(parent_->telemetry_exporter_), atom::put_v,
std::move(new_id));
}

void endpoint::metrics_exporter_t::set_prefixes(string_list new_prefixes) {
// We only wrap the prefixes into a filter to get around assigning a type ID
// to std::vector<std::string> (which technically would require us to change
// Broker ID on the network).
filter_type boxed;
for (auto& str : new_prefixes)
boxed.emplace_back(std::move(str));
caf::anon_send(native(parent_->telemetry_exporter_), atom::put_v,
std::move(boxed));
}

void endpoint::metrics_exporter_t::set_import_topics(string_list new_topics) {
filter_type filter;
for (auto& str : new_topics)
filter.emplace_back(std::move(str));
caf::anon_send(native(parent_->telemetry_exporter_), atom::join_v,
std::move(filter));
}

// --- endpoint class ----------------------------------------------------------

namespace {
Expand Down Expand Up @@ -485,12 +361,16 @@ endpoint::endpoint() : endpoint(configuration{}, endpoint_id::random()) {
// nop
}

endpoint::endpoint(configuration config)
: endpoint(std::move(config), endpoint_id::random()) {
endpoint::endpoint(configuration config, prometheus_registry_ptr registry)
: endpoint(std::move(config), endpoint_id::random(), std::move(registry)) {
// nop
}

endpoint::endpoint(configuration config, endpoint_id id) : id_(id) {
endpoint::endpoint(configuration config, endpoint_id id,
prometheus_registry_ptr registry)
: id_(id), registry_(std::move(registry)) {
if (registry_ == nullptr)
registry_ = std::make_shared<prometheus::Registry>();
// Spin up the actor system.
auto broker_cfg = config.options();
auto ssl_cfg = config.openssl_options();
Expand Down Expand Up @@ -560,31 +440,23 @@ endpoint::endpoint(configuration config, endpoint_id id) : id_(id) {
domain_options adaptation{opts.disable_forwarding};
if (auto sp = caf::get_as<std::string>(cfg, "caf.scheduler.policy");
sp && *sp == "testing") {
core = sys.spawn<core_t>(id_, filter_type{}, clock_.get(), &adaptation,
std::move(conn_ptr));
core = sys.spawn<core_t>(registry_, id_, filter_type{}, clock_.get(),
&adaptation, std::move(conn_ptr));
} else {
core = sys.spawn<core_t, caf::detached>(id_, filter_type{}, clock_.get(),
&adaptation, std::move(conn_ptr));
core = sys.spawn<core_t, caf::detached>(registry_, id_, filter_type{},
clock_.get(), &adaptation,
std::move(conn_ptr));
}
core_ = facade(core);
// Spin up a Prometheus actor if configured or an exporter.
// Spin up a Prometheus exposer if configured.
if (auto port = caf::get_as<broker::port>(cfg, "broker.metrics.port")) {
auto ptask = std::make_unique<prometheus_http_task>(sys);
auto addr = caf::get_or(cfg, "broker.metrics.address", std::string{});
if (auto actual_port =
ptask->start(port->number(), native(core_),
addr.empty() ? nullptr : addr.c_str())) {
BROKER_INFO("expose metrics on port" << *actual_port);
telemetry_exporter_ = facade(ptask->telemetry_exporter());
background_tasks_.emplace_back(std::move(ptask));
} else {
BROKER_ERROR("failed to expose metrics:" << actual_port.error());
}
} else {
using exporter_t = internal::metric_exporter_actor;
auto params = internal::metric_exporter_params::from(cfg);
auto hdl = sys.spawn<exporter_t>(native(core_), std::move(params));
telemetry_exporter_ = facade(hdl);
auto str = caf::get_or(cfg, "broker.metrics.address", ""s);
if (!str.empty())
str += ':';
str += std::to_string(port->number());
BROKER_INFO("expose metrics on" << str);
exposer_ = std::make_unique<prometheus::Exposer>(str);
exposer_->RegisterCollectable(registry_);
}
// Spin up a WebSocket server when requested.
if (auto port = caf::get_as<broker::port>(cfg, "broker.web-socket.port"))
Expand Down Expand Up @@ -639,13 +511,6 @@ void endpoint::shutdown() {
self->wait_for(native(hdl));
workers_.clear();
}
BROKER_DEBUG("stop the telemetry exporter");
self->send_exit(native(telemetry_exporter_),
caf::exit_reason::user_shutdown);
if (sched)
sched->run();
self->wait_for(native(telemetry_exporter_));
telemetry_exporter_ = nullptr;
}
BROKER_DEBUG("stop" << background_tasks_.size() << "background tasks");
background_tasks_.clear();
Expand Down
Loading

0 comments on commit 2433a72

Please sign in to comment.