Skip to content

Commit

Permalink
feat: create DAQRemoteConfig and add it to DAQJobConfig and DAQJobMes…
Browse files Browse the repository at this point in the history
…sageStore

- let DAQJobMessageStore override DAQJobRemoteConfig
- create a custom method for putting messages out for daq jobs
- add new tests for DAQJobRemote
  • Loading branch information
furkan-bilgin committed Nov 10, 2024
1 parent 49ae5dc commit 24104af
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 26 deletions.
13 changes: 13 additions & 0 deletions src/daq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DAQJobMessageStop,
DAQJobStopError,
)
from daq.store.models import DAQJobMessageStore
from models import SupervisorConfig

daq_job_instance_id = 0
Expand Down Expand Up @@ -96,6 +97,18 @@ def _create_info(self) -> "DAQJobInfo":
supervisor_config=getattr(self, "_supervisor_config", None),
)

def _put_message_out(self, message: DAQJobMessage):
message.daq_job_info = self.info
message.remote_config = self.config.remote_config

# Get the remote config from the store config if it exists
if isinstance(message, DAQJobMessageStore):
store_remote_config = message.get_remote_config()
if store_remote_config is not None:
message.remote_config = store_remote_config

self.message_out.put(message)

def __del__(self):
self._logger.info("DAQ job is being deleted")
self._has_been_freed = True
Expand Down
3 changes: 1 addition & 2 deletions src/daq/jobs/caen/n1081b.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,9 @@ def _send_store_message(self, data: dict, section):
get_now_unix_timestamp_ms(), # unix timestamp in milliseconds
*[x["value"] for x in data["counters"]],
]
self.message_out.put(
self._put_message_out(
DAQJobMessageStore(
store_config=self.config.store_config,
daq_job_info=self.info,
prefix=section,
keys=keys,
data=[values],
Expand Down
3 changes: 1 addition & 2 deletions src/daq/jobs/handle_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ def handle_message(self, message: DAQJobMessageAlert) -> bool:
]
]

self.message_out.put(
self._put_message_out(
DAQJobMessageStore(
store_config=self.config.store_config,
daq_job_info=self.info,
keys=keys,
data=data_to_send,
)
Expand Down
3 changes: 1 addition & 2 deletions src/daq/jobs/handle_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ def unpack_record(record: DAQJobStatsRecord):
]
)

self.message_out.put(
self._put_message_out(
DAQJobMessageStore(
store_config=self.config.store_config,
daq_job_info=self.info,
keys=keys,
data=data_to_send,
)
Expand Down
3 changes: 1 addition & 2 deletions src/daq/jobs/healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,8 @@ def handle_checks(self):
self._sent_alert_items.remove(item_id)

def send_alert(self, item: HealthcheckItem):
self.message_out.put(
self._put_message_out(
DAQJobMessageAlert(
daq_job_info=self.info,
date=datetime.now(),
alert_info=item.alert_info,
)
Expand Down
8 changes: 3 additions & 5 deletions src/daq/jobs/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from daq.jobs.handle_stats import DAQJobMessageStats
from daq.models import (
DEFAULT_REMOTE_TOPIC,
REMOTE_TOPIC_VOID,
DAQJobConfig,
DAQJobMessage,
)
Expand Down Expand Up @@ -84,12 +83,11 @@ def handle_message(self, message: DAQJobMessage) -> bool:
):
return True # Silently ignore

remote_topic = message.remote_topic or DEFAULT_REMOTE_TOPIC

# Don't send message if the topic is void
if remote_topic == REMOTE_TOPIC_VOID:
if message.remote_config.remote_disable:
return True

remote_topic = message.remote_config.remote_topic or DEFAULT_REMOTE_TOPIC

self._zmq_pub.send_multipart(
[remote_topic.encode(), self._pack_message(message)]
)
Expand Down
3 changes: 1 addition & 2 deletions src/daq/jobs/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ def _send_store_message(self):
def get_int():
return randint(self.config.rand_min, self.config.rand_max)

self.message_out.put(
self._put_message_out(
DAQJobMessageStore(
store_config=self.config.store_config,
daq_job_info=self.info,
keys=["A", "B", "C"],
data=[[get_int(), get_int(), get_int()]],
)
Expand Down
8 changes: 7 additions & 1 deletion src/daq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ def mock() -> "DAQJobInfo":
)


class DAQRemoteConfig(Struct, kw_only=True):
remote_topic: Optional[str] = DEFAULT_REMOTE_TOPIC
remote_disable: Optional[bool] = False


class DAQJobConfig(Struct, kw_only=True):
verbosity: LogVerbosity = LogVerbosity.INFO
remote_config: Optional[DAQRemoteConfig] = field(default_factory=DAQRemoteConfig)
daq_job_type: str


Expand All @@ -54,7 +60,7 @@ class DAQJobMessage(Struct, kw_only=True):
timestamp: Optional[datetime] = field(default_factory=datetime.now)
is_remote: bool = False
daq_job_info: Optional["DAQJobInfo"] = None
remote_topic: Optional[str] = DEFAULT_REMOTE_TOPIC
remote_config: DAQRemoteConfig = field(default_factory=DAQRemoteConfig)


class DAQJobMessageStop(DAQJobMessage):
Expand Down
17 changes: 8 additions & 9 deletions src/daq/store/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Any, Optional

from msgspec import Struct
from msgspec import Struct, field

from daq.models import REMOTE_TOPIC_VOID, DAQJobConfig, DAQJobMessage
from daq.models import DAQJobConfig, DAQJobMessage, DAQRemoteConfig


class DAQJobStoreConfig(Struct, dict=True):
Expand All @@ -29,15 +29,15 @@ class DAQJobMessageStore(DAQJobMessage):
data: list[list[Any]]
prefix: str | None = None

def __post_init__(self):
def get_remote_config(self) -> Optional[DAQRemoteConfig]:
for key in dir(self.store_config):
value = getattr(self.store_config, key)
if not isinstance(value, DAQJobStoreConfigBase):
continue
if value.remote_topic is not None:
self.remote_topic = value.remote_topic
if getattr(value, "remote_disable", False):
self.remote_topic = REMOTE_TOPIC_VOID
if value.remote_config is None:
continue
return value.remote_config
return None


class StorableDAQJobConfig(DAQJobConfig):
Expand All @@ -54,8 +54,7 @@ class DAQJobStoreTargetInstance(Struct):


class DAQJobStoreConfigBase(Struct, kw_only=True):
remote_topic: Optional[str] = None
remote_disable: Optional[bool] = None
remote_config: Optional[DAQRemoteConfig] = field(default_factory=DAQRemoteConfig)


class DAQJobStoreConfigCSV(DAQJobStoreConfigBase):
Expand Down
41 changes: 40 additions & 1 deletion src/tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from daq.jobs.remote import DAQJobRemote, DAQJobRemoteConfig
from daq.jobs.store.csv import DAQJobStoreConfigCSV
from daq.jobs.test_job import DAQJobTest
from daq.models import DEFAULT_REMOTE_TOPIC, DAQJobMessage
from daq.models import DEFAULT_REMOTE_TOPIC, DAQJobMessage, DAQRemoteConfig
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig


Expand Down Expand Up @@ -82,6 +82,45 @@ def side_effect():
self.assertEqual(self.daq_job_remote.message_out.put.call_count, 1)
self.assertEqual(call_count, 2)

def test_handle_message_with_remote_config(self):
message = DAQJobMessage(
id="testmsg",
remote_config=DAQRemoteConfig(remote_topic="custom_topic"),
)
self.daq_job_remote.handle_message(message)
self.mock_sender.send_multipart.assert_called_once_with(
["custom_topic".encode(), self.daq_job_remote._pack_message(message)]
)

def test_handle_message_with_remote_disable(self):
message = DAQJobMessage(
id="testmsg",
remote_config=DAQRemoteConfig(remote_disable=True),
)
result = self.daq_job_remote.handle_message(message)
self.assertTrue(result)
self.mock_sender.send_multipart.assert_not_called()

def test_handle_message_with_default_remote_topic(self):
message = DAQJobMessage(
id="testmsg",
remote_config=DAQRemoteConfig(),
)
self.daq_job_remote.handle_message(message)
self.mock_sender.send_multipart.assert_called_once_with(
[DEFAULT_REMOTE_TOPIC.encode(), self.daq_job_remote._pack_message(message)]
)

def test_handle_message_with_custom_remote_topic(self):
message = DAQJobMessage(
id="testmsg",
remote_config=DAQRemoteConfig(remote_topic="custom_topic"),
)
self.daq_job_remote.handle_message(message)
self.mock_sender.send_multipart.assert_called_once_with(
["custom_topic".encode(), self.daq_job_remote._pack_message(message)]
)


if __name__ == "__main__":
unittest.main()

0 comments on commit 24104af

Please sign in to comment.