Skip to content

Commit

Permalink
refactor: update DAQJob consume method to accept nowait parameter and…
Browse files Browse the repository at this point in the history
… adjust sleep intervals
  • Loading branch information
furkan-bilgin committed Nov 8, 2024
1 parent 79f4bf6 commit 782b12b
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 18 deletions.
4 changes: 1 addition & 3 deletions src/daq/alert/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
from datetime import datetime
from enum import Enum
from typing import Any
Expand Down Expand Up @@ -36,10 +35,9 @@ def __init__(self, config: Any):

def start(self):
while True:
self.consume()
self.consume(nowait=False)
self.alert_loop()
self._alerts.clear()
time.sleep(0.5)

def handle_message(self, message: DAQJobMessageAlert) -> bool:
self._alerts.append(message)
Expand Down
9 changes: 6 additions & 3 deletions src/daq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ def __init__(self, config: Any):
self._should_stop = False
self.unique_id = str(uuid.uuid4())

def consume(self):
def consume(self, nowait=True):
# consume messages from the queue
while True:
try:
message = self.message_in.get_nowait()
if nowait:
message = self.message_in.get_nowait()
else:
message = self.message_in.get()
if not self.handle_message(message):
self.message_in.put_nowait(message)
self.message_in.put(message)
except Empty:
break

Expand Down
5 changes: 1 addition & 4 deletions src/daq/jobs/handle_alerts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import time

from daq.alert.base import DAQJobMessageAlert
from daq.base import DAQJob
from daq.store.models import DAQJobMessageStore, StorableDAQJobConfig
Expand All @@ -17,8 +15,7 @@ class DAQJobHandleAlerts(DAQJob):

def start(self):
while True:
self.consume()
time.sleep(0.5)
self.consume(nowait=False)

def handle_message(self, message: DAQJobMessageAlert) -> bool:
super().handle_message(message)
Expand Down
4 changes: 1 addition & 3 deletions src/daq/jobs/handle_stats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
from datetime import datetime
from typing import Dict, Optional

Expand All @@ -25,8 +24,7 @@ class DAQJobHandleStats(DAQJob):

def start(self):
while True:
self.consume()
time.sleep(1)
self.consume(nowait=False)

def handle_message(self, message: DAQJobMessageStats) -> bool:
if not super().handle_message(message):
Expand Down
4 changes: 3 additions & 1 deletion src/daq/jobs/healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from daq.jobs.handle_stats import DAQJobMessageStats, DAQJobStatsDict
from daq.models import DAQJobConfig, DAQJobStats

HEALTHCHECK_LOOP_INTERVAL_SECONDS = 0.1


class AlertCondition(str, Enum):
SATISFIED = "satisfied"
Expand Down Expand Up @@ -115,7 +117,7 @@ def start(self):
while True:
self.consume()
self.handle_checks()
time.sleep(0.5)
time.sleep(HEALTHCHECK_LOOP_INTERVAL_SECONDS)

def handle_message(self, message: DAQJobMessageStats) -> bool:
if not super().handle_message(message):
Expand Down
4 changes: 3 additions & 1 deletion src/daq/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from daq.models import DAQJobMessage
from daq.store.models import DAQJobMessageStore

STORE_LOOP_INTERVAL_SECONDS = 0.1


class DAQJobStore(DAQJob):
allowed_store_config_types: list
Expand All @@ -12,7 +14,7 @@ def start(self):
while True:
self.consume()
self.store_loop()
time.sleep(0.5)
time.sleep(STORE_LOOP_INTERVAL_SECONDS)

def store_loop(self):
raise NotImplementedError
Expand Down
4 changes: 1 addition & 3 deletions src/tests/test_handle_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ def test_handle_message_invalid(self):
self.assertFalse(result)
self.daq_job.message_out.put.assert_not_called()

@patch("time.sleep", return_value=None)
def test_start(self, mock_sleep):
def test_start(self):
self.daq_job.consume = MagicMock(side_effect=[None, Exception("Stop")])

with self.assertRaises(Exception) as context:
self.daq_job.start()

self.assertEqual(str(context.exception), "Stop")
self.daq_job.consume.assert_called()
mock_sleep.assert_called()


if __name__ == "__main__":
Expand Down

0 comments on commit 782b12b

Please sign in to comment.