From 7ca53cd41207862e8c18d9c4bb0b4bcc7abcd54c Mon Sep 17 00:00:00 2001 From: Martin Cornelis Date: Thu, 11 Jan 2024 17:01:55 +0100 Subject: [PATCH] Add add_analyzer functionality --- diagnostic_aggregator/CMakeLists.txt | 10 +++ .../diagnostic_aggregator/aggregator.hpp | 12 ++++ diagnostic_aggregator/src/add_analyzer.cpp | 63 +++++++++++++++++++ diagnostic_aggregator/src/aggregator.cpp | 60 ++++++++++++------ 4 files changed, 125 insertions(+), 20 deletions(-) create mode 100644 diagnostic_aggregator/src/add_analyzer.cpp diff --git a/diagnostic_aggregator/CMakeLists.txt b/diagnostic_aggregator/CMakeLists.txt index fd6c2a070..9a69aab02 100644 --- a/diagnostic_aggregator/CMakeLists.txt +++ b/diagnostic_aggregator/CMakeLists.txt @@ -14,6 +14,7 @@ find_package(ament_cmake REQUIRED) find_package(diagnostic_msgs REQUIRED) find_package(pluginlib REQUIRED) find_package(rclcpp REQUIRED) +find_package(rcl_interfaces REQUIRED) find_package(std_msgs REQUIRED) add_library(${PROJECT_NAME} SHARED @@ -67,6 +68,10 @@ add_executable(aggregator_node src/aggregator_node.cpp) target_link_libraries(aggregator_node ${PROJECT_NAME}) +# Add analyzer +add_executable(add_analyzer src/add_analyzer.cpp) +ament_target_dependencies(add_analyzer rclcpp rcl_interfaces) + # Testing macro if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) @@ -135,6 +140,11 @@ install( DESTINATION lib/${PROJECT_NAME} ) +install( + TARGETS add_analyzer + DESTINATION lib/${PROJECT_NAME} +) + install( TARGETS ${PROJECT_NAME} ${ANALYZERS} EXPORT ${PROJECT_NAME}Targets diff --git a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp index b901acdc0..5f48dd6b1 100644 --- a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp +++ b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp @@ -133,6 +133,8 @@ class Aggregator rclcpp::Service::SharedPtr add_srv_; /// DiagnosticArray, /diagnostics rclcpp::Subscription::SharedPtr diag_sub_; + /// ParameterEvent, /parameter_events + rclcpp::Subscription::SharedPtr param_sub_; /// DiagnosticArray, /diagnostics_agg rclcpp::Publisher::SharedPtr agg_pub_; /// DiagnosticStatus, /diagnostics_toplevel_state @@ -165,6 +167,16 @@ class Aggregator /// Records all ROS warnings. No warnings are repeated. std::set ros_warnings_; + /* + *!\brief Checks for new parameters to trigger reinitialization of the AnalyzerGroup and OtherAnalyzer + */ + void parameterCallback(const rcl_interfaces::msg::ParameterEvent::SharedPtr param_msg); + + /* + *!\brief (re)initializes the AnalyzerGroup and OtherAnalyzer + */ + void initAnalyzers(); + /* *!\brief Checks timestamp of message, and warns if timestamp is 0 (not set) */ diff --git a/diagnostic_aggregator/src/add_analyzer.cpp b/diagnostic_aggregator/src/add_analyzer.cpp new file mode 100644 index 000000000..d3e303a2f --- /dev/null +++ b/diagnostic_aggregator/src/add_analyzer.cpp @@ -0,0 +1,63 @@ +#include "rclcpp/rclcpp.hpp" +#include "rcl_interfaces/srv/set_parameters_atomically.hpp" +#include "rcl_interfaces/msg/parameter.hpp" + +#include +#include +#include + +using namespace std::chrono_literals; + +class AddAnalyzer : public rclcpp::Node +{ +public: + AddAnalyzer() + : Node("add_analyzer_node", "",rclcpp::NodeOptions().allow_undeclared_parameters(true).automatically_declare_parameters_from_overrides(true)) + { + client_ = this->create_client("/diagnostics_agg/set_parameters_atomically"); + } + + void send_request(){ + while (!client_->wait_for_service(1s)) { + if (!rclcpp::ok()) { + RCLCPP_ERROR(this->get_logger(), "Interrupted while waiting for the service. Exiting."); + return; + } + RCLCPP_INFO_ONCE(this->get_logger(), "service not available, waiting ..."); + } + auto request = std::make_shared(); + std::map parameters; + if (this->get_parameters("", parameters)) { + for (const auto & param : parameters) { + if (param.first.substr(0, prefix_.length()).compare(prefix_) == 0) { + auto parameter_msg = param.second.to_parameter_msg(); + request->parameters.push_back(parameter_msg); + } + } + } + auto result = client_->async_send_request(request); + // Wait for the result. + if (rclcpp::spin_until_future_complete(this->get_node_base_interface(), result) == + rclcpp::FutureReturnCode::SUCCESS) + { + RCLCPP_INFO(this->get_logger(), "Parameters succesfully set"); + } else { + RCLCPP_ERROR(this->get_logger(), "Failed to set parameters"); + } + } + +private: + rclcpp::Client::SharedPtr client_; + std::string prefix_ = "analyzers."; +}; + +int main(int argc, char **argv) +{ + rclcpp::init(argc, argv); + + auto add_analyzer = std::make_shared(); + add_analyzer->send_request(); + rclcpp::shutdown(); + + return 0; +} \ No newline at end of file diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index 64b716caf..ca849380e 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -59,7 +59,7 @@ using diagnostic_msgs::msg::DiagnosticStatus; Aggregator::Aggregator() : n_(std::make_shared( "analyzers", "", - rclcpp::NodeOptions().automatically_declare_parameters_from_overrides(true))), + rclcpp::NodeOptions().allow_undeclared_parameters(true).automatically_declare_parameters_from_overrides(true))), logger_(rclcpp::get_logger("Aggregator")), pub_rate_(1.0), history_depth_(1000), @@ -69,6 +69,35 @@ Aggregator::Aggregator() last_top_level_state_(DiagnosticStatus::STALE) { RCLCPP_DEBUG(logger_, "constructor"); + initAnalyzers(); + + diag_sub_ = n_->create_subscription( + "/diagnostics", rclcpp::SystemDefaultsQoS().keep_last(history_depth_), + std::bind(&Aggregator::diagCallback, this, _1)); + agg_pub_ = n_->create_publisher("/diagnostics_agg", 1); + toplevel_state_pub_ = + n_->create_publisher("/diagnostics_toplevel_state", 1); + + int publish_rate_ms = 1000 / pub_rate_; + publish_timer_ = n_->create_wall_timer( + std::chrono::milliseconds(publish_rate_ms), + std::bind(&Aggregator::publishData, this)); + + param_sub_ = n_->create_subscription( + "/parameter_events", 1, std::bind(&Aggregator::parameterCallback, this, _1)); +} + +void Aggregator::parameterCallback(const rcl_interfaces::msg::ParameterEvent::SharedPtr msg) +{ + if (msg->node == "/" + std::string(n_->get_name())) { + if (msg->new_parameters.size() != 0){ + initAnalyzers(); + } + } +} + +void Aggregator::initAnalyzers() +{ bool other_as_errors = false; std::map parameters; @@ -101,26 +130,17 @@ Aggregator::Aggregator() RCLCPP_DEBUG( logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false")); - analyzer_group_ = std::make_unique(); - if (!analyzer_group_->init(base_path_, "", n_)) { - RCLCPP_ERROR(logger_, "Analyzer group for diagnostic aggregator failed to initialize!"); - } - - // Last analyzer handles remaining data - other_analyzer_ = std::make_unique(other_as_errors); - other_analyzer_->init(base_path_); // This always returns true - - diag_sub_ = n_->create_subscription( - "/diagnostics", rclcpp::SystemDefaultsQoS().keep_last(history_depth_), - std::bind(&Aggregator::diagCallback, this, _1)); - agg_pub_ = n_->create_publisher("/diagnostics_agg", 1); - toplevel_state_pub_ = - n_->create_publisher("/diagnostics_toplevel_state", 1); + { // lock the mutex while analyzer_group_ and other_analyzer_ are being updated + std::lock_guard lock(mutex_); + analyzer_group_ = std::make_unique(); + if (!analyzer_group_->init(base_path_, "", n_)) { + RCLCPP_ERROR(logger_, "Analyzer group for diagnostic aggregator failed to initialize!"); + } - int publish_rate_ms = 1000 / pub_rate_; - publish_timer_ = n_->create_wall_timer( - std::chrono::milliseconds(publish_rate_ms), - std::bind(&Aggregator::publishData, this)); + // Last analyzer handles remaining data + other_analyzer_ = std::make_unique(other_as_errors); + other_analyzer_->init(base_path_); // This always returns true + } } void Aggregator::checkTimestamp(const DiagnosticArray::SharedPtr diag_msg)