From 56632075bb564251f6f1312d2bfdf960509cc60b Mon Sep 17 00:00:00 2001 From: Furkan Bilgin Date: Thu, 26 Dec 2024 00:06:48 +0300 Subject: [PATCH] feat: implement `DAQJobCAENToolbox` for CAEN digitizer register dumping and add tests --- src/enrgdaq/daq/jobs/caen/toolbox.py | 94 +++++++++++++++++++++++++++ src/tests/test_toolbox.py | 95 ++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 src/enrgdaq/daq/jobs/caen/toolbox.py create mode 100644 src/tests/test_toolbox.py diff --git a/src/enrgdaq/daq/jobs/caen/toolbox.py b/src/enrgdaq/daq/jobs/caen/toolbox.py new file mode 100644 index 0000000..c9d1ed7 --- /dev/null +++ b/src/enrgdaq/daq/jobs/caen/toolbox.py @@ -0,0 +1,94 @@ +import os +import subprocess +import tempfile +from datetime import datetime +from shutil import which + +from enrgdaq.daq.base import DAQJob +from enrgdaq.daq.store.models import ( + DAQJobMessageStoreTabular, + StorableDAQJobConfig, +) +from enrgdaq.utils.time import get_now_unix_timestamp_ms, sleep_for + +DAQ_JOB_CAEN_TOOLBOX_SLEEP_INTERVAL = 1 + + +class DAQJobCAENToolboxConfig(StorableDAQJobConfig): + """ + Configuration for the DAQ job that dumps register data of a CAEN digitizer using the caen-toolbox cli. + Attributes: + digitizer_type (str): Type of the CAEN digitizer, e.g. `dig1` + connection_string (str): Connection string of the CAEN digitizer, e.g. `-c OPTICAL_LINK -l 0` + register_labels (dict[str, str]): Dictionary of register labels, e.g. `{"0x11a8": "ADC_ch_1_Temperature"}` + """ + + digitizer_type: str + connection_string: str + register_labels: dict[str, str] + + +class DAQJobCAENToolbox(DAQJob): + """ + Dumps register data of a CAEN digitizer using the caen-toolbox cli. + Attributes: + config (DAQJobCAENToolboxConfig): Configuration for the DAQ job. + config_type (DAQJobCAENToolboxConfig): Configuration type for the DAQ job. + """ + + config_type = DAQJobCAENToolboxConfig + config: DAQJobCAENToolboxConfig + + def __init__(self, config: DAQJobCAENToolboxConfig, **kwargs): + super().__init__(config, **kwargs) + # Check if 'caen-toolbox' cli is present + if which("caen-toolbox") is None: + raise Exception("caen-toolbox cli not found") + + def _dump_digitizer(self) -> dict[str, int]: + temp_file = os.path.join(tempfile.mkdtemp(), "reg-dump.csv") + # dump registers + res = subprocess.run( + [ + "caen-toolbox", + self.config.digitizer_type, + "dump", + self.config.connection_string, + ] + ) + if res.returncode != 0: + raise Exception(f"caen-toolbox dump failed: {res.stderr}") + if not os.path.exists(temp_file): + raise Exception("Register dump file not found") + with open(temp_file, "r") as f: + lines = f.readlines() + raw_registers = {} + # Load CSV file + for line in lines: + if line.startswith("#"): + continue + parts = line.split(",") + raw_registers[parts[0]] = int(parts[1], 16) + registers = {} + for key, label in self.config.register_labels.items(): + registers[label] = raw_registers[key] + return registers + + def start(self): + while True: + start_time = datetime.now() + registers = self._dump_digitizer() + self._put_message_out( + DAQJobMessageStoreTabular( + store_config=self.config.store_config, + tag="caen-toolbox", + keys=["timestamp", *[f"caen_{x}" for x in registers]], + data=[ + [ + get_now_unix_timestamp_ms(), + *[registers[x] for x in registers], + ] + ], + ) + ) + sleep_for(DAQ_JOB_CAEN_TOOLBOX_SLEEP_INTERVAL, start_time) diff --git a/src/tests/test_toolbox.py b/src/tests/test_toolbox.py new file mode 100644 index 0000000..afe3132 --- /dev/null +++ b/src/tests/test_toolbox.py @@ -0,0 +1,95 @@ +import unittest +from datetime import datetime +from unittest.mock import MagicMock, mock_open, patch + +from enrgdaq.daq.jobs.caen.toolbox import DAQJobCAENToolbox, DAQJobCAENToolboxConfig +from enrgdaq.daq.store.models import DAQJobMessageStoreTabular + + +class TestDAQJobCAENToolbox(unittest.TestCase): + @patch("enrgdaq.daq.jobs.caen.toolbox.which", return_value="/usr/bin/caen-toolbox") + def setUp(self, mock_which): + self.config = DAQJobCAENToolboxConfig( + daq_job_type="", + digitizer_type="dig1", + connection_string="-c OPTICAL_LINK -l 0", + store_config=MagicMock(), + register_labels={"reg1": "caen_reg1", "reg2": "caen_reg2"}, + ) + self.daq_job = DAQJobCAENToolbox(self.config) + + @patch("enrgdaq.daq.jobs.caen.toolbox.which", return_value="/usr/bin/caen-toolbox") + def test_init_cli_present(self, mock_which): + try: + DAQJobCAENToolbox(self.config) + except Exception: + self.fail("DAQJobCAENToolbox raised Exception unexpectedly!") + + @patch("enrgdaq.daq.jobs.caen.toolbox.which", return_value=None) + def test_init_cli_not_present(self, mock_which): + with self.assertRaises(Exception) as context: + DAQJobCAENToolbox(self.config) + self.assertEqual(str(context.exception), "caen-toolbox cli not found") + + @patch("enrgdaq.daq.jobs.caen.toolbox.subprocess.run") + @patch("enrgdaq.daq.jobs.caen.toolbox.os.path.exists", return_value=True) + @patch( + "enrgdaq.daq.jobs.caen.toolbox.open", + new_callable=mock_open, + read_data="reg1,0x1\nreg2,0x2\nwill_not_show_up,0x3\n", + ) + def test_dump_digitizer(self, mock_open, mock_exists, mock_run): + mock_run.return_value.returncode = 0 + registers = self.daq_job._dump_digitizer() + self.assertEqual(registers, {"caen_reg1": 1, "caen_reg2": 2}) + + @patch("enrgdaq.daq.jobs.caen.toolbox.subprocess.run") + @patch("enrgdaq.daq.jobs.caen.toolbox.os.path.exists", return_value=False) + def test_dump_digitizer_file_not_found(self, mock_exists, mock_run): + mock_run.return_value.returncode = 0 + with self.assertRaises(Exception) as context: + self.daq_job._dump_digitizer() + self.assertEqual(str(context.exception), "Register dump file not found") + + @patch("enrgdaq.daq.jobs.caen.toolbox.subprocess.run") + def test_dump_digitizer_subprocess_error(self, mock_run): + mock_run.return_value.returncode = 1 + mock_run.return_value.stderr = "error" + with self.assertRaises(Exception) as context: + self.daq_job._dump_digitizer() + self.assertEqual(str(context.exception), "caen-toolbox dump failed: error") + + @patch( + "enrgdaq.daq.jobs.caen.toolbox.DAQJobCAENToolbox._dump_digitizer", + return_value={"reg1": 1, "reg2": 2}, + ) + @patch( + "enrgdaq.daq.jobs.caen.toolbox.get_now_unix_timestamp_ms", + return_value=1234567890, + ) + @patch("time.sleep", return_value=None) + @patch("enrgdaq.daq.jobs.caen.toolbox.sleep_for", side_effect=StopIteration) + @patch("enrgdaq.daq.jobs.caen.toolbox.get_now_unix_timestamp_ms") + @patch("uuid.uuid4", return_value="testuuid") + def test_start( + self, + mock_uuid, + mock_get_now_unix_timestamp_ms, + mock_sleep_for, + mock_get_now, + mock_dump_digitizer, + mock_sleep, + ): + mock_get_now_unix_timestamp_ms.return_value = 1234567890 + self.daq_job._put_message_out = MagicMock() + with self.assertRaises(StopIteration): + self.daq_job.start() + message = self.daq_job._put_message_out.call_args[0][0] + self.assertEqual(message.store_config, self.config.store_config) + self.assertEqual(message.tag, "caen-toolbox") + self.assertEqual(message.keys, ["timestamp", "caen_reg1", "caen_reg2"]) + self.assertEqual(message.data, [[1234567890, 1, 2]]) + + +if __name__ == "__main__": + unittest.main()