diff --git a/src/daq/jobs/handle_stats.py b/src/daq/jobs/handle_stats.py new file mode 100644 index 0000000..b464294 --- /dev/null +++ b/src/daq/jobs/handle_stats.py @@ -0,0 +1,71 @@ +import time +from dataclasses import dataclass +from datetime import datetime +from typing import Dict, Optional + +from daq.base import DAQJob +from daq.models import DAQJobMessage, DAQJobStats +from daq.store.models import DAQJobMessageStore, StorableDAQJobConfig + +DAQJobStatsDict = Dict[type[DAQJob], DAQJobStats] + + +@dataclass +class DAQJobHandleStatsConfig(StorableDAQJobConfig): + pass + + +@dataclass +class DAQJobMessageStats(DAQJobMessage): + stats: DAQJobStatsDict + + +class DAQJobHandleStats(DAQJob): + allowed_message_in_types = [DAQJobMessageStats] + config_type = DAQJobHandleStatsConfig + config: DAQJobHandleStatsConfig + + def start(self): + while True: + self.consume() + time.sleep(1) + + def handle_message(self, message: DAQJobMessageStats) -> bool: + if not super().handle_message(message): + return False + + keys = [ + "daq_job", + "last_message_in_date", + "message_in_count", + "last_message_out_date", + "message_out_count", + ] + + def datetime_to_str(dt: Optional[datetime]): + if dt is None: + return "N/A" + return dt.strftime("%Y-%m-%d %H:%M:%S") + + data_to_send = [] + for daq_job_type, msg in message.stats.items(): + data_to_send.append( + [ + daq_job_type.__name__, + datetime_to_str(msg.last_message_in_date), + msg.message_in_count, + datetime_to_str(msg.last_message_out_date), + msg.message_out_count, + ] + ) + + self.message_out.put( + DAQJobMessageStore( + store_config=self.config.store_config, + daq_job=self, + keys=keys, + data=data_to_send, + ) + ) + + return True diff --git a/src/daq/jobs/store/csv.py b/src/daq/jobs/store/csv.py index 089ad85..9c9916c 100644 --- a/src/daq/jobs/store/csv.py +++ b/src/daq/jobs/store/csv.py @@ -5,7 +5,7 @@ from datetime import datetime from io import TextIOWrapper from pathlib import Path -from typing import Any, cast +from typing import Any, Optional, cast from daq.models import DAQJobConfig from daq.store.base import DAQJobStore @@ -20,6 +20,7 @@ class DAQJobStoreConfigCSV(DAQJobStoreConfig): file_path: str add_date: bool + overwrite: Optional[bool] = None @dataclass @@ -32,6 +33,7 @@ class CSVFile: file: TextIOWrapper last_flush_date: datetime write_queue: deque[list[Any]] + overwrite: Optional[bool] = None class DAQJobStoreCSV(DAQJobStore): @@ -51,10 +53,12 @@ def handle_message(self, message: DAQJobMessageStore) -> bool: file_path = modify_file_path( store_config.file_path, store_config.add_date, message.prefix ) - file, new_file = self._open_csv_file(file_path) + file, new_file = self._open_csv_file(file_path, store_config.overwrite) + if file.overwrite: + file.write_queue.clear() # Write headers if the file is new - if new_file: + if new_file or file.overwrite: file.write_queue.append(message.keys) # Append rows to write_queue @@ -63,7 +67,9 @@ def handle_message(self, message: DAQJobMessageStore) -> bool: return True - def _open_csv_file(self, file_path: str) -> tuple[CSVFile, bool]: + def _open_csv_file( + self, file_path: str, overwrite: Optional[bool] + ) -> tuple[CSVFile, bool]: """ Opens a file and returns (CSVFile, new_file) """ @@ -74,7 +80,12 @@ def _open_csv_file(self, file_path: str) -> tuple[CSVFile, bool]: Path(file_path).touch() # Open file - file = CSVFile(open(file_path, "a", newline=""), datetime.now(), deque()) + file = CSVFile( + open(file_path, "a" if not overwrite else "w", newline=""), + datetime.now(), + deque(), + overwrite, + ) self._open_csv_files[file_path] = file else: file_exists = True @@ -104,6 +115,11 @@ def store_loop(self): writer.writerows(list(file.write_queue)) file.write_queue.clear() + if file.overwrite: + file.file.close() + files_to_delete.append(file_path) + continue + # Flush if the flush time is up if self._flush(file): self._logger.debug(f"Flushed '{file.file.name}' ({row_size} rows)") diff --git a/src/daq/types.py b/src/daq/types.py index 4059542..f0da860 100644 --- a/src/daq/types.py +++ b/src/daq/types.py @@ -1,12 +1,10 @@ -from typing import Dict - from daq.base import DAQJob from daq.jobs.caen.n1081b import DAQJobN1081B +from daq.jobs.handle_stats import DAQJobHandleStats from daq.jobs.serve_http import DAQJobServeHTTP from daq.jobs.store.csv import DAQJobStoreCSV from daq.jobs.store.root import DAQJobStoreROOT from daq.jobs.test_job import DAQJobTest -from daq.models import DAQJobStats DAQ_JOB_TYPE_TO_CLASS: dict[str, type[DAQJob]] = { "n1081b": DAQJobN1081B, @@ -14,6 +12,5 @@ "store_csv": DAQJobStoreCSV, "store_root": DAQJobStoreROOT, "serve_http": DAQJobServeHTTP, + "handle_stats": DAQJobHandleStats, } - -DAQJobStatsDict = Dict[type[DAQJob], DAQJobStats] diff --git a/src/main.py b/src/main.py index b5da822..7198d89 100644 --- a/src/main.py +++ b/src/main.py @@ -7,10 +7,10 @@ from daq.base import DAQJob, DAQJobThread from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs +from daq.jobs.handle_stats import DAQJobMessageStats, DAQJobStatsDict from daq.models import DAQJobMessage, DAQJobStats from daq.store.base import DAQJobStore from daq.store.models import DAQJobMessageStore -from daq.types import DAQJobStatsDict DAQ_SUPERVISOR_SLEEP_TIME = 0.5 DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1 @@ -36,12 +36,25 @@ def loop( # Get messages from DAQ Jobs daq_messages_out = get_messages_from_daq_jobs(daq_job_threads, daq_job_stats) + # Add supervisor messages + daq_messages_out.extend(get_supervisor_messages(daq_job_threads, daq_job_stats)) + # Send messages to appropriate DAQ Jobs send_messages_to_daq_jobs(daq_job_threads, daq_messages_out, daq_job_stats) return daq_job_threads, daq_job_stats +def get_supervisor_messages( + daq_job_threads: list[DAQJobThread], daq_job_stats: DAQJobStatsDict +): + messages = [] + + # Send stats message + messages.append(DAQJobMessageStats(daq_job_stats)) + return messages + + def get_or_create_daq_job_stats( daq_job_stats: DAQJobStatsDict, daq_job_type: type[DAQJob] ) -> DAQJobStats: