From 20077733c9d5ca2fffca83033d45c81af462b3ec Mon Sep 17 00:00:00 2001 From: Furkan Date: Thu, 14 Nov 2024 23:39:09 +0300 Subject: [PATCH] feat: add store prefix to `DAQJobHandleStats` --- src/daq/jobs/handle_stats.py | 6 ++++++ src/supervisor.py | 18 ++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/daq/jobs/handle_stats.py b/src/daq/jobs/handle_stats.py index cfb48db..f9af170 100644 --- a/src/daq/jobs/handle_stats.py +++ b/src/daq/jobs/handle_stats.py @@ -62,11 +62,17 @@ def unpack_record(record: DAQJobStatsRecord): ] ) + if message.daq_job_info and message.daq_job_info.supervisor_config: + prefix = message.daq_job_info.supervisor_config.supervisor_id + else: + prefix = None + self._put_message_out( DAQJobMessageStore( store_config=self.config.store_config, keys=keys, data=data_to_send, + prefix=prefix, ) ) diff --git a/src/supervisor.py b/src/supervisor.py index 23b9d0e..46f0997 100644 --- a/src/supervisor.py +++ b/src/supervisor.py @@ -17,7 +17,7 @@ start_daq_jobs, ) from daq.jobs.handle_stats import DAQJobMessageStats, DAQJobStatsDict -from daq.models import DAQJobConfig, DAQJobMessage, DAQJobStats +from daq.models import DAQJobConfig, DAQJobInfo, DAQJobMessage, DAQJobStats from daq.store.base import DAQJobStore from models import SupervisorConfig @@ -137,7 +137,12 @@ def get_supervisor_messages(self): messages = [] # Send stats message - messages.append(DAQJobMessageStats(stats=self.daq_job_stats)) + messages.append( + DAQJobMessageStats( + stats=self.daq_job_stats, + daq_job_info=self._get_supervisor_daq_job_info(), + ) + ) return messages def get_daq_job_stats( @@ -208,3 +213,12 @@ def _load_supervisor_config(self): with open(SUPERVISOR_CONFIG_FILE_PATH, "rb") as f: return msgspec.toml.decode(f.read(), type=SupervisorConfig) + + def _get_supervisor_daq_job_info(self): + return DAQJobInfo( + daq_job_type="Supervisor", + daq_job_class_name="Supervisor", + supervisor_config=self.config, + unique_id=self.config.supervisor_id, + instance_id=0, + )