Skip to content

Commit

Permalink
feat: change the way we handle store configs
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Nov 7, 2024
1 parent f37c417 commit fb03383
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 38 deletions.
3 changes: 1 addition & 2 deletions configs/examples/handle_alerts.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
daq_job_type = "handle_alerts"

[store_config]
daq_job_store_type = "csv"
[store_config.csv]
file_path = "alerts.csv"
add_date = true
3 changes: 1 addition & 2 deletions configs/examples/handle_stats.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
daq_job_type = "handle_stats"

[store_config]
daq_job_store_type = "csv"
[store_config.csv]
file_path = "stats.csv"
add_date = false
overwrite = true
3 changes: 1 addition & 2 deletions configs/examples/n1081b.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ port = "8080"
password = "password"
sections_to_store = ["SEC_B", "SEC_D"]

[store_config]
daq_job_store_type = "csv"
[store_config.csv]
file_path = "n1081b.csv"
add_date = true
3 changes: 1 addition & 2 deletions configs/examples/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ daq_job_type = "test"
rand_min = 1
rand_max = 100

[store_config]
daq_job_store_type = "csv"
[store_config.csv]
file_path = "test.csv"
add_date = true
10 changes: 2 additions & 8 deletions src/daq/jobs/store/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,13 @@

from daq.models import DAQJobConfig
from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfigCSV
from utils.file import modify_file_path

DAQ_JOB_STORE_CSV_FLUSH_INTERVAL_SECONDS = 15
DAQ_JOB_STORE_CSV_WRITE_BATCH_SIZE = 1000


class DAQJobStoreConfigCSV(DAQJobStoreConfig):
file_path: str
add_date: bool
overwrite: Optional[bool] = None


class DAQJobStoreCSVConfig(DAQJobConfig):
out_dir: str = "out/"

Expand All @@ -47,7 +41,7 @@ def __init__(self, config: Any):

def handle_message(self, message: DAQJobMessageStore) -> bool:
super().handle_message(message)
store_config = cast(DAQJobStoreConfigCSV, message.store_config)
store_config = cast(DAQJobStoreConfigCSV, message.store_config.csv)
file_path = modify_file_path(
store_config.file_path, store_config.add_date, message.prefix
)
Expand Down
7 changes: 1 addition & 6 deletions src/daq/jobs/store/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@

from daq.models import DAQJobConfig
from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfigROOT
from utils.file import modify_file_path


class DAQJobStoreConfigROOT(DAQJobStoreConfig):
file_path: str
add_date: bool


class DAQJobStoreROOTConfig(DAQJobConfig):
pass

Expand Down
6 changes: 3 additions & 3 deletions src/daq/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

from daq.base import DAQJob
from daq.models import DAQJobMessage
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig
from daq.store.models import DAQJobMessageStore


class DAQJobStore(DAQJob):
allowed_store_config_types: list[type[DAQJobStoreConfig]]
allowed_store_config_types: list

def start(self):
while True:
Expand All @@ -29,6 +29,6 @@ def can_store(self, message: DAQJobMessage) -> bool:
return False
is_message_allowed = False
for allowed_config_type in self.allowed_store_config_types:
if isinstance(message.store_config, allowed_config_type):
if message.store_config.has_store_config(allowed_config_type):
is_message_allowed = True
return is_message_allowed
31 changes: 26 additions & 5 deletions src/daq/store/models.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,47 @@
from typing import Any
from typing import Any, Optional

from msgspec import Struct

from daq.base import DAQJobInfo
from daq.models import DAQJobConfig, DAQJobMessage


class DAQJobStoreConfig(Struct):
class DAQJobStoreConfig(Struct, dict=True):
"""
Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig.
"""

daq_job_store_type: str
csv: "Optional[DAQJobStoreConfigCSV]" = None
root: "Optional[DAQJobStoreConfigROOT]" = None

def has_store_config(self, store_type: Any) -> bool:
for key in dir(self):
if key.startswith("_"):
continue
value = getattr(self, key)
if isinstance(value, store_type):
return True
return False


class DAQJobMessageStore(DAQJobMessage):
store_config: dict | DAQJobStoreConfig
store_config: DAQJobStoreConfig
daq_job_info: DAQJobInfo
keys: list[str]
data: list[list[Any]]
prefix: str | None = None


class StorableDAQJobConfig(DAQJobConfig):
store_config: dict
store_config: DAQJobStoreConfig


class DAQJobStoreConfigCSV(Struct):
file_path: str
add_date: bool
overwrite: Optional[bool] = None


class DAQJobStoreConfigROOT(Struct):
file_path: str
add_date: bool
10 changes: 5 additions & 5 deletions src/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
DAQJobStoreConfigCSV,
DAQJobStoreCSV,
)
from daq.store.models import DAQJobMessageStore
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig


class TestDAQJobStoreCSV(unittest.TestCase):
Expand All @@ -25,8 +25,8 @@ def test_handle_message_new_file(
self, mock_touch, mock_exists, mock_open, mock_add_date
):
message = MagicMock(spec=DAQJobMessageStore)
message.store_config = DAQJobStoreConfigCSV(
daq_job_store_type="csv", file_path="test.csv", add_date=True
message.store_config = DAQJobStoreConfig(
csv=DAQJobStoreConfigCSV(file_path="test.csv", add_date=True)
)
message.keys = ["header1", "header2"]
message.data = [["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]]
Expand All @@ -45,8 +45,8 @@ def test_handle_message_new_file(
@patch("os.path.exists", return_value=True)
def test_handle_message_existing_file(self, mock_exists, mock_open, mock_add_date):
message = MagicMock(spec=DAQJobMessageStore)
message.store_config = DAQJobStoreConfigCSV(
daq_job_store_type="csv", file_path="test.csv", add_date=True
message.store_config = DAQJobStoreConfig(
csv=DAQJobStoreConfigCSV(file_path="test.csv", add_date=True)
)
message.keys = ["header1", "header2"]
message.data = [["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]]
Expand Down
6 changes: 3 additions & 3 deletions src/tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from daq.jobs.store.csv import DAQJobStoreConfigCSV
from daq.jobs.test_job import DAQJobTest
from daq.models import DAQJobMessage
from daq.store.models import DAQJobMessageStore
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig


class TestDAQJobRemote(unittest.TestCase):
Expand Down Expand Up @@ -49,8 +49,8 @@ def stop_receive_thread():
def test_receive_thread(self):
message = DAQJobMessageStore(
id="testmsg",
store_config=DAQJobStoreConfigCSV(
daq_job_store_type="csv", file_path="test", add_date=True
store_config=DAQJobStoreConfig(
csv=DAQJobStoreConfigCSV(file_path="test", add_date=True)
),
data=[],
keys=[],
Expand Down

0 comments on commit fb03383

Please sign in to comment.