feat: add timestamp database support for DAQJobStoreRedis
furkan-bilgin committed Nov 15, 2024
1 parent cf6be84 commit 7fe318b
Showing 4 changed files with 122 additions and 44 deletions.
98 changes: 67 additions & 31 deletions src/daq/jobs/store/redis.py
@@ -5,6 +5,7 @@

import redis
import redis.exceptions
from redis.commands.timeseries import TimeSeries

from daq.models import DAQJobConfig
from daq.store.base import DAQJobStore
@@ -20,9 +21,8 @@ class DAQJobStoreRedisConfig(DAQJobConfig):

@dataclass
class RedisWriteQueueItem:
redis_key: str
store_config: DAQJobStoreConfigRedis
data: dict[str, list[Any]]
expiration: Optional[timedelta]
prefix: Optional[str]


@@ -34,7 +34,7 @@ class DAQJobStoreRedis(DAQJobStore):
_write_queue: deque[RedisWriteQueueItem]
_last_flush_date: datetime
_connection: Optional[redis.Redis]
_check_keys_for_removal: deque[DAQJobStoreConfigRedis] = deque()
_ts: Optional[TimeSeries]

def __init__(self, config: DAQJobStoreRedisConfig, **kwargs):
super().__init__(config, **kwargs)
@@ -50,16 +50,19 @@ def start(self):
db=self.config.db,
password=self.config.password,
)
try:
self._ts = self._connection.ts()
except Exception as ex:
self._logger.error("Timeseries not supported by Redis server", exc_info=ex)
self._ts = None

super().start()

def handle_message(self, message: DAQJobMessageStore) -> bool:
if not super().handle_message(message):
return False

store_config = cast(DAQJobStoreConfigRedis, message.store_config.redis)
key_expiration = None
if store_config.key_expiration_days is not None:
key_expiration = timedelta(days=store_config.key_expiration_days)

data = {}
# Add data to data dict that we can add to Redis
@@ -69,40 +72,73 @@ def handle_message(self, message: DAQJobMessageStore) -> bool:
# Append rows to write_queue
for row in message.data:
self._write_queue.append(
RedisWriteQueueItem(
store_config.key,
data,
key_expiration,
message.prefix,
)
RedisWriteQueueItem(store_config, data, message.prefix)
)

# Append keys to check_keys
self._check_keys_for_removal.append(store_config)

return True

def store_loop(self):
assert self._connection is not None
while self._write_queue:
item = self._write_queue.popleft()

# Append item to key in redis
for key, values in item.data.items():
item_key = f"{item.redis_key}.{key}"
if item.prefix is not None:
item_key = f"{item.prefix}.{item_key}"

# Add date to key if expiration is set
if item.expiration is not None:
item_key += ":" + datetime.now().strftime("%Y-%m-%d")
msg = self._write_queue.popleft()
if msg.store_config.use_timeseries and self._ts is None:
self._logger.warning(
"Trying to store data in timeseries, but timeseries is not supported by Redis server"
)
return

item_exists = self._connection.exists(item_key)
self._connection.rpush(item_key, *values)
key_expiration = None
if msg.store_config.key_expiration_days is not None:
key_expiration = timedelta(days=msg.store_config.key_expiration_days)

# Set expiration if it was newly created
if not item_exists and item.expiration is not None:
self._connection.expire(item_key, item.expiration)
# Append item to key in redis
for i, item in enumerate(msg.data.items()):
key, values = item
item_key = f"{msg.store_config.key}.{key}"
if msg.prefix is not None:
item_key = f"{msg.prefix}.{item_key}"

if msg.store_config.use_timeseries:
# Use Redis TimeSeries if requested
assert self._ts is not None

# Create TimeSeries key if it doesn't exist
if not self._connection.exists(item_key) and key != "timestamp":
retention_msecs = None
if msg.store_config.key_expiration_days is not None:
retention_msecs = int(
timedelta(
days=msg.store_config.key_expiration_days
).total_seconds()
* 1000
)
self._ts.create(
item_key,
retention_msecs=retention_msecs,
)
if "timestamp" not in msg.data:
self._logger.warning(
"Message data does not contain a timestamp, skipping"
)
return

self._ts.madd(
[
(item_key, msg.data["timestamp"][i], value)
for i, value in enumerate(values)
]
)
else:
# Add date to key if expiration is set
if key_expiration is not None:
item_key += ":" + datetime.now().strftime("%Y-%m-%d")

item_exists = self._connection.exists(item_key)
self._connection.rpush(item_key, *values)

# Set expiration if it was newly created
if not item_exists and key_expiration is not None:
self._connection.expire(item_key, key_expiration)

def __del__(self):
try:
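
The new write path relies on redis-py's RedisTimeSeries commands (Redis.ts(), TimeSeries.create(), TimeSeries.madd()). Below is a minimal sketch of that pattern, assuming a Redis server with the RedisTimeSeries module loaded; the key name, retention, and sample values are illustrative and not taken from the commit.

# Minimal sketch of the redis-py TimeSeries calls used above. Assumes a Redis
# server with the RedisTimeSeries module loaded; key name, retention and
# sample values are illustrative only.
import redis

r = redis.Redis(host="localhost", port=6379)
ts = r.ts()  # the commit wraps this in try/except in case TimeSeries support is missing

key = "prefix.sensor_data.A"  # one TimeSeries key per data column
if not r.exists(key):
    # retention is given in milliseconds, mirroring retention_msecs above
    ts.create(key, retention_msecs=7 * 24 * 60 * 60 * 1000)

# madd() takes (key, unix-ms timestamp, value) tuples, one per sample
now_ms = 1_700_000_000_000
ts.madd([(key, now_ms, 1.0), (key, now_ms + 1, 2.0)])

As in the diff, each data column maps to its own TimeSeries key of the form <prefix>.<key>.<column>, and the shared "timestamp" column supplies the sample times for every column's madd() call.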
7 changes: 5 additions & 2 deletions src/daq/jobs/test_job.py
@@ -1,4 +1,5 @@
import time
from datetime import datetime
from random import randint

from N1081B import N1081B
@@ -31,7 +32,9 @@ def get_int():
self._put_message_out(
DAQJobMessageStore(
store_config=self.config.store_config,
keys=["A", "B", "C"],
data=[[get_int(), get_int(), get_int()]],
keys=["timestamp", "A", "B", "C"],
data=[
[datetime.now().timestamp() * 1000, get_int(), get_int(), get_int()]
],
)
)
7 changes: 7 additions & 0 deletions src/daq/store/models.py
@@ -86,6 +86,13 @@ class DAQJobStoreConfigRedis(DAQJobStoreConfigBase):
If None, keys will not be deleted.
"""

use_timeseries: Optional[bool] = None
"""
Utilize Redis Timeseries to store data.
A key called "timestamp" is required when using timeseries.
"""


class DAQJobStoreConfigROOT(DAQJobStoreConfigBase):
file_path: str
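
With the new use_timeseries flag, a store message must carry a "timestamp" column in unix milliseconds alongside its data columns, as the updated test_job.py above shows. A hypothetical sketch of a matching config and payload follows; the key name and values are made up, and the wiring of the config into a running job is omitted.

# Hypothetical example of a Redis store config with the new flag and the data
# shape a job would emit for it. In the real code the Redis config is nested
# under the message's store_config (see message.store_config.redis above).
from datetime import datetime

from daq.store.models import DAQJobStoreConfigRedis

redis_config = DAQJobStoreConfigRedis(
    key="sensor_data",          # illustrative key name
    key_expiration_days=7,      # becomes retention_msecs on the TimeSeries keys
    use_timeseries=True,
)

keys = ["timestamp", "A", "B", "C"]
data = [
    # "timestamp" is mandatory with use_timeseries and is unix milliseconds,
    # matching the datetime.now().timestamp() * 1000 pattern in test_job.py
    [int(datetime.now().timestamp() * 1000), 1, 2, 3],
]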
54 changes: 43 additions & 11 deletions src/tests/test_redis.py
@@ -47,7 +47,7 @@ def test_handle_message(self):

self.assertTrue(result)
self.assertEqual(len(self.store._write_queue), 2)
self.assertEqual(self.store._write_queue[0].redis_key, "test_key")
self.assertEqual(self.store._write_queue[0].store_config.key, "test_key")
self.assertEqual(
self.store._write_queue[0].data["header1"], ["row1_col1", "row2_col1"]
)
@@ -60,26 +60,44 @@ def test_store_loop(self):
self.store._connection.exists = MagicMock(return_value=False)
self.store._connection.rpush = MagicMock()
self.store._connection.expire = MagicMock()
self.store._ts = MagicMock()

unix_ms = int(datetime.now().timestamp() * 1000)

self.store._write_queue = deque(
[
RedisWriteQueueItem(
"test_key",
{
store_config=DAQJobStoreConfigRedis(
key="test_key", key_expiration_days=1
),
data={
"header1": ["row1_col1", "row2_col1"],
"header2": ["row1_col2", "row2_col2"],
},
timedelta(days=1),
None,
prefix=None,
),
RedisWriteQueueItem(
"test_key_no_expiration",
{
store_config=DAQJobStoreConfigRedis(
key="test_key_no_expiration", key_expiration_days=None
),
data={
"header1": ["row1_col1", "row2_col1"],
"header2": ["row1_col2", "row2_col2"],
},
None,
"prefix",
prefix="prefix",
),
RedisWriteQueueItem(
store_config=DAQJobStoreConfigRedis(
key="test_key_timeseries",
key_expiration_days=None,
use_timeseries=True,
),
data={
"timestamp": [unix_ms, unix_ms + 1],
"header1": ["row1_col1", "row2_col1"],
"header2": ["row1_col2", "row2_col2"],
},
prefix="prefix",
),
]
)
@@ -102,6 +120,20 @@ def test_store_loop(self):
"prefix.test_key_no_expiration.header2", "row1_col2", "row2_col2"
)

self.store._ts.madd.assert_any_call(
[
("prefix.test_key_timeseries.header1", unix_ms, "row1_col1"),
("prefix.test_key_timeseries.header1", unix_ms + 1, "row2_col1"),
]
)

self.store._ts.madd.assert_any_call(
[
("prefix.test_key_timeseries.header2", unix_ms, "row1_col2"),
("prefix.test_key_timeseries.header2", unix_ms + 1, "row2_col2"),
]
)

self.store._connection.expire.assert_any_call(
"test_key.header1:" + date, timedelta(days=1)
)
@@ -122,14 +154,14 @@ def test_handle_message_no_expiration(self):

self.assertTrue(result)
self.assertEqual(len(self.store._write_queue), 2)
self.assertEqual(self.store._write_queue[0].redis_key, "test_key")
self.assertEqual(self.store._write_queue[0].store_config.key, "test_key")
self.assertEqual(
self.store._write_queue[0].data["header1"], ["row1_col1", "row2_col1"]
)
self.assertEqual(
self.store._write_queue[0].data["header2"], ["row1_col2", "row2_col2"]
)
self.assertIsNone(self.store._write_queue[0].expiration)
self.assertIsNone(self.store._write_queue[0].store_config.key_expiration_days)

def test_handle_message_empty_data(self):
message = MagicMock(spec=DAQJobMessageStore)
