diff --git a/daq/jobs/handle_stats.html b/daq/jobs/handle_stats.html index 4c3df96..2f7de66 100644 --- a/daq/jobs/handle_stats.html +++ b/daq/jobs/handle_stats.html @@ -155,7 +155,16 @@

Classes

# Combine remote stats from all supervisors remote_stats_combined = defaultdict(lambda: SupervisorRemoteStats()) - for _, remote_stats_dict in self._remote_stats.items(): + if ( + self._supervisor_config + and self._supervisor_config.supervisor_id in self._remote_stats + ): + for supervisor_id, remote_stats in self._remote_stats[ + self._supervisor_config.supervisor_id + ].items(): + remote_stats_combined[supervisor_id] = remote_stats + + for remote_stats_dict in self._remote_stats.values(): # For each remote stats dict, combine the values for ( supervisor_id, @@ -165,7 +174,10 @@

Classes

remote_stats_dict_serialized = msgspec.structs.asdict( remote_stats_dict_serialized_item ) + # Set each value for item, value in remote_stats_dict_serialized.items(): + if value == 0 or not value: + continue setattr(remote_stats_combined[supervisor_id], item, value) for supervisor_id, remote_stats in remote_stats_combined.items(): diff --git a/daq/jobs/remote.html b/daq/jobs/remote.html index d8f129a..00ca871 100644 --- a/daq/jobs/remote.html +++ b/daq/jobs/remote.html @@ -165,16 +165,23 @@

Attributes

if message.remote_config.remote_disable: return True + self._send_remote_message(message) + return True + + def _send_remote_message(self, message: DAQJobMessage): + if self._zmq_pub is None: + return + remote_topic = message.remote_config.remote_topic or DEFAULT_REMOTE_TOPIC - remote_topic = remote_topic.encode() + remote_topic_bytes = remote_topic.encode() packed_message = self._pack_message(message) - self._zmq_pub.send_multipart([remote_topic, packed_message]) + self._zmq_pub.send_multipart([remote_topic_bytes, packed_message]) # Update remote stats if self._supervisor_config: self._remote_stats[ self._supervisor_config.supervisor_id - ].update_message_out_stats(len(packed_message) + len(remote_topic)) + ].update_message_out_stats(len(packed_message) + len(remote_topic_bytes)) self._logger.debug( f"Sent message '{type(message).__name__}' to topic '{remote_topic}'" @@ -319,7 +326,9 @@

Attributes

return res def _send_remote_stats_message(self): - self._put_message_out(DAQJobMessageStatsRemote(dict(self._remote_stats))) + msg = DAQJobMessageStatsRemote(dict(self._remote_stats)) + self._send_remote_message(msg) + self._put_message_out(msg) def __del__(self): """