From fcc4cfcf2c07683f91e6cae4454c803bc47336f9 Mon Sep 17 00:00:00 2001 From: mcfurry Date: Fri, 4 Jun 2021 10:26:45 +0200 Subject: [PATCH 1/4] Directly publish to /diagnostics_agg on non-ok status and also fill toplevel status with message --- diagnostic_aggregator/CMakeLists.txt | 3 +- .../diagnostic_aggregator/aggregator.h | 18 +++- diagnostic_aggregator/src/aggregator.cpp | 96 ++++++++++++++++--- diagnostic_aggregator/src/aggregator_node.cpp | 17 ++-- .../launch/test_immediate_agg_publish.launch | 12 +++ .../test/test_immediate_agg_publish.py | 75 +++++++++++++++ 6 files changed, 192 insertions(+), 29 deletions(-) create mode 100644 diagnostic_aggregator/test/launch/test_immediate_agg_publish.launch create mode 100755 diagnostic_aggregator/test/test_immediate_agg_publish.py diff --git a/diagnostic_aggregator/CMakeLists.txt b/diagnostic_aggregator/CMakeLists.txt index eabb44917..27b2c8361 100644 --- a/diagnostic_aggregator/CMakeLists.txt +++ b/diagnostic_aggregator/CMakeLists.txt @@ -38,7 +38,7 @@ target_link_libraries(diagnostic_aggregator ${Boost_LIBRARIES} ) add_dependencies(${PROJECT_NAME} diagnostic_msgs_generate_messages_cpp) -# Aggregator node +# Aggregator node add_executable(aggregator_node src/aggregator_node.cpp) target_link_libraries(aggregator_node ${catkin_LIBRARIES} ${PROJECT_NAME} @@ -59,6 +59,7 @@ if(CATKIN_ENABLE_TESTING) add_rostest(test/launch/test_multiple_match.launch) add_rostest(test/launch/test_discard_stale_not_published.launch) + add_rostest(test/launch/test_immediate_agg_publish.launch) endif() catkin_install_python( diff --git a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h index d35c8f1bd..a14033534 100644 --- a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h +++ b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h @@ -78,11 +78,11 @@ Input (status names): /Robot/Sensors/Tilt Hokuyo/Frequency /Robot/Sensors/Tilt Hokuyo/Connection \endverbatim - * The analyzer should always output a DiagnosticStatus with the name of the + * The analyzer should always output a DiagnosticStatus with the name of the * prefix. Any other data output is up to the analyzer developer. - * + * * Analyzer's are loaded by specifying the private parameters of the - * aggregator. + * aggregator. \verbatim base_path: My Robot pub_rate: 1.0 @@ -103,7 +103,7 @@ other_as_errors: false * the aggregator will report the error and publish it in the aggregated output. */ class Aggregator -{ +{ public: /*! *\brief Constructor initializes with main prefix (ex: '/Robot') @@ -117,6 +117,14 @@ class Aggregator */ void publishData(); + /*! + *\brief Timer callback + */ + void updateTimerCB(const ros::TimerEvent&) + { + publishData(); + } + /*! *\brief True if the NodeHandle reports OK */ @@ -133,8 +141,10 @@ class Aggregator ros::Subscriber diag_sub_; /**< DiagnosticArray, /diagnostics */ ros::Publisher agg_pub_; /**< DiagnosticArray, /diagnostics_agg */ ros::Publisher toplevel_state_pub_; /**< DiagnosticStatus, /diagnostics_toplevel_state */ + ros::Timer updateTimer_; /* ROS timer for periodic updates */ boost::mutex mutex_; double pub_rate_; + int8_t diag_toplevel_ = -1; /*! *\brief Callback for incoming "/diagnostics" diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index 69e382e57..b2ac8ea60 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -69,6 +69,9 @@ Aggregator::Aggregator() : diag_sub_ = n_.subscribe("/diagnostics", 1000, &Aggregator::diagCallback, this); agg_pub_ = n_.advertise("/diagnostics_agg", 1); toplevel_state_pub_ = n_.advertise("/diagnostics_toplevel_state", 1); + + // Create standard update publish timer + updateTimer_ = n_.createTimer(pub_rate_, &Aggregator::updateTimerCB, this); } void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg) @@ -84,7 +87,7 @@ void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr stamp_warn += ", "; stamp_warn += it->name; } - + if (!ros_warnings_.count(stamp_warn)) { ROS_WARN("%s", stamp_warn.c_str()); @@ -97,21 +100,38 @@ void Aggregator::diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr& checkTimestamp(diag_msg); bool analyzed = false; - { // lock the whole loop to ensure nothing in the analyzer group changes - // during it. - boost::mutex::scoped_lock lock(mutex_); - for (unsigned int j = 0; j < diag_msg->status.size(); ++j) + bool immediate_report = false; + // lock the whole loop to ensure nothing in the analyzer group changes during it. + mutex_.lock(); + for (unsigned int j = 0; j < diag_msg->status.size(); ++j) + { + analyzed = false; + boost::shared_ptr item(new StatusItem(&diag_msg->status[j])); + + if (analyzer_group_->match(item->getName())) { - analyzed = false; - boost::shared_ptr item(new StatusItem(&diag_msg->status[j])); + analyzed = analyzer_group_->analyze(item); + } - if (analyzer_group_->match(item->getName())) - analyzed = analyzer_group_->analyze(item); + if (!analyzed) + { + other_analyzer_->analyze(item); + } - if (!analyzed) - other_analyzer_->analyze(item); + // In case we get a Diagnostic message with level > diag_toplevel_, report immediately instead of std 1Hz rate + if (item->getLevel() > diag_toplevel_) + { + immediate_report = true; } } + + // Unlock mutex since we're done processing and publishData() now might need to access it + mutex_.unlock(); + + if (immediate_report) + { + publishData(); + } } Aggregator::~Aggregator() @@ -207,7 +227,10 @@ void Aggregator::publishData() diag_toplevel_state.name = "toplevel_state"; diag_toplevel_state.level = -1; int min_level = 255; - + uint non_ok_status_deepness = 0; + uint deepness = 0; + uint report_idx = 0; + vector > processed; { boost::mutex::scoped_lock lock(mutex_); @@ -217,13 +240,37 @@ void Aggregator::publishData() { diag_array.status.push_back(*processed[i]); + deepness = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); if (processed[i]->level > diag_toplevel_state.level) + { diag_toplevel_state.level = processed[i]->level; + non_ok_status_deepness = deepness; + report_idx = i; + } + if (processed[i]->level == diag_toplevel_state.level && deepness > non_ok_status_deepness) + { + // On non okay diagnostics also copy the deepest message to toplevel state + non_ok_status_deepness = deepness; + report_idx = i; + } if (processed[i]->level < min_level) + { min_level = processed[i]->level; + } + } + // When a non-ok item was found, copy the complete status message once + if (diag_toplevel_state.level > diagnostic_msgs::DiagnosticStatus::OK) + { + diag_toplevel_state.name = processed[report_idx]->name; + diag_toplevel_state.message = processed[report_idx]->message; + diag_toplevel_state.hardware_id = processed[report_idx]->hardware_id; + diag_toplevel_state.values = processed[report_idx]->values; } - vector > processed_other; + /* Process other category (unmapped items) */ + non_ok_status_deepness = 0; + report_idx = 0; + vector > processed_other; { boost::mutex::scoped_lock lock(mutex_); processed_other = other_analyzer_->report(); @@ -232,10 +279,31 @@ void Aggregator::publishData() { diag_array.status.push_back(*processed_other[i]); + deepness = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); if (processed_other[i]->level > diag_toplevel_state.level) + { diag_toplevel_state.level = processed_other[i]->level; + non_ok_status_deepness = deepness; + report_idx = i; + } + if (processed_other[i]->level == diag_toplevel_state.level && deepness > non_ok_status_deepness) + { + // On non okay diagnostics also copy the deepest message to toplevel state + non_ok_status_deepness = deepness; + report_idx = i; + } if (processed_other[i]->level < min_level) + { min_level = processed_other[i]->level; + } + // When a non-ok item was found AND it was reported in 'other' categpry, copy the complete status message once + if (diag_toplevel_state.level > diagnostic_msgs::DiagnosticStatus::OK && non_ok_status_deepness > 0) + { + diag_toplevel_state.name = processed_other[report_idx]->name; + diag_toplevel_state.message = processed_other[report_idx]->message; + diag_toplevel_state.hardware_id = processed_other[report_idx]->hardware_id; + diag_toplevel_state.values = processed_other[report_idx]->values; + } } diag_array.header.stamp = ros::Time::now(); @@ -246,5 +314,7 @@ void Aggregator::publishData() if (diag_toplevel_state.level > int(DiagnosticLevel::Level_Error) && min_level <= int(DiagnosticLevel::Level_Error)) diag_toplevel_state.level = DiagnosticLevel::Level_Error; + // Store latest toplevel state for immediate publish checking + diag_toplevel_ = diag_toplevel_state.level; toplevel_state_pub_.publish(diag_toplevel_state); } diff --git a/diagnostic_aggregator/src/aggregator_node.cpp b/diagnostic_aggregator/src/aggregator_node.cpp index f8c9d85fb..972a8b36c 100644 --- a/diagnostic_aggregator/src/aggregator_node.cpp +++ b/diagnostic_aggregator/src/aggregator_node.cpp @@ -42,26 +42,21 @@ using namespace std; int main(int argc, char **argv) { ros::init(argc, argv, "diagnostic_aggregator"); - + try { diagnostic_aggregator::Aggregator agg; - - ros::Rate pub_rate(agg.getPubRate()); - while (agg.ok()) - { - ros::spinOnce(); - agg.publishData(); - pub_rate.sleep(); - } + + // handle callbacks until shut down + ros::spin(); } catch (exception& e) { ROS_FATAL("Diagnostic aggregator node caught exception. Aborting. %s", e.what()); ROS_BREAK(); } - + exit(0); return 0; } - + diff --git a/diagnostic_aggregator/test/launch/test_immediate_agg_publish.launch b/diagnostic_aggregator/test/launch/test_immediate_agg_publish.launch new file mode 100644 index 000000000..a73704a75 --- /dev/null +++ b/diagnostic_aggregator/test/launch/test_immediate_agg_publish.launch @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/diagnostic_aggregator/test/test_immediate_agg_publish.py b/diagnostic_aggregator/test/test_immediate_agg_publish.py new file mode 100755 index 000000000..879fc894f --- /dev/null +++ b/diagnostic_aggregator/test/test_immediate_agg_publish.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +import os +import unittest + +import rospy +import rostest + +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus + + +class TestCustomAggregator(unittest.TestCase): + def __init__(self, *args): + super(TestCustomAggregator, self).__init__(*args) + + self.toplevel_rcvd = DiagnosticStatus.OK + self.toplevel_msg_rcvd = 'Nothing received yet!' + + self.diag_pub = rospy.Publisher("/diagnostics", DiagnosticArray, queue_size=4) + self.diag_sub = rospy.Subscriber("/diagnostics_toplevel_state", DiagnosticStatus, self.toplevel_cb) + + # Make sure topics are connected + rospy.sleep(rospy.Duration(1.0)) + + def toplevel_cb(self, msg): + self.toplevel_rcvd = msg.level + self.toplevel_msg_rcvd = msg.message + + def publish_diagnostic(self, level, msg): + error_msg = DiagnosticArray() + self.error_status = DiagnosticStatus() + self.error_status.level = level + self.error_status.name = 'test: test-component' + self.error_status.message = msg + self.error_status.hardware_id = 'cpu' + error_msg.status.append(self.error_status) + error_msg.header.stamp = rospy.Time.now() + self.diag_pub.publish(error_msg) + + def test_publish_immediate(self): + # Start happy and wait for okay aggregation + t0 = rospy.Time.now() + while self.toplevel_rcvd != DiagnosticStatus.OK and rospy.Time.now() - t0 < rospy.Duration(3.0): + self.publish_diagnostic(level=DiagnosticStatus.OK, msg='Nothing on the hand') + rospy.sleep(rospy.Duration(0.5)) + + # Get time, publish error and wait for new aggregated update + t0 = rospy.Time.now() + while self.toplevel_rcvd != DiagnosticStatus.ERROR and rospy.Time.now() - t0 < rospy.Duration(3.0): + self.publish_diagnostic(level=DiagnosticStatus.ERROR, msg='This is the error message') + rospy.sleep(rospy.Duration(0.1)) + t1 = rospy.Time.now() + # Test time diff < 1 sec + self.assertLess(t1 - t0, rospy.Duration(1.0), msg="Error message was not directly cascaded to aggregated topic") + + def test_message_in_toplevel(self): + # Start happy and wait for okay aggregation + t0 = rospy.Time.now() + while self.toplevel_rcvd != DiagnosticStatus.OK and rospy.Time.now() - t0 < rospy.Duration(3.0): + self.publish_diagnostic(level=DiagnosticStatus.OK, msg='Nothing on the hand') + rospy.sleep(rospy.Duration(0.5)) + + # Get time, publish error and wait for new aggregated update + t0 = rospy.Time.now() + error_msg = 'This is the error message' + while self.toplevel_rcvd != DiagnosticStatus.ERROR and rospy.Time.now() - t0 < rospy.Duration(3.0): + self.publish_diagnostic(level=DiagnosticStatus.ERROR, msg=error_msg) + rospy.sleep(rospy.Duration(0.1)) + + # Test for message in toplevel status + self.assertTrue(self.toplevel_msg_rcvd == error_msg, msg='Error message not in toplevel state') + + +if __name__ == "__main__": + rospy.init_node("test_custom_aggregator", anonymous=False) + rostest.rosrun("diagnostic_aggregator", "test_custom_aggregator", TestCustomAggregator) From 1d7365483b6139dd3fefc16e39233c07b7870fbb Mon Sep 17 00:00:00 2001 From: Rokus Ottervanger Date: Fri, 8 Apr 2022 11:01:58 +0200 Subject: [PATCH 2/4] style: rename "deepness" to "depth" --- diagnostic_aggregator/src/aggregator.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index b2ac8ea60..b48232218 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -227,8 +227,8 @@ void Aggregator::publishData() diag_toplevel_state.name = "toplevel_state"; diag_toplevel_state.level = -1; int min_level = 255; - uint non_ok_status_deepness = 0; - uint deepness = 0; + uint non_ok_status_depth = 0; + uint depth = 0; uint report_idx = 0; vector > processed; @@ -240,17 +240,17 @@ void Aggregator::publishData() { diag_array.status.push_back(*processed[i]); - deepness = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); + depth = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); if (processed[i]->level > diag_toplevel_state.level) { diag_toplevel_state.level = processed[i]->level; - non_ok_status_deepness = deepness; + non_ok_status_depth = depth; report_idx = i; } - if (processed[i]->level == diag_toplevel_state.level && deepness > non_ok_status_deepness) + if (processed[i]->level == diag_toplevel_state.level && depth > non_ok_status_depth) { // On non okay diagnostics also copy the deepest message to toplevel state - non_ok_status_deepness = deepness; + non_ok_status_depth = depth; report_idx = i; } if (processed[i]->level < min_level) @@ -268,7 +268,7 @@ void Aggregator::publishData() } /* Process other category (unmapped items) */ - non_ok_status_deepness = 0; + non_ok_status_depth = 0; report_idx = 0; vector > processed_other; { @@ -279,17 +279,17 @@ void Aggregator::publishData() { diag_array.status.push_back(*processed_other[i]); - deepness = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); + depth = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); if (processed_other[i]->level > diag_toplevel_state.level) { diag_toplevel_state.level = processed_other[i]->level; - non_ok_status_deepness = deepness; + non_ok_status_depth = depth; report_idx = i; } - if (processed_other[i]->level == diag_toplevel_state.level && deepness > non_ok_status_deepness) + if (processed_other[i]->level == diag_toplevel_state.level && depth > non_ok_status_depth) { // On non okay diagnostics also copy the deepest message to toplevel state - non_ok_status_deepness = deepness; + non_ok_status_depth = depth; report_idx = i; } if (processed_other[i]->level < min_level) @@ -297,7 +297,7 @@ void Aggregator::publishData() min_level = processed_other[i]->level; } // When a non-ok item was found AND it was reported in 'other' categpry, copy the complete status message once - if (diag_toplevel_state.level > diagnostic_msgs::DiagnosticStatus::OK && non_ok_status_deepness > 0) + if (diag_toplevel_state.level > diagnostic_msgs::DiagnosticStatus::OK && non_ok_status_depth > 0) { diag_toplevel_state.name = processed_other[report_idx]->name; diag_toplevel_state.message = processed_other[report_idx]->message; From f1b65d6f58b62aae5ea610da43d676a560938bb5 Mon Sep 17 00:00:00 2001 From: Rokus Ottervanger Date: Fri, 8 Apr 2022 11:07:22 +0200 Subject: [PATCH 3/4] refactor: limit scope of local variable and declare const --- diagnostic_aggregator/src/aggregator.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index b48232218..e6823fce4 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -228,7 +228,6 @@ void Aggregator::publishData() diag_toplevel_state.level = -1; int min_level = 255; uint non_ok_status_depth = 0; - uint depth = 0; uint report_idx = 0; vector > processed; @@ -240,7 +239,7 @@ void Aggregator::publishData() { diag_array.status.push_back(*processed[i]); - depth = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); + const uint depth = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); if (processed[i]->level > diag_toplevel_state.level) { diag_toplevel_state.level = processed[i]->level; @@ -279,7 +278,7 @@ void Aggregator::publishData() { diag_array.status.push_back(*processed_other[i]); - depth = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); + const uint depth = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); if (processed_other[i]->level > diag_toplevel_state.level) { diag_toplevel_state.level = processed_other[i]->level; From 8172cb15bc59dbb270134045003ef03060d12f6f Mon Sep 17 00:00:00 2001 From: Ferry Schoenmakers Date: Thu, 5 Jan 2023 15:41:44 +0100 Subject: [PATCH 4/4] Bugfix! We parse the wrong vector here which could yield a segfault --- diagnostic_aggregator/src/aggregator.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index e6823fce4..1000dd422 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -278,7 +278,8 @@ void Aggregator::publishData() { diag_array.status.push_back(*processed_other[i]); - const uint depth = static_cast(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/')); + const uint depth = static_cast( + std::count(processed_other[i]->name.begin(), processed_other[i]->name.end(), '/')); if (processed_other[i]->level > diag_toplevel_state.level) { diag_toplevel_state.level = processed_other[i]->level;