Commit
feat: tons of refactoring and csv store done
- moved classes around
furkan-bilgin committed Oct 10, 2024
1 parent 99f5222 commit c9f8a23
Showing 9 changed files with 201 additions and 64 deletions.
47 changes: 47 additions & 0 deletions src/daq/base.py
@@ -0,0 +1,47 @@
import logging
import threading
from dataclasses import dataclass
from queue import Queue
from typing import Any

from daq.models import DAQJobMessage, DAQJobMessageStop, DAQJobStopError


class DAQJob:
config_type: Any
config: Any
message_in: Queue["DAQJobMessage"]
message_out: Queue["DAQJobMessage"]

_logger: logging.Logger

def __init__(self, config: Any):
self.config = config
self.message_in = Queue()
self.message_out = Queue()
self._logger = logging.getLogger(type(self).__name__)
self._should_stop = False

def consume(self):
# consume messages from the queue
while not self.message_in.empty():
message = self.message_in.get()
if not self.handle_message(message):
self.message_in.put_nowait(message)

def handle_message(self, message: "DAQJobMessage") -> bool:
if isinstance(message, DAQJobMessageStop):
raise DAQJobStopError(message.reason)
return True

def start(self):
raise NotImplementedError

def __del__(self):
self._logger.info("DAQ job is being deleted")


@dataclass
class DAQJobThread:
daq_job: DAQJob
thread: threading.Thread
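
Note (not part of this commit): a minimal sketch of how a concrete job might build on this base class. The class name DAQJobDummy is hypothetical; it only drains its inbox until a DAQJobMessageStop arrives, at which point handle_message raises DAQJobStopError and the loop ends.

import time

from daq.base import DAQJob


class DAQJobDummy(DAQJob):
    def start(self):
        # Poll the inbox; a DAQJobMessageStop makes consume() raise
        # DAQJobStopError through handle_message(), ending the loop.
        while True:
            self.consume()
            time.sleep(1)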
6 changes: 4 additions & 2 deletions src/daq/caen/n1081b.py
@@ -4,13 +4,15 @@
from N1081B import N1081B
from websocket import WebSocket

from daq.models import DAQJob, DAQJobConfig, DAQJobMessage
from daq.base import DAQJob
from daq.models import DAQJobMessage
from daq.store.models import StorableDAQJobConfig

N1081B_QUERY_INTERVAL_SECONDS = 1


@dataclass
class DAQN1081BConfig(DAQJobConfig):
class DAQN1081BConfig(StorableDAQJobConfig):
host: str
port: str
password: str
5 changes: 3 additions & 2 deletions src/daq/daq_job.py
@@ -5,10 +5,11 @@

import tomllib

from daq.base import DAQJob, DAQJobThread
from daq.caen.n1081b import DAQJobN1081B
from daq.models import DAQJob, DAQJobConfig, DAQJobThread
from daq.models import DAQJobConfig

DAQ_JOB_TYPE_TO_CLASS: dict[str, type[DAQJob]] = {
DAQ_JOB_TYPE_TO_CLASS: dict[str, type["DAQJob"]] = {
"n1081b": DAQJobN1081B,
}

44 changes: 0 additions & 44 deletions src/daq/models.py
@@ -1,8 +1,4 @@
import logging
import threading
from dataclasses import dataclass
from queue import Queue
from typing import Any

from dataclasses_json import DataClassJsonMixin

@@ -12,46 +8,6 @@ class DAQJobConfig(DataClassJsonMixin):
daq_job_type: str


class DAQJob:
config_type: Any
config: Any
message_in: Queue["DAQJobMessage"]
message_out: Queue["DAQJobMessage"]

_logger: logging.Logger

def __init__(self, config: Any):
self.config = config
self.message_in = Queue()
self.message_out = Queue()
self._logger = logging.getLogger(type(self).__name__)
self._should_stop = False

def consume(self):
# consume messages from the queue
while not self.message_in.empty():
message = self.message_in.get()
if not self.handle_message(message):
self.message_in.put_nowait(message)

def handle_message(self, message: "DAQJobMessage") -> bool:
if isinstance(message, DAQJobMessageStop):
raise DAQJobStopError(message.reason)
return True

def start(self):
raise NotImplementedError

def __del__(self):
self._logger.info("DAQ job is being deleted")


@dataclass
class DAQJobThread:
daq_job: DAQJob
thread: threading.Thread


@dataclass
class DAQJobMessage:
pass
23 changes: 23 additions & 0 deletions src/daq/store/base.py
@@ -0,0 +1,23 @@
import time

from daq.base import DAQJob
from daq.models import DAQJobMessage
from daq.store.models import DAQJobMessageStore


class DAQJobStore(DAQJob):
allowed_message_types: list[type["DAQJobMessageStore"]]

def start(self):
while True:
self.consume()
time.sleep(0.5)

def handle_message(self, message: DAQJobMessage) -> bool:
is_message_allowed = False
for allowed_message_type in self.allowed_message_types:
if isinstance(message, allowed_message_type):
is_message_allowed = True
if not is_message_allowed:
raise Exception(f"Invalid message type: {type(message)}")
return super().handle_message(message)
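
For context, a store subclass only has to declare which message types it accepts and extend handle_message; the type check in the base class raises on anything else. A minimal sketch (hypothetical, not part of this commit) that collects messages in memory instead of writing them out:

from typing import Any

from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore


class DAQJobStoreInMemory(DAQJobStore):
    allowed_message_types = [DAQJobMessageStore]

    def __init__(self, config: Any):
        super().__init__(config)
        self.messages: list[DAQJobMessageStore] = []

    def handle_message(self, message: DAQJobMessageStore) -> bool:
        # The base class raises if the message type is not allowed
        super().handle_message(message)
        self.messages.append(message)
        return True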
79 changes: 79 additions & 0 deletions src/daq/store/csv.py
@@ -0,0 +1,79 @@
import csv
import os
from dataclasses import dataclass
from datetime import datetime
from io import TextIOWrapper
from pathlib import Path
from typing import Any, cast

from daq.store.base import DAQJobStore
from daq.store.models import DAQJobMessageStore, DAQJobStoreConfig


@dataclass
class DAQJobMessageStoreCSV(DAQJobMessageStore):
header: list[str]
data: list[list[str]]


@dataclass
class DAQJobStoreConfigCSV(DAQJobStoreConfig):
file_path: str
add_date: bool


class DAQJobStoreCSV(DAQJobStore):
allowed_message_types = [DAQJobMessageStoreCSV]
_open_files: dict[str, TextIOWrapper]

def __init__(self, config: Any):
super().__init__(config)
self._open_files = {}

def handle_message(self, message: DAQJobMessageStoreCSV) -> bool:
super().handle_message(message)
store_config = cast(DAQJobStoreConfigCSV, message.store_config)
file_path = store_config.file_path

# Append date to file name if specified
if store_config.add_date:
splitted_file_path = os.path.splitext(file_path)
date_text = datetime.now().strftime("%Y-%m-%d")
if len(splitted_file_path) > 1:
file_path = (
f"{splitted_file_path[0]}_{date_text}{splitted_file_path[1]}"
)
else:
file_path = f"{splitted_file_path[0]}_{date_text}"

self._logger.debug(
f"Handling message for DAQ Job: {type(message.daq_job).__name__}"
)

if file_path not in self._open_files:
# Create the file if it doesn't exist
Path(file_path).touch(exist_ok=True)

# Open file and write csv headers
file = open(file_path, "a")
self._open_files[file_path] = file
writer = csv.writer(file)
writer.writerow(message.header)
else:
file = self._open_files[file_path]
writer = csv.writer(file)

# Write rows and flush
writer.writerows(message.data)
file.flush()

return True

def __del__(self):
# Close all open files
for file in self._open_files.values():
if file.closed:
continue
file.close()

return super().__del__()
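
As a usage note, with add_date=True the store inserts the current date into the target file name before the extension. A small standalone sketch of the same os.path.splitext logic (the example date is for illustration only):

import os
from datetime import datetime

file_path = "test.csv"
root, ext = os.path.splitext(file_path)
date_text = datetime.now().strftime("%Y-%m-%d")
# e.g. on 2024-10-10 this prints "test_2024-10-10.csv"
print(f"{root}_{date_text}{ext}")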
30 changes: 15 additions & 15 deletions src/daq/store/models.py
@@ -1,27 +1,27 @@
import time

Check failure on line 1 in src/daq/store/models.py (GitHub Actions / ruff): Ruff (F401): src/daq/store/models.py:1:8: `time` imported but unused
from dataclasses import dataclass

from daq.models import DAQJob, DAQJobMessage
from dataclasses_json import DataClassJsonMixin

from daq.base import DAQJob
from daq.models import DAQJobConfig, DAQJobMessage

class DAQJobStore(DAQJob):
allowed_message_types: list[type["DAQJobMessageStore"]]

def start(self):
while True:
self.consume()
time.sleep(0.5)
@dataclass
class DAQJobStoreConfig(DataClassJsonMixin):
"""
Used to store the configuration of the DAQ Job Store, usually inside DAQJobConfig.
"""

def handle_message(self, message: DAQJobMessage) -> bool:
is_message_allowed = False
for allowed_message_type in self.allowed_message_types:
if isinstance(message, allowed_message_type):
is_message_allowed = True
if not is_message_allowed:
raise Exception(f"Invalid message type: {type(message)}")
return super().handle_message(message)
daq_job_store_type: str


@dataclass
class DAQJobMessageStore(DAQJobMessage):
store_config: DAQJobStoreConfig
daq_job: DAQJob


@dataclass
class StorableDAQJobConfig(DAQJobConfig):
store_config: type[DAQJobStoreConfig]
29 changes: 29 additions & 0 deletions src/test_csv.py
@@ -0,0 +1,29 @@
import logging
import time

import coloredlogs

from daq.daq_job import DAQJob, start_daq_job
from daq.store.csv import DAQJobMessageStoreCSV, DAQJobStoreConfigCSV, DAQJobStoreCSV

coloredlogs.install(
level=logging.DEBUG,
datefmt="%Y-%m-%d %H:%M:%S",
)

daq_job_thread = start_daq_job(DAQJobStoreCSV({}))
_test_daq_job = DAQJob({})
while True:
daq_job_thread.daq_job.message_in.put_nowait(
DAQJobMessageStoreCSV(
daq_job=_test_daq_job,
header=["a", "b", "c"],
data=[["1", "2", "3"], ["4", "5", "6"]],
store_config=DAQJobStoreConfigCSV(
daq_job_store_type="",
file_path="test.csv",
add_date=True,
),
)
)
time.sleep(1)
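
For reference, assuming a fresh run against a file that does not yet exist, the store writes the header once when it first opens the file and then appends both data rows every second, so the dated output file (e.g. test_2024-10-10.csv) would start roughly like:

a,b,c
1,2,3
4,5,6
1,2,3
4,5,6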
2 changes: 1 addition & 1 deletion src/test_entrypoint.py
@@ -4,7 +4,7 @@
import coloredlogs

from daq.daq_job import load_daq_jobs, start_daq_job, start_daq_jobs
from daq.store.models import DAQJobStore
from daq.store.base import DAQJobStore

coloredlogs.install(
level=logging.DEBUG,
