Skip to content

Commit

Permalink
refactor: rename prefix to tag in DAQJobMessageStore
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Nov 16, 2024
1 parent 3e25825 commit 225bf96
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/daq/jobs/caen/n1081b.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _send_store_message(self, data: dict, section):
self._put_message_out(
DAQJobMessageStore(
store_config=self.config.store_config,
prefix=section,
tag=section,
keys=keys,
data=[values],
)
Expand Down
6 changes: 3 additions & 3 deletions src/daq/jobs/handle_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ def unpack_record(record: DAQJobStatsRecord):
)

if message.daq_job_info and message.daq_job_info.supervisor_config:
prefix = message.daq_job_info.supervisor_config.supervisor_id
tag = message.daq_job_info.supervisor_config.supervisor_id
else:
prefix = None
tag = None

self._put_message_out(
DAQJobMessageStore(
store_config=self.config.store_config,
keys=keys,
data=data_to_send,
prefix=prefix,
tag=tag,
)
)

Expand Down
2 changes: 1 addition & 1 deletion src/daq/jobs/store/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:

store_config = cast(DAQJobStoreConfigCSV, message.store_config.csv)
file_path = modify_file_path(
store_config.file_path, store_config.add_date, message.prefix
store_config.file_path, store_config.add_date, message.tag
)
file_path = os.path.join(self.config.out_dir, file_path)

Expand Down
10 changes: 5 additions & 5 deletions src/daq/jobs/store/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class DAQJobStoreRedisConfig(DAQJobConfig):
class RedisWriteQueueItem:
store_config: DAQJobStoreConfigRedis
data: dict[str, list[Any]]
prefix: Optional[str]
tag: Optional[str]


class DAQJobStoreRedis(DAQJobStore):
Expand Down Expand Up @@ -72,7 +72,7 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:
# Append rows to write_queue
for row in message.data:
self._write_queue.append(
RedisWriteQueueItem(store_config, data, message.prefix)
RedisWriteQueueItem(store_config, data, message.tag)
)

return True
Expand All @@ -95,8 +95,8 @@ def store_loop(self):
for i, item in enumerate(msg.data.items()):
key, values = item
item_key = f"{msg.store_config.key}.{key}"
if msg.prefix is not None:
item_key = f"{msg.prefix}.{item_key}"
if msg.tag is not None:
item_key = f"{msg.tag}.{item_key}"

if msg.store_config.use_timeseries:
# Use Redis TimeSeries if requested
Expand All @@ -116,7 +116,7 @@ def store_loop(self):
item_key,
retention_msecs=retention_msecs,
labels={"key": msg.store_config.key}
| ({"prefix": msg.prefix} if msg.prefix else {}),
| ({"tag": msg.tag} if msg.tag else {}),
)
if "timestamp" not in msg.data:
self._logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion src/daq/jobs/store/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:
super().handle_message(message)
store_config = cast(DAQJobStoreConfigROOT, message.store_config)
file_path = modify_file_path(
store_config.file_path, store_config.add_date, message.prefix
store_config.file_path, store_config.add_date, message.tag
)

if file_path not in self._open_files:
Expand Down
2 changes: 1 addition & 1 deletion src/daq/store/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DAQJobMessageStore(DAQJobMessage):
store_config: DAQJobStoreConfig
keys: list[str]
data: list[list[Any]]
prefix: str | None = None
tag: str | None = None

def get_remote_config(self) -> Optional[DAQRemoteConfig]:
for key in dir(self.store_config):
Expand Down
4 changes: 2 additions & 2 deletions src/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_handle_message_new_file(
)
message.keys = ["header1", "header2"]
message.data = [["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]]
message.prefix = None
message.tag = None

self.store.handle_message(message)

Expand All @@ -50,7 +50,7 @@ def test_handle_message_existing_file(self, mock_exists, mock_open, mock_add_dat
)
message.keys = ["header1", "header2"]
message.data = [["row1_col1", "row1_col2"], ["row2_col1", "row2_col2"]]
message.prefix = None
message.tag = None

self.store.handle_message(message)

Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_n1081b.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_send_store_message(self, mock_get_function_results):
self.daq_job.message_out.put.assert_called_once()
message = self.daq_job.message_out.put.call_args[0][0]
self.assertIsInstance(message, DAQJobMessageStore)
self.assertEqual(message.prefix, "SEC_A")
self.assertEqual(message.tag, "SEC_A")
self.assertIn("timestamp", message.keys)
self.assertIn("lemo_1", message.keys)
self.assertIn(100, message.data[0])
Expand Down
6 changes: 3 additions & 3 deletions src/tests/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_store_loop(self):
"header1": ["row1_col1", "row2_col1"],
"header2": ["row1_col2", "row2_col2"],
},
prefix=None,
tag=None,
),
RedisWriteQueueItem(
store_config=DAQJobStoreConfigRedis(
Expand All @@ -84,7 +84,7 @@ def test_store_loop(self):
"header1": ["row1_col1", "row2_col1"],
"header2": ["row1_col2", "row2_col2"],
},
prefix="prefix",
tag="prefix",
),
RedisWriteQueueItem(
store_config=DAQJobStoreConfigRedis(
Expand All @@ -97,7 +97,7 @@ def test_store_loop(self):
"header1": ["row1_col1", "row2_col1"],
"header2": ["row1_col2", "row2_col2"],
},
prefix="prefix",
tag="prefix",
),
]
)
Expand Down

0 comments on commit 225bf96

Please sign in to comment.