-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement
DAQJobCAENToolbox
for CAEN digitizer register dumpi…
…ng and add tests
- Loading branch information
1 parent
e663e70
commit 5663207
Showing
2 changed files
with
189 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import os | ||
import subprocess | ||
import tempfile | ||
from datetime import datetime | ||
from shutil import which | ||
|
||
from enrgdaq.daq.base import DAQJob | ||
from enrgdaq.daq.store.models import ( | ||
DAQJobMessageStoreTabular, | ||
StorableDAQJobConfig, | ||
) | ||
from enrgdaq.utils.time import get_now_unix_timestamp_ms, sleep_for | ||
|
||
DAQ_JOB_CAEN_TOOLBOX_SLEEP_INTERVAL = 1 | ||
|
||
|
||
class DAQJobCAENToolboxConfig(StorableDAQJobConfig): | ||
""" | ||
Configuration for the DAQ job that dumps register data of a CAEN digitizer using the caen-toolbox cli. | ||
Attributes: | ||
digitizer_type (str): Type of the CAEN digitizer, e.g. `dig1` | ||
connection_string (str): Connection string of the CAEN digitizer, e.g. `-c OPTICAL_LINK -l 0` | ||
register_labels (dict[str, str]): Dictionary of register labels, e.g. `{"0x11a8": "ADC_ch_1_Temperature"}` | ||
""" | ||
|
||
digitizer_type: str | ||
connection_string: str | ||
register_labels: dict[str, str] | ||
|
||
|
||
class DAQJobCAENToolbox(DAQJob): | ||
""" | ||
Dumps register data of a CAEN digitizer using the caen-toolbox cli. | ||
Attributes: | ||
config (DAQJobCAENToolboxConfig): Configuration for the DAQ job. | ||
config_type (DAQJobCAENToolboxConfig): Configuration type for the DAQ job. | ||
""" | ||
|
||
config_type = DAQJobCAENToolboxConfig | ||
config: DAQJobCAENToolboxConfig | ||
|
||
def __init__(self, config: DAQJobCAENToolboxConfig, **kwargs): | ||
super().__init__(config, **kwargs) | ||
# Check if 'caen-toolbox' cli is present | ||
if which("caen-toolbox") is None: | ||
raise Exception("caen-toolbox cli not found") | ||
|
||
def _dump_digitizer(self) -> dict[str, int]: | ||
temp_file = os.path.join(tempfile.mkdtemp(), "reg-dump.csv") | ||
# dump registers | ||
res = subprocess.run( | ||
[ | ||
"caen-toolbox", | ||
self.config.digitizer_type, | ||
"dump", | ||
self.config.connection_string, | ||
] | ||
) | ||
if res.returncode != 0: | ||
raise Exception(f"caen-toolbox dump failed: {res.stderr}") | ||
if not os.path.exists(temp_file): | ||
raise Exception("Register dump file not found") | ||
with open(temp_file, "r") as f: | ||
lines = f.readlines() | ||
raw_registers = {} | ||
# Load CSV file | ||
for line in lines: | ||
if line.startswith("#"): | ||
continue | ||
parts = line.split(",") | ||
raw_registers[parts[0]] = int(parts[1], 16) | ||
registers = {} | ||
for key, label in self.config.register_labels.items(): | ||
registers[label] = raw_registers[key] | ||
return registers | ||
|
||
def start(self): | ||
while True: | ||
start_time = datetime.now() | ||
registers = self._dump_digitizer() | ||
self._put_message_out( | ||
DAQJobMessageStoreTabular( | ||
store_config=self.config.store_config, | ||
tag="caen-toolbox", | ||
keys=["timestamp", *[f"caen_{x}" for x in registers]], | ||
data=[ | ||
[ | ||
get_now_unix_timestamp_ms(), | ||
*[registers[x] for x in registers], | ||
] | ||
], | ||
) | ||
) | ||
sleep_for(DAQ_JOB_CAEN_TOOLBOX_SLEEP_INTERVAL, start_time) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import unittest | ||
from datetime import datetime | ||
from unittest.mock import MagicMock, mock_open, patch | ||
|
||
from enrgdaq.daq.jobs.caen.toolbox import DAQJobCAENToolbox, DAQJobCAENToolboxConfig | ||
from enrgdaq.daq.store.models import DAQJobMessageStoreTabular | ||
|
||
|
||
class TestDAQJobCAENToolbox(unittest.TestCase): | ||
@patch("enrgdaq.daq.jobs.caen.toolbox.which", return_value="/usr/bin/caen-toolbox") | ||
def setUp(self, mock_which): | ||
self.config = DAQJobCAENToolboxConfig( | ||
daq_job_type="", | ||
digitizer_type="dig1", | ||
connection_string="-c OPTICAL_LINK -l 0", | ||
store_config=MagicMock(), | ||
register_labels={"reg1": "caen_reg1", "reg2": "caen_reg2"}, | ||
) | ||
self.daq_job = DAQJobCAENToolbox(self.config) | ||
|
||
@patch("enrgdaq.daq.jobs.caen.toolbox.which", return_value="/usr/bin/caen-toolbox") | ||
def test_init_cli_present(self, mock_which): | ||
try: | ||
DAQJobCAENToolbox(self.config) | ||
except Exception: | ||
self.fail("DAQJobCAENToolbox raised Exception unexpectedly!") | ||
|
||
@patch("enrgdaq.daq.jobs.caen.toolbox.which", return_value=None) | ||
def test_init_cli_not_present(self, mock_which): | ||
with self.assertRaises(Exception) as context: | ||
DAQJobCAENToolbox(self.config) | ||
self.assertEqual(str(context.exception), "caen-toolbox cli not found") | ||
|
||
@patch("enrgdaq.daq.jobs.caen.toolbox.subprocess.run") | ||
@patch("enrgdaq.daq.jobs.caen.toolbox.os.path.exists", return_value=True) | ||
@patch( | ||
"enrgdaq.daq.jobs.caen.toolbox.open", | ||
new_callable=mock_open, | ||
read_data="reg1,0x1\nreg2,0x2\nwill_not_show_up,0x3\n", | ||
) | ||
def test_dump_digitizer(self, mock_open, mock_exists, mock_run): | ||
mock_run.return_value.returncode = 0 | ||
registers = self.daq_job._dump_digitizer() | ||
self.assertEqual(registers, {"caen_reg1": 1, "caen_reg2": 2}) | ||
|
||
@patch("enrgdaq.daq.jobs.caen.toolbox.subprocess.run") | ||
@patch("enrgdaq.daq.jobs.caen.toolbox.os.path.exists", return_value=False) | ||
def test_dump_digitizer_file_not_found(self, mock_exists, mock_run): | ||
mock_run.return_value.returncode = 0 | ||
with self.assertRaises(Exception) as context: | ||
self.daq_job._dump_digitizer() | ||
self.assertEqual(str(context.exception), "Register dump file not found") | ||
|
||
@patch("enrgdaq.daq.jobs.caen.toolbox.subprocess.run") | ||
def test_dump_digitizer_subprocess_error(self, mock_run): | ||
mock_run.return_value.returncode = 1 | ||
mock_run.return_value.stderr = "error" | ||
with self.assertRaises(Exception) as context: | ||
self.daq_job._dump_digitizer() | ||
self.assertEqual(str(context.exception), "caen-toolbox dump failed: error") | ||
|
||
@patch( | ||
"enrgdaq.daq.jobs.caen.toolbox.DAQJobCAENToolbox._dump_digitizer", | ||
return_value={"reg1": 1, "reg2": 2}, | ||
) | ||
@patch( | ||
"enrgdaq.daq.jobs.caen.toolbox.get_now_unix_timestamp_ms", | ||
return_value=1234567890, | ||
) | ||
@patch("time.sleep", return_value=None) | ||
@patch("enrgdaq.daq.jobs.caen.toolbox.sleep_for", side_effect=StopIteration) | ||
@patch("enrgdaq.daq.jobs.caen.toolbox.get_now_unix_timestamp_ms") | ||
@patch("uuid.uuid4", return_value="testuuid") | ||
def test_start( | ||
self, | ||
mock_uuid, | ||
mock_get_now_unix_timestamp_ms, | ||
mock_sleep_for, | ||
mock_get_now, | ||
mock_dump_digitizer, | ||
mock_sleep, | ||
): | ||
mock_get_now_unix_timestamp_ms.return_value = 1234567890 | ||
self.daq_job._put_message_out = MagicMock() | ||
with self.assertRaises(StopIteration): | ||
self.daq_job.start() | ||
message = self.daq_job._put_message_out.call_args[0][0] | ||
self.assertEqual(message.store_config, self.config.store_config) | ||
self.assertEqual(message.tag, "caen-toolbox") | ||
self.assertEqual(message.keys, ["timestamp", "caen_reg1", "caen_reg2"]) | ||
self.assertEqual(message.data, [[1234567890, 1, 2]]) | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |