Skip to content

Commit

Permalink
feat: add more docs
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Nov 16, 2024
1 parent 6a7f2e0 commit 436d791
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 9 deletions.
12 changes: 12 additions & 0 deletions src/enrgdaq/daq/jobs/handle_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,27 @@


class DAQJobHandleAlertsConfig(StorableDAQJobConfig):
    """Config for :class:`DAQJobHandleAlerts`.

    Declares no fields of its own; all settings come from
    ``StorableDAQJobConfig``.
    """


class DAQJobHandleAlerts(DAQJob):
"""
DAQJobHandleAlerts is a job that stores alert messages (DAQJobMessageAlert).
"""

allowed_message_in_types = [DAQJobMessageAlert]
config_type = DAQJobHandleAlertsConfig
config: DAQJobHandleAlertsConfig

def start(self):
    """Run the job's main loop.

    Blocks on the inbound message queue (``nowait=False``) and loops
    forever; each received ``DAQJobMessageAlert`` is handled by the
    base-class ``consume`` machinery. Never returns normally.
    """
    while True:
        self.consume(nowait=False)

Expand Down
11 changes: 11 additions & 0 deletions src/enrgdaq/daq/jobs/handle_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,25 @@


class DAQJobHandleStatsConfig(StorableDAQJobConfig):
    """Config for :class:`DAQJobHandleStats`; no extra fields beyond the base."""


class DAQJobMessageStats(DAQJobMessage):
    """Message carrying a snapshot of DAQ job statistics."""

    # Per-job statistics payload (see ``DAQJobStatsDict``).
    stats: DAQJobStatsDict


class DAQJobHandleStats(DAQJob):
"""
Handles statistics for DAQ jobs.
This class is responsible for consuming and processing DAQ job statistics messages.
It extracts relevant statistics from the messages and stores them.
"""

allowed_message_in_types = [DAQJobMessageStats]
config_type = DAQJobHandleStatsConfig
config: DAQJobHandleStatsConfig
Expand Down
54 changes: 52 additions & 2 deletions src/enrgdaq/daq/jobs/healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,37 @@


class AlertCondition(str, Enum):
    """Condition under which a healthcheck item raises an alert."""

    # Alert when the checked condition is met.
    SATISFIED = "satisfied"
    # Alert when the checked condition is not met.
    UNSATISFIED = "unsatisfied"


class HealthcheckItem(Struct):
    """Base healthcheck item: bundles the alert info emitted when a check fires."""

    # Alert payload associated with this healthcheck item.
    alert_info: DAQAlertInfo


class HealthcheckStatsItem(HealthcheckItem):
"""
Represents a healthcheck stats item with additional attributes.
Attributes:
daq_job_type (str): The type of the DAQ (Data Acquisition) job.
stats_key (str): The key associated with the stats item.
alert_if_interval_is (AlertCondition): The condition to alert if the interval meets certain criteria. "satisfied" means the condition is met, "unsatisfied" means the condition is not met.
interval (Optional[str]): The interval string representing time duration (e.g., '5s' for 5 seconds, '10m' for 10 minutes, '1h' for 1 hour).
amount (Optional[int]): An optional amount associated with the stats item.
"""

daq_job_type: str
stats_key: str
alert_if_interval_is: AlertCondition
interval: Optional[str] = None
amount: Optional[int] = None

def parse_interval(self) -> timedelta:
"""Parses the interval string into a timedelta object."""
if self.interval is None:
raise ValueError("interval is null")

Expand All @@ -52,11 +67,44 @@ def parse_interval(self) -> timedelta:


class DAQJobHealthcheckConfig(DAQJobConfig):
    """Configuration for :class:`DAQJobHealthcheck`.

    Attributes:
        healthcheck_stats: Stats-based checks to evaluate. Each item names the
            DAQ job type and stats key to watch, plus the interval/condition
            that should trigger an alert.
        enable_alerts_on_restart: Emit an alert whenever a monitored DAQ job
            restarts. Defaults to True.
    """

    healthcheck_stats: list[HealthcheckStatsItem]
    enable_alerts_on_restart: bool = True


class DAQJobHealthcheck(DAQJob):
"""Healthcheck job class for monitoring DAQ jobs.
This class is responsible for performing health checks on various DAQ jobs
based on the provided configuration. It monitors the stats of DAQ jobs and
sends alerts if certain conditions are met, such as if a job has not been
updated within a specified interval or if a job has restarted.
Attributes:
allowed_message_in_types (list): List of allowed message types for this job.
config_type (type): The configuration class type for this job.
config (DAQJobHealthcheckConfig): The configuration instance for this job.
_sent_alert_items (set): Set of alert item hashes that have been sent.
_current_stats (DAQJobStatsDict): Dictionary holding the current stats of DAQ jobs.
_daq_job_type_to_class (dict): Mapping of DAQ job type names to their class types.
_healthcheck_stats (list): List of healthcheck stats items to monitor.
_get_daq_job_class (Callable): Function to get the DAQ job class by its type name.
"""

allowed_message_in_types = [DAQJobMessageStats]
config_type = DAQJobHealthcheckConfig
config: DAQJobHealthcheckConfig
Expand Down Expand Up @@ -122,13 +170,15 @@ def start(self):
time.sleep(HEALTHCHECK_LOOP_INTERVAL_SECONDS)

def handle_message(self, message: DAQJobMessageStats) -> bool:
    """Accept a stats message and cache its payload for the next check pass.

    Returns:
        bool: False when the base-class filter rejects the message,
        True after the stats snapshot has been stored.
    """
    accepted = super().handle_message(message)
    if accepted:
        self._current_stats = message.stats
    return accepted

def handle_checks(self):
"""Performs health checks and sends alerts if necessary."""
res: list[tuple[HealthcheckItem, bool]] = []

for item in self._healthcheck_stats:
Expand Down Expand Up @@ -161,11 +211,11 @@ def handle_checks(self):
item_id = hash(msgspec.json.encode(item))
if should_alert and item_id not in self._sent_alert_items:
self._sent_alert_items.add(item_id)
self.send_alert(item)
self._send_alert(item)
elif not should_alert and item_id in self._sent_alert_items:
self._sent_alert_items.remove(item_id)

def send_alert(self, item: HealthcheckItem):
def _send_alert(self, item: HealthcheckItem):
self._put_message_out(
DAQJobMessageAlert(
date=datetime.now(),
Expand Down
68 changes: 65 additions & 3 deletions src/enrgdaq/daq/jobs/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,40 @@


class DAQJobRemoteConfig(DAQJobConfig):
    """Configuration for :class:`DAQJobRemote`.

    Attributes:
        zmq_local_url: Local ZMQ URL.
        zmq_remote_urls: Remote ZMQ URLs to connect to.
        topics: Topics to subscribe to (defaults to an empty list).
    """

    zmq_local_url: str
    zmq_remote_urls: list[str]
    topics: list[str] = []


class DAQJobRemote(DAQJob):
"""
DAQJobRemote is a DAQJob that connects two seperate ENRGDAQ instances.
It sends to and receives from a remote ENRGDAQ, in such that:
DAQJobRemote is a DAQJob that connects two separate ENRGDAQ instances.
It sends to and receives from a remote ENRGDAQ, such that:
- message_in -> remote message_out
- remote message_in -> message_out
TODO: Use zmq CURVE security
Attributes:
allowed_message_in_types (list): List of allowed message types.
config_type (type): Configuration type for the job.
config (DAQJobRemoteConfig): Configuration instance.
restart_offset (timedelta): Restart offset time.
_zmq_pub_ctx (zmq.Context): ZMQ context for publishing.
_zmq_sub_ctx (zmq.Context): ZMQ context for subscribing.
_zmq_pub (zmq.Socket): ZMQ socket for publishing.
_zmq_sub (Optional[zmq.Socket]): ZMQ socket for subscribing.
_message_class_cache (dict): Cache for message classes.
_remote_message_ids (set): Set of remote message IDs.
_receive_thread (threading.Thread): Thread for receiving messages.
"""

allowed_message_in_types = [DAQJobMessage] # accept all message types
Expand Down Expand Up @@ -97,6 +117,15 @@ def handle_message(self, message: DAQJobMessage) -> bool:
return True

def _create_zmq_sub(self, remote_urls: list[str]) -> zmq.Socket:
"""
Create a ZMQ subscriber socket.
Args:
remote_urls (list[str]): List of remote URLs to connect to.
Returns:
zmq.Socket: The created ZMQ subscriber socket.
"""
self._zmq_sub_ctx = zmq.Context()
zmq_sub = self._zmq_sub_ctx.socket(zmq.SUB)
for remote_url in remote_urls:
Expand All @@ -114,6 +143,12 @@ def _create_zmq_sub(self, remote_urls: list[str]) -> zmq.Socket:
return zmq_sub

def _start_receive_thread(self, remote_urls: list[str]):
"""
Start the receive thread.
Args:
remote_urls (list[str]): List of remote URLs to connect to.
"""
self._zmq_sub = self._create_zmq_sub(remote_urls)

while True:
Expand All @@ -130,6 +165,9 @@ def _start_receive_thread(self, remote_urls: list[str]):
self.message_out.put(recv_message)

def start(self):
"""
Start the receive thread and the DAQ job.
"""
self._receive_thread.start()

while True:
Expand All @@ -140,13 +178,34 @@ def start(self):
time.sleep(0.1)

def _pack_message(self, message: DAQJobMessage, use_pickle: bool = True) -> bytes:
    """Serialize *message* for transmission.

    Args:
        message: The message to serialize.
        use_pickle: If True (default), serialize with pickle; otherwise
            encode a ``[type_name, message]`` pair with msgspec msgpack so
            the receiver can look up the concrete message class by name.

    Returns:
        bytes: The serialized message.
    """
    if use_pickle:
        return pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL)

    # Only the msgspec path needs the type name; the original computed it
    # unconditionally, which was dead work on the pickle path.
    return msgspec.msgpack.encode([type(message).__name__, message])

def _unpack_message(self, message: bytes) -> DAQJobMessage:
"""
Unpack a received message.
It tries to unpack the message using pickle, and if that fails, it uses msgspec.
Args:
message (bytes): The received message.
Returns:
DAQJobMessage: The unpacked message.
"""
# TODO: fix unpack without pickle
try:
res = pickle.loads(message)
Expand All @@ -169,6 +228,9 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage:
return res

def __del__(self):
"""
Destructor for DAQJobRemote.
"""
if getattr(self, "_zmq_sub_ctx", None) is not None:
self._zmq_sub_ctx.destroy()
if self._zmq_pub_ctx is not None:
Expand Down
30 changes: 30 additions & 0 deletions src/enrgdaq/daq/jobs/serve_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,33 @@


class DAQJobServeHTTPConfig(DAQJobConfig):
"""
Configuration class for DAQJobServeHTTP.
Attributes:
serve_path (str): The path to serve files from.
host (str): The host address to bind the server to.
port (int): The port number to bind the server to.
"""

serve_path: str
host: str
port: int


class DAQJobServeHTTP(DAQJob):
"""
DAQ job to serve HTTP requests.
Can be used to serve files from a specified path, primarily for CSV files.
Handles placeholders in the path, such as "{TODAY}" and "{YESTERDAY}".
Attributes:
config_type (type): The configuration class type.
config (DAQJobServeHTTPConfig): The configuration instance.
"""

config_type = DAQJobServeHTTPConfig
config: DAQJobServeHTTPConfig

Expand All @@ -34,6 +55,9 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, directory=serve_path, **kwargs)

def do_GET(self) -> None:
"""
Handle GET requests and replace placeholders in the path.
"""
REPLACE_DICT = {
"TODAY": datetime.now().strftime("%Y-%m-%d"),
"YESTERDAY": (datetime.now() - timedelta(days=1)).strftime(
Expand All @@ -46,9 +70,15 @@ def do_GET(self) -> None:
return super().do_GET()

def log_message(self, format: str, *args) -> None:
    """Deliberate no-op: silences the HTTP server's per-request access log."""

def start_server():
"""
Start the HTTP server and serve requests indefinitely.
"""
with ThreadingHTTPServer(
(self.config.host, self.config.port), Handler
) as httpd:
Expand Down
Loading

0 comments on commit 436d791

Please sign in to comment.