From f609b3fc277bcdd579b013809dc194ff325fc36e Mon Sep 17 00:00:00 2001 From: Furkan Date: Wed, 4 Dec 2024 23:36:19 +0300 Subject: [PATCH] fix: streamline supervisor ID handling in DAQJobHandleStats and convert bytes to megabytes --- src/enrgdaq/daq/jobs/handle_stats.py | 30 +++++++++++++++++----------- src/enrgdaq/daq/jobs/remote.py | 2 +- src/enrgdaq/daq/models.py | 7 +++++++ 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/enrgdaq/daq/jobs/handle_stats.py b/src/enrgdaq/daq/jobs/handle_stats.py index ea1d1c8..306a746 100644 --- a/src/enrgdaq/daq/jobs/handle_stats.py +++ b/src/enrgdaq/daq/jobs/handle_stats.py @@ -74,13 +74,9 @@ def handle_message( return True if isinstance(message, DAQJobMessageStats): - self._stats[message.daq_job_info.supervisor_config.supervisor_id] = ( - message.stats - ) + self._stats[message.supervisor_id] = message.stats elif isinstance(message, DAQJobMessageStatsRemote): - self._remote_stats[message.daq_job_info.supervisor_config.supervisor_id] = ( - message.stats - ) + self._remote_stats[message.supervisor_id] = message.stats return True def _save_stats(self): @@ -135,9 +131,9 @@ def _save_remote_stats(self): "is_alive", "last_active", "message_in_count", - "message_in_bytes", + "message_in_megabytes", "message_out_count", - "message_out_bytes", + "message_out_megabytes", ] data_to_send = [] @@ -152,12 +148,19 @@ def _save_remote_stats(self): ].items(): remote_stats_combined[supervisor_id] = remote_stats - for remote_stats_dict in self._remote_stats.values(): + for remote_supervisor_id, remote_stats_dict in self._remote_stats.items(): # For each remote stats dict, combine the values for ( supervisor_id, remote_stats_dict_serialized_item, ) in remote_stats_dict.items(): + # Skip if the remote supervisor id is the same as the local supervisor id or + # other supervisors try to overwrite other supervisors + if supervisor_id != remote_supervisor_id or ( + self._supervisor_config + and self._supervisor_config.supervisor_id == remote_supervisor_id + ): + continue # Convert the supervisor remote stats to a dict remote_stats_dict_serialized = msgspec.structs.asdict( remote_stats_dict_serialized_item @@ -167,20 +170,23 @@ def _save_remote_stats(self): if value == 0 or not value: continue setattr(remote_stats_combined[supervisor_id], item, value) - for supervisor_id, remote_stats in remote_stats_combined.items(): is_remote_alive = datetime.now() - remote_stats.last_active <= timedelta( seconds=DAQ_JOB_HANDLE_STATS_REMOTE_ALIVE_SECONDS ) + + def _byte_to_mb(x): + return "{:.3f}".format(x / 1024 / 1024) + data_to_send.append( [ supervisor_id, str(is_remote_alive).lower(), remote_stats.last_active, remote_stats.message_in_count, - remote_stats.message_in_bytes, + _byte_to_mb(remote_stats.message_in_bytes), remote_stats.message_out_count, - remote_stats.message_out_bytes, + _byte_to_mb(remote_stats.message_out_bytes), ] ) self._put_message_out( diff --git a/src/enrgdaq/daq/jobs/remote.py b/src/enrgdaq/daq/jobs/remote.py index 473f6e3..61bf91e 100644 --- a/src/enrgdaq/daq/jobs/remote.py +++ b/src/enrgdaq/daq/jobs/remote.py @@ -298,8 +298,8 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage: def _send_remote_stats_message(self): msg = DAQJobMessageStatsRemote(dict(self._remote_stats)) - self._send_remote_message(msg) self._put_message_out(msg) + self.message_in.put(msg) def __del__(self): """ diff --git a/src/enrgdaq/daq/models.py b/src/enrgdaq/daq/models.py index 2e93756..dde5b51 100644 --- a/src/enrgdaq/daq/models.py +++ b/src/enrgdaq/daq/models.py @@ -113,6 +113,13 @@ class DAQJobMessage(Struct, kw_only=True): daq_job_info: Optional["DAQJobInfo"] = None remote_config: DAQRemoteConfig = field(default_factory=DAQRemoteConfig) + @property + def supervisor_id(self) -> str: + if self.daq_job_info is None or self.daq_job_info.supervisor_config is None: + return "unknown" + + return self.daq_job_info.supervisor_config.supervisor_id + class DAQJobMessageStop(DAQJobMessage): reason: str