Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directly publish to /diagnostics_agg on non-ok status #197

Open
wants to merge 4 commits into
base: noetic-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion diagnostic_aggregator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ target_link_libraries(diagnostic_aggregator ${Boost_LIBRARIES}
)
add_dependencies(${PROJECT_NAME} diagnostic_msgs_generate_messages_cpp)

# Aggregator node
# Aggregator node
add_executable(aggregator_node src/aggregator_node.cpp)
target_link_libraries(aggregator_node ${catkin_LIBRARIES}
${PROJECT_NAME}
Expand All @@ -59,6 +59,7 @@ if(CATKIN_ENABLE_TESTING)
add_rostest(test/launch/test_multiple_match.launch)

add_rostest(test/launch/test_discard_stale_not_published.launch)
add_rostest(test/launch/test_immediate_agg_publish.launch)
endif()

catkin_install_python(
Expand Down
18 changes: 14 additions & 4 deletions diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ Input (status names):
/Robot/Sensors/Tilt Hokuyo/Frequency
/Robot/Sensors/Tilt Hokuyo/Connection
\endverbatim
* The analyzer should always output a DiagnosticStatus with the name of the
* The analyzer should always output a DiagnosticStatus with the name of the
* prefix. Any other data output is up to the analyzer developer.
*
*
* Analyzer's are loaded by specifying the private parameters of the
* aggregator.
* aggregator.
\verbatim
base_path: My Robot
pub_rate: 1.0
Expand All @@ -103,7 +103,7 @@ other_as_errors: false
* the aggregator will report the error and publish it in the aggregated output.
*/
class Aggregator
{
{
public:
/*!
*\brief Constructor initializes with main prefix (ex: '/Robot')
Expand All @@ -117,6 +117,14 @@ class Aggregator
*/
void publishData();

/*!
*\brief Timer callback
*/
void updateTimerCB(const ros::TimerEvent&)
{
publishData();
}

/*!
*\brief True if the NodeHandle reports OK
*/
Expand All @@ -133,8 +141,10 @@ class Aggregator
ros::Subscriber diag_sub_; /**< DiagnosticArray, /diagnostics */
ros::Publisher agg_pub_; /**< DiagnosticArray, /diagnostics_agg */
ros::Publisher toplevel_state_pub_; /**< DiagnosticStatus, /diagnostics_toplevel_state */
ros::Timer updateTimer_; /* ROS timer for periodic updates */
boost::mutex mutex_;
double pub_rate_;
int8_t diag_toplevel_ = -1;

/*!
*\brief Callback for incoming "/diagnostics"
Expand Down
96 changes: 83 additions & 13 deletions diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ Aggregator::Aggregator() :
diag_sub_ = n_.subscribe("/diagnostics", 1000, &Aggregator::diagCallback, this);
agg_pub_ = n_.advertise<diagnostic_msgs::DiagnosticArray>("/diagnostics_agg", 1);
toplevel_state_pub_ = n_.advertise<diagnostic_msgs::DiagnosticStatus>("/diagnostics_toplevel_state", 1);

// Create standard update publish timer
updateTimer_ = n_.createTimer(pub_rate_, &Aggregator::updateTimerCB, this);
}

void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
Expand All @@ -84,7 +87,7 @@ void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr
stamp_warn += ", ";
stamp_warn += it->name;
}

if (!ros_warnings_.count(stamp_warn))
{
ROS_WARN("%s", stamp_warn.c_str());
Expand All @@ -97,21 +100,38 @@ void Aggregator::diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr&
checkTimestamp(diag_msg);

bool analyzed = false;
{ // lock the whole loop to ensure nothing in the analyzer group changes
// during it.
boost::mutex::scoped_lock lock(mutex_);
for (unsigned int j = 0; j < diag_msg->status.size(); ++j)
bool immediate_report = false;
// lock the whole loop to ensure nothing in the analyzer group changes during it.
mutex_.lock();
for (unsigned int j = 0; j < diag_msg->status.size(); ++j)
{
analyzed = false;
boost::shared_ptr<StatusItem> item(new StatusItem(&diag_msg->status[j]));

if (analyzer_group_->match(item->getName()))
{
analyzed = false;
boost::shared_ptr<StatusItem> item(new StatusItem(&diag_msg->status[j]));
analyzed = analyzer_group_->analyze(item);
}

if (analyzer_group_->match(item->getName()))
analyzed = analyzer_group_->analyze(item);
if (!analyzed)
{
other_analyzer_->analyze(item);
}

if (!analyzed)
other_analyzer_->analyze(item);
// In case we get a Diagnostic message with level > diag_toplevel_, report immediately instead of std 1Hz rate
if (item->getLevel() > diag_toplevel_)
{
immediate_report = true;
}
}

// Unlock mutex since we're done processing and publishData() now might need to access it
mutex_.unlock();

if (immediate_report)
{
publishData();
}
}

Aggregator::~Aggregator()
Expand Down Expand Up @@ -207,7 +227,9 @@ void Aggregator::publishData()
diag_toplevel_state.name = "toplevel_state";
diag_toplevel_state.level = -1;
int min_level = 255;

uint non_ok_status_depth = 0;
uint report_idx = 0;

vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed;
{
boost::mutex::scoped_lock lock(mutex_);
Expand All @@ -217,13 +239,37 @@ void Aggregator::publishData()
{
diag_array.status.push_back(*processed[i]);

const uint depth = static_cast<uint>(std::count(processed[i]->name.begin(), processed[i]->name.end(), '/'));
if (processed[i]->level > diag_toplevel_state.level)
{
diag_toplevel_state.level = processed[i]->level;
non_ok_status_depth = depth;
report_idx = i;
}
if (processed[i]->level == diag_toplevel_state.level && depth > non_ok_status_depth)
{
// On non okay diagnostics also copy the deepest message to toplevel state
non_ok_status_depth = depth;
report_idx = i;
}
if (processed[i]->level < min_level)
{
min_level = processed[i]->level;
}
}
// When a non-ok item was found, copy the complete status message once
if (diag_toplevel_state.level > diagnostic_msgs::DiagnosticStatus::OK)
{
diag_toplevel_state.name = processed[report_idx]->name;
diag_toplevel_state.message = processed[report_idx]->message;
diag_toplevel_state.hardware_id = processed[report_idx]->hardware_id;
diag_toplevel_state.values = processed[report_idx]->values;
}

vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed_other;
/* Process other category (unmapped items) */
non_ok_status_depth = 0;
report_idx = 0;
vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed_other;
{
boost::mutex::scoped_lock lock(mutex_);
processed_other = other_analyzer_->report();
Expand All @@ -232,10 +278,32 @@ void Aggregator::publishData()
{
diag_array.status.push_back(*processed_other[i]);

const uint depth = static_cast<uint>(
std::count(processed_other[i]->name.begin(), processed_other[i]->name.end(), '/'));
if (processed_other[i]->level > diag_toplevel_state.level)
{
diag_toplevel_state.level = processed_other[i]->level;
non_ok_status_depth = depth;
report_idx = i;
}
if (processed_other[i]->level == diag_toplevel_state.level && depth > non_ok_status_depth)
{
// On non okay diagnostics also copy the deepest message to toplevel state
non_ok_status_depth = depth;
report_idx = i;
}
if (processed_other[i]->level < min_level)
{
min_level = processed_other[i]->level;
}
// When a non-ok item was found AND it was reported in 'other' categpry, copy the complete status message once
if (diag_toplevel_state.level > diagnostic_msgs::DiagnosticStatus::OK && non_ok_status_depth > 0)
{
diag_toplevel_state.name = processed_other[report_idx]->name;
diag_toplevel_state.message = processed_other[report_idx]->message;
diag_toplevel_state.hardware_id = processed_other[report_idx]->hardware_id;
diag_toplevel_state.values = processed_other[report_idx]->values;
}
}

diag_array.header.stamp = ros::Time::now();
Expand All @@ -246,5 +314,7 @@ void Aggregator::publishData()
if (diag_toplevel_state.level > int(DiagnosticLevel::Level_Error) && min_level <= int(DiagnosticLevel::Level_Error))
diag_toplevel_state.level = DiagnosticLevel::Level_Error;

// Store latest toplevel state for immediate publish checking
diag_toplevel_ = diag_toplevel_state.level;
toplevel_state_pub_.publish(diag_toplevel_state);
}
17 changes: 6 additions & 11 deletions diagnostic_aggregator/src/aggregator_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,21 @@ using namespace std;
int main(int argc, char **argv)
{
ros::init(argc, argv, "diagnostic_aggregator");

try
{
diagnostic_aggregator::Aggregator agg;

ros::Rate pub_rate(agg.getPubRate());
while (agg.ok())
{
ros::spinOnce();
agg.publishData();
pub_rate.sleep();
}

// handle callbacks until shut down
ros::spin();
}
catch (exception& e)
{
ROS_FATAL("Diagnostic aggregator node caught exception. Aborting. %s", e.what());
ROS_BREAK();
}

exit(0);
return 0;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<launch>

<!-- Aggregator -->
<node name="aggregator_node" pkg="diagnostic_aggregator" type="aggregator_node" output="screen">
<param name="analyzers/test/type" value="diagnostic_aggregator/GenericAnalyzer" />
<param name="analyzers/test/path" value="Test" />
<param name="analyzers/test/find_and_remove_prefix" value="test: " />
</node>

<test test-name="test_immediate_agg_publish" pkg="diagnostic_aggregator" type="test_immediate_agg_publish.py" />

</launch>
75 changes: 75 additions & 0 deletions diagnostic_aggregator/test/test_immediate_agg_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python
import os
import unittest

import rospy
import rostest

from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus


class TestCustomAggregator(unittest.TestCase):
def __init__(self, *args):
super(TestCustomAggregator, self).__init__(*args)

self.toplevel_rcvd = DiagnosticStatus.OK
self.toplevel_msg_rcvd = 'Nothing received yet!'

self.diag_pub = rospy.Publisher("/diagnostics", DiagnosticArray, queue_size=4)
self.diag_sub = rospy.Subscriber("/diagnostics_toplevel_state", DiagnosticStatus, self.toplevel_cb)

# Make sure topics are connected
rospy.sleep(rospy.Duration(1.0))

def toplevel_cb(self, msg):
self.toplevel_rcvd = msg.level
self.toplevel_msg_rcvd = msg.message

def publish_diagnostic(self, level, msg):
error_msg = DiagnosticArray()
self.error_status = DiagnosticStatus()
self.error_status.level = level
self.error_status.name = 'test: test-component'
self.error_status.message = msg
self.error_status.hardware_id = 'cpu'
error_msg.status.append(self.error_status)
error_msg.header.stamp = rospy.Time.now()
self.diag_pub.publish(error_msg)

def test_publish_immediate(self):
# Start happy and wait for okay aggregation
t0 = rospy.Time.now()
while self.toplevel_rcvd != DiagnosticStatus.OK and rospy.Time.now() - t0 < rospy.Duration(3.0):
self.publish_diagnostic(level=DiagnosticStatus.OK, msg='Nothing on the hand')
rospy.sleep(rospy.Duration(0.5))

# Get time, publish error and wait for new aggregated update
t0 = rospy.Time.now()
while self.toplevel_rcvd != DiagnosticStatus.ERROR and rospy.Time.now() - t0 < rospy.Duration(3.0):
self.publish_diagnostic(level=DiagnosticStatus.ERROR, msg='This is the error message')
rospy.sleep(rospy.Duration(0.1))
t1 = rospy.Time.now()
# Test time diff < 1 sec
self.assertLess(t1 - t0, rospy.Duration(1.0), msg="Error message was not directly cascaded to aggregated topic")

def test_message_in_toplevel(self):
# Start happy and wait for okay aggregation
t0 = rospy.Time.now()
while self.toplevel_rcvd != DiagnosticStatus.OK and rospy.Time.now() - t0 < rospy.Duration(3.0):
self.publish_diagnostic(level=DiagnosticStatus.OK, msg='Nothing on the hand')
rospy.sleep(rospy.Duration(0.5))

# Get time, publish error and wait for new aggregated update
t0 = rospy.Time.now()
error_msg = 'This is the error message'
while self.toplevel_rcvd != DiagnosticStatus.ERROR and rospy.Time.now() - t0 < rospy.Duration(3.0):
self.publish_diagnostic(level=DiagnosticStatus.ERROR, msg=error_msg)
rospy.sleep(rospy.Duration(0.1))

# Test for message in toplevel status
self.assertTrue(self.toplevel_msg_rcvd == error_msg, msg='Error message not in toplevel state')


if __name__ == "__main__":
rospy.init_node("test_custom_aggregator", anonymous=False)
rostest.rosrun("diagnostic_aggregator", "test_custom_aggregator", TestCustomAggregator)