Skip to content

Commit

Permalink
fix: streamline supervisor ID handling in DAQJobHandleStats and conve…
Browse files Browse the repository at this point in the history
…rt bytes to megabytes
  • Loading branch information
furkan-bilgin committed Dec 4, 2024
1 parent 0264e37 commit f609b3f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
30 changes: 18 additions & 12 deletions src/enrgdaq/daq/jobs/handle_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,9 @@ def handle_message(
return True

if isinstance(message, DAQJobMessageStats):
self._stats[message.daq_job_info.supervisor_config.supervisor_id] = (
message.stats
)
self._stats[message.supervisor_id] = message.stats
elif isinstance(message, DAQJobMessageStatsRemote):
self._remote_stats[message.daq_job_info.supervisor_config.supervisor_id] = (
message.stats
)
self._remote_stats[message.supervisor_id] = message.stats
return True

def _save_stats(self):
Expand Down Expand Up @@ -135,9 +131,9 @@ def _save_remote_stats(self):
"is_alive",
"last_active",
"message_in_count",
"message_in_bytes",
"message_in_megabytes",
"message_out_count",
"message_out_bytes",
"message_out_megabytes",
]
data_to_send = []

Expand All @@ -152,12 +148,19 @@ def _save_remote_stats(self):
].items():
remote_stats_combined[supervisor_id] = remote_stats

for remote_stats_dict in self._remote_stats.values():
for remote_supervisor_id, remote_stats_dict in self._remote_stats.items():
# For each remote stats dict, combine the values
for (
supervisor_id,
remote_stats_dict_serialized_item,
) in remote_stats_dict.items():
# Skip if the remote supervisor id is the same as the local supervisor id or
# other supervisors try to overwrite other supervisors
if supervisor_id != remote_supervisor_id or (
self._supervisor_config
and self._supervisor_config.supervisor_id == remote_supervisor_id
):
continue
# Convert the supervisor remote stats to a dict
remote_stats_dict_serialized = msgspec.structs.asdict(
remote_stats_dict_serialized_item
Expand All @@ -167,20 +170,23 @@ def _save_remote_stats(self):
if value == 0 or not value:
continue
setattr(remote_stats_combined[supervisor_id], item, value)

for supervisor_id, remote_stats in remote_stats_combined.items():
is_remote_alive = datetime.now() - remote_stats.last_active <= timedelta(
seconds=DAQ_JOB_HANDLE_STATS_REMOTE_ALIVE_SECONDS
)

def _byte_to_mb(x):
return "{:.3f}".format(x / 1024 / 1024)

data_to_send.append(
[
supervisor_id,
str(is_remote_alive).lower(),
remote_stats.last_active,
remote_stats.message_in_count,
remote_stats.message_in_bytes,
_byte_to_mb(remote_stats.message_in_bytes),
remote_stats.message_out_count,
remote_stats.message_out_bytes,
_byte_to_mb(remote_stats.message_out_bytes),
]
)
self._put_message_out(
Expand Down
2 changes: 1 addition & 1 deletion src/enrgdaq/daq/jobs/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage:

def _send_remote_stats_message(self):
msg = DAQJobMessageStatsRemote(dict(self._remote_stats))
self._send_remote_message(msg)
self._put_message_out(msg)
self.message_in.put(msg)

def __del__(self):
"""
Expand Down
7 changes: 7 additions & 0 deletions src/enrgdaq/daq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ class DAQJobMessage(Struct, kw_only=True):
daq_job_info: Optional["DAQJobInfo"] = None
remote_config: DAQRemoteConfig = field(default_factory=DAQRemoteConfig)

@property
def supervisor_id(self) -> str:
if self.daq_job_info is None or self.daq_job_info.supervisor_config is None:
return "unknown"

return self.daq_job_info.supervisor_config.supervisor_id


class DAQJobMessageStop(DAQJobMessage):
reason: str
Expand Down

0 comments on commit f609b3f

Please sign in to comment.