From 34ba94695cb5bdb6e6930f7eb02b72d8dbd924a1 Mon Sep 17 00:00:00 2001 From: Furkan Date: Sun, 13 Oct 2024 21:40:40 +0300 Subject: [PATCH] feat: add DAQJobStats --- src/daq/models.py | 10 +++++++++ src/daq/types.py | 5 +++++ src/main.py | 51 ++++++++++++++++++++++++++++++++++-------- src/tests/test_main.py | 7 +++--- 4 files changed, 61 insertions(+), 12 deletions(-) diff --git a/src/daq/models.py b/src/daq/models.py index 924a358..0f5587f 100644 --- a/src/daq/models.py +++ b/src/daq/models.py @@ -1,4 +1,6 @@ from dataclasses import dataclass +from datetime import datetime +from typing import Optional from dataclasses_json import DataClassJsonMixin @@ -18,6 +20,14 @@ class DAQJobMessageStop(DAQJobMessage): reason: str +@dataclass +class DAQJobStats: + message_in_count: int + message_out_count: int + last_message_in_date: Optional[datetime] + last_message_out_date: Optional[datetime] + + class DAQJobStopError(Exception): def __init__(self, reason: str): self.reason = reason diff --git a/src/daq/types.py b/src/daq/types.py index 7f38070..90a8a2a 100644 --- a/src/daq/types.py +++ b/src/daq/types.py @@ -1,5 +1,8 @@ +from typing import Dict + from daq.base import DAQJob from daq.caen.n1081b import DAQJobN1081B +from daq.models import DAQJobStats from daq.serve_http import DAQJobServeHTTP from daq.store.csv import DAQJobStoreCSV from daq.store.root import DAQJobStoreROOT @@ -12,3 +15,5 @@ "store_root": DAQJobStoreROOT, "serve_http": DAQJobServeHTTP, } + +DAQJobStatsDict = Dict[type[DAQJob], DAQJobStats] diff --git a/src/main.py b/src/main.py index 79aeddb..b5da822 100644 --- a/src/main.py +++ b/src/main.py @@ -1,14 +1,16 @@ import logging import time +from datetime import datetime from queue import Empty import coloredlogs -from daq.base import DAQJobThread +from daq.base import DAQJob, DAQJobThread from daq.daq_job import load_daq_jobs, parse_store_config, start_daq_job, start_daq_jobs -from daq.models import DAQJobMessage +from daq.models import DAQJobMessage, DAQJobStats from daq.store.base import DAQJobStore from daq.store.models import DAQJobMessageStore +from daq.types import DAQJobStatsDict DAQ_SUPERVISOR_SLEEP_TIME = 0.5 DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1 @@ -18,7 +20,10 @@ def start_daq_job_threads() -> list[DAQJobThread]: return start_daq_jobs(load_daq_jobs("configs/")) -def loop(daq_job_threads: list[DAQJobThread]) -> list[DAQJobThread]: +def loop( + daq_job_threads: list[DAQJobThread], + daq_job_stats: DAQJobStatsDict, +) -> tuple[list[DAQJobThread], DAQJobStatsDict]: # Remove dead threads dead_threads = [t for t in daq_job_threads if not t.thread.is_alive()] # Clean up dead threads @@ -29,15 +34,30 @@ def loop(daq_job_threads: list[DAQJobThread]) -> list[DAQJobThread]: daq_job_threads.append(start_daq_job(thread.daq_job)) # Get messages from DAQ Jobs - daq_messages = get_messages_from_daq_jobs(daq_job_threads) + daq_messages_out = get_messages_from_daq_jobs(daq_job_threads, daq_job_stats) # Send messages to appropriate DAQ Jobs - send_messages_to_daq_jobs(daq_job_threads, daq_messages) + send_messages_to_daq_jobs(daq_job_threads, daq_messages_out, daq_job_stats) - return daq_job_threads + return daq_job_threads, daq_job_stats -def get_messages_from_daq_jobs(daq_job_threads: list[DAQJobThread]): +def get_or_create_daq_job_stats( + daq_job_stats: DAQJobStatsDict, daq_job_type: type[DAQJob] +) -> DAQJobStats: + if daq_job_type not in daq_job_stats: + daq_job_stats[daq_job_type] = DAQJobStats( + message_in_count=0, + message_out_count=0, + last_message_in_date=None, + last_message_out_date=None, + ) + return daq_job_stats[daq_job_type] + + +def get_messages_from_daq_jobs( + daq_job_threads: list[DAQJobThread], daq_job_stats: DAQJobStatsDict +) -> list[DAQJobMessage]: res = [] for thread in daq_job_threads: try: @@ -45,13 +65,20 @@ def get_messages_from_daq_jobs(daq_job_threads: list[DAQJobThread]): res.append( thread.daq_job.message_out.get(timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT) ) + + # Update stats + stats = get_or_create_daq_job_stats(daq_job_stats, type(thread.daq_job)) + stats.message_out_count += 1 + stats.last_message_out_date = datetime.now() except Empty: pass return res def send_messages_to_daq_jobs( - daq_job_threads: list[DAQJobThread], daq_messages: list[DAQJobMessage] + daq_job_threads: list[DAQJobThread], + daq_messages: list[DAQJobMessage], + daq_job_stats: DAQJobStatsDict, ): for message in daq_messages: if isinstance(message, DAQJobMessageStore) and isinstance( @@ -72,6 +99,11 @@ def send_messages_to_daq_jobs( continue daq_job.message_in.put(message, timeout=DAQ_JOB_QUEUE_ACTION_TIMEOUT) + # Update stats + stats = get_or_create_daq_job_stats(daq_job_stats, type(daq_job)) + stats.message_in_count += 1 + stats.last_message_in_date = datetime.now() + if __name__ == "__main__": coloredlogs.install( @@ -79,13 +111,14 @@ def send_messages_to_daq_jobs( datefmt="%Y-%m-%d %H:%M:%S", ) daq_job_threads = start_daq_job_threads() + daq_job_stats: DAQJobStatsDict = {} if not any(x for x in daq_job_threads if isinstance(x.daq_job, DAQJobStore)): logging.warning("No store job found, data will not be stored") while True: try: - daq_job_threads = loop(daq_job_threads) + daq_job_threads, daq_job_stats = loop(daq_job_threads, daq_job_stats) time.sleep(DAQ_SUPERVISOR_SLEEP_TIME) except KeyboardInterrupt: logging.warning("KeyboardInterrupt received, cleaning up") diff --git a/src/tests/test_main.py b/src/tests/test_main.py index 224cae4..8b2d19a 100644 --- a/src/tests/test_main.py +++ b/src/tests/test_main.py @@ -61,7 +61,8 @@ def test_loop(self, mock_start_daq_job): daq_job_threads = [mock_thread_alive, mock_thread_dead, mock_thread_store] daq_job_threads: list[DAQJobThread] = daq_job_threads - result = loop(daq_job_threads) + # TODO: test stats + result, _ = loop(daq_job_threads, {}) self.assertEqual( result, [mock_thread_alive, mock_thread_store, mock_thread_dead] @@ -79,7 +80,7 @@ def test_get_messages_from_daq_jobs(self): daq_job_threads = [mock_thread] daq_job_threads: list[DAQJobThread] = daq_job_threads - result = get_messages_from_daq_jobs(daq_job_threads) + result = get_messages_from_daq_jobs(daq_job_threads, {}) self.assertEqual(result, [mock_message]) @@ -94,7 +95,7 @@ def test_send_messages_to_daq_jobs(self, mock_parse_store_config): daq_job_threads = [mock_thread] daq_job_threads: list[DAQJobThread] = daq_job_threads - send_messages_to_daq_jobs(daq_job_threads, [mock_message]) + send_messages_to_daq_jobs(daq_job_threads, [mock_message], {}) mock_parse_store_config.assert_called_once_with({}) self.assertFalse(mock_thread.daq_job.message_in.empty())