Handles statistics for DAQ jobs.
@@ -59,14 +59,33 @@
config_type = DAQJobHandleStatsConfig
config: DAQJobHandleStatsConfig
+ _stats: dict[str, DAQJobStatsDict]
+
+ def __init__(self, config: DAQJobHandleStatsConfig, **kwargs):
+ super().__init__(config, **kwargs)
+ self._stats = {}
+
def start(self):
while True:
- self.consume(nowait=False)
+ start_time = datetime.now()
+ self.consume()
+ self._save_stats()
+ sleep_for(DAQ_JOB_HANDLE_STATS_SLEEP_INTERVAL_SECONDS, start_time)
def handle_message(self, message: DAQJobMessageStats) -> bool:
if not super().handle_message(message):
return False
+ # Ignore if the message has no supervisor info
+ if not message.daq_job_info or not message.daq_job_info.supervisor_config:
+ return True
+
+ self._stats[message.daq_job_info.supervisor_config.supervisor_id] = (
+ message.stats
+ )
+ return True
+
+ def _save_stats(self):
keys = [
"supervisor",
"daq_job",
@@ -90,38 +109,27 @@
record.count,
]
- if message.daq_job_info and message.daq_job_info.supervisor_config:
- supervisor_id = message.daq_job_info.supervisor_config.supervisor_id
- else:
- supervisor_id = "N/A"
- data_to_send = []
- for daq_job_type, msg in message.stats.items():
- data_to_send.append(
- [
- supervisor_id,
- daq_job_type.__name__,
- str(msg.is_alive).lower(),
- *unpack_record(msg.message_in_stats),
- *unpack_record(msg.message_out_stats),
- *unpack_record(msg.restart_stats),
- ]
- )
-
- if message.daq_job_info and message.daq_job_info.supervisor_config:
- tag = message.daq_job_info.supervisor_config.supervisor_id
- else:
- tag = None
-
- self._put_message_out(
- DAQJobMessageStoreTabular(
- store_config=self.config.store_config,
- keys=keys,
- data=data_to_send,
- tag=tag,
- )
- )
+ for supervisor_id, stats in self._stats.items():
+ data_to_send = []
+ for daq_job_type, msg in stats.items():
+ data_to_send.append(
+ [
+ supervisor_id,
+ daq_job_type.__name__,
+ str(msg.is_alive).lower(),
+ *unpack_record(msg.message_in_stats),
+ *unpack_record(msg.message_out_stats),
+ *unpack_record(msg.restart_stats),
+ ]
+ )
- return True
+ self._put_message_out(
+ DAQJobMessageStoreTabular(
+ store_config=self.config.store_config,
+ keys=keys,
+ data=data_to_send,
+ )
+ )
Ancestors
diff --git a/daq/jobs/remote.html b/daq/jobs/remote.html
index 2ff8122..fdbb3f7 100644
--- a/daq/jobs/remote.html
+++ b/daq/jobs/remote.html
@@ -120,10 +120,8 @@ Attributes
def handle_message(self, message: DAQJobMessage) -> bool:
if (
- # Do not send stats messages to the remote
- isinstance(message, DAQJobMessageStats)
# Ignore if we already received the message
- or message.id in self._remote_message_ids
+ message.id in self._remote_message_ids
# Ignore if the message is not allowed by the DAQ Job
or not super().handle_message(message)
# Ignore if the message is remote, meaning it was sent by another Supervisor