diff --git a/src/daq/jobs/store/redis.py b/src/daq/jobs/store/redis.py index a009ee1..66220b8 100644 --- a/src/daq/jobs/store/redis.py +++ b/src/daq/jobs/store/redis.py @@ -5,6 +5,7 @@ import redis import redis.exceptions +from redis.commands.timeseries import TimeSeries from daq.models import DAQJobConfig from daq.store.base import DAQJobStore @@ -20,9 +21,8 @@ class DAQJobStoreRedisConfig(DAQJobConfig): @dataclass class RedisWriteQueueItem: - redis_key: str + store_config: DAQJobStoreConfigRedis data: dict[str, list[Any]] - expiration: Optional[timedelta] prefix: Optional[str] @@ -34,7 +34,7 @@ class DAQJobStoreRedis(DAQJobStore): _write_queue: deque[RedisWriteQueueItem] _last_flush_date: datetime _connection: Optional[redis.Redis] - _check_keys_for_removal: deque[DAQJobStoreConfigRedis] = deque() + _ts: Optional[TimeSeries] def __init__(self, config: DAQJobStoreRedisConfig, **kwargs): super().__init__(config, **kwargs) @@ -50,6 +50,12 @@ def start(self): db=self.config.db, password=self.config.password, ) + try: + self._ts = self._connection.ts() + except Exception as ex: + self._logger.error("Timeseries not supported by Redis server", exc_info=ex) + self._ts = None + super().start() def handle_message(self, message: DAQJobMessageStore) -> bool: @@ -57,9 +63,6 @@ def handle_message(self, message: DAQJobMessageStore) -> bool: return False store_config = cast(DAQJobStoreConfigRedis, message.store_config.redis) - key_expiration = None - if store_config.key_expiration_days is not None: - key_expiration = timedelta(days=store_config.key_expiration_days) data = {} # Add data to data dict that we can add to Redis @@ -69,40 +72,73 @@ def handle_message(self, message: DAQJobMessageStore) -> bool: # Append rows to write_queue for row in message.data: self._write_queue.append( - RedisWriteQueueItem( - store_config.key, - data, - key_expiration, - message.prefix, - ) + RedisWriteQueueItem(store_config, data, message.prefix) ) - # Append keys to check_keys - self._check_keys_for_removal.append(store_config) - return True def store_loop(self): assert self._connection is not None while self._write_queue: - item = self._write_queue.popleft() - - # Append item to key in redis - for key, values in item.data.items(): - item_key = f"{item.redis_key}.{key}" - if item.prefix is not None: - item_key = f"{item.prefix}.{item_key}" - - # Add date to key if expiration is set - if item.expiration is not None: - item_key += ":" + datetime.now().strftime("%Y-%m-%d") + msg = self._write_queue.popleft() + if msg.store_config.use_timeseries and self._ts is None: + self._logger.warning( + "Trying to store data in timeseries, but timeseries is not supported by Redis server" + ) + return - item_exists = self._connection.exists(item_key) - self._connection.rpush(item_key, *values) + key_expiration = None + if msg.store_config.key_expiration_days is not None: + key_expiration = timedelta(days=msg.store_config.key_expiration_days) - # Set expiration if it was newly created - if not item_exists and item.expiration is not None: - self._connection.expire(item_key, item.expiration) + # Append item to key in redis + for i, item in enumerate(msg.data.items()): + key, values = item + item_key = f"{msg.store_config.key}.{key}" + if msg.prefix is not None: + item_key = f"{msg.prefix}.{item_key}" + + if msg.store_config.use_timeseries: + # Use Redis TimeSeries if requested + assert self._ts is not None + + # Create TimeSeries key if it doesn't exist + if not self._connection.exists(item_key) and key != "timestamp": + retention_msecs = None + if msg.store_config.key_expiration_days is not None: + retention_msecs = int( + timedelta( + days=msg.store_config.key_expiration_days + ).total_seconds() + * 1000 + ) + self._ts.create( + item_key, + retention_msecs=retention_msecs, + ) + if "timestamp" not in msg.data: + self._logger.warning( + "Message data does not contain a timestamp, skipping" + ) + return + + self._ts.madd( + [ + (item_key, msg.data["timestamp"][i], value) + for i, value in enumerate(values) + ] + ) + else: + # Add date to key if expiration is set + if key_expiration is not None: + item_key += ":" + datetime.now().strftime("%Y-%m-%d") + + item_exists = self._connection.exists(item_key) + self._connection.rpush(item_key, *values) + + # Set expiration if it was newly created + if not item_exists and key_expiration is not None: + self._connection.expire(item_key, key_expiration) def __del__(self): try: diff --git a/src/daq/jobs/test_job.py b/src/daq/jobs/test_job.py index 399d0a6..6d0ef71 100644 --- a/src/daq/jobs/test_job.py +++ b/src/daq/jobs/test_job.py @@ -1,4 +1,5 @@ import time +from datetime import datetime from random import randint from N1081B import N1081B @@ -31,7 +32,9 @@ def get_int(): self._put_message_out( DAQJobMessageStore( store_config=self.config.store_config, - keys=["A", "B", "C"], - data=[[get_int(), get_int(), get_int()]], + keys=["timestamp", "A", "B", "C"], + data=[ + [datetime.now().timestamp() * 1000, get_int(), get_int(), get_int()] + ], ) ) diff --git a/src/daq/store/models.py b/src/daq/store/models.py index 96c16dc..7abf425 100644 --- a/src/daq/store/models.py +++ b/src/daq/store/models.py @@ -86,6 +86,13 @@ class DAQJobStoreConfigRedis(DAQJobStoreConfigBase): If None, keys will not be deleted. """ + use_timeseries: Optional[bool] = None + """ + Utilize Redis Timeseries to store data. + + A key called "timestamp" is requires when using timeseries. + """ + class DAQJobStoreConfigROOT(DAQJobStoreConfigBase): file_path: str diff --git a/src/tests/test_redis.py b/src/tests/test_redis.py index 487e927..ff07569 100644 --- a/src/tests/test_redis.py +++ b/src/tests/test_redis.py @@ -47,7 +47,7 @@ def test_handle_message(self): self.assertTrue(result) self.assertEqual(len(self.store._write_queue), 2) - self.assertEqual(self.store._write_queue[0].redis_key, "test_key") + self.assertEqual(self.store._write_queue[0].store_config.key, "test_key") self.assertEqual( self.store._write_queue[0].data["header1"], ["row1_col1", "row2_col1"] ) @@ -60,26 +60,44 @@ def test_store_loop(self): self.store._connection.exists = MagicMock(return_value=False) self.store._connection.rpush = MagicMock() self.store._connection.expire = MagicMock() + self.store._ts = MagicMock() + + unix_ms = int(datetime.now().timestamp() * 1000) self.store._write_queue = deque( [ RedisWriteQueueItem( - "test_key", - { + store_config=DAQJobStoreConfigRedis( + key="test_key", key_expiration_days=1 + ), + data={ "header1": ["row1_col1", "row2_col1"], "header2": ["row1_col2", "row2_col2"], }, - timedelta(days=1), - None, + prefix=None, ), RedisWriteQueueItem( - "test_key_no_expiration", - { + store_config=DAQJobStoreConfigRedis( + key="test_key_no_expiration", key_expiration_days=None + ), + data={ "header1": ["row1_col1", "row2_col1"], "header2": ["row1_col2", "row2_col2"], }, - None, - "prefix", + prefix="prefix", + ), + RedisWriteQueueItem( + store_config=DAQJobStoreConfigRedis( + key="test_key_timeseries", + key_expiration_days=None, + use_timeseries=True, + ), + data={ + "timestamp": [unix_ms, unix_ms + 1], + "header1": ["row1_col1", "row2_col1"], + "header2": ["row1_col2", "row2_col2"], + }, + prefix="prefix", ), ] ) @@ -102,6 +120,20 @@ def test_store_loop(self): "prefix.test_key_no_expiration.header2", "row1_col2", "row2_col2" ) + self.store._ts.madd.assert_any_call( + [ + ("prefix.test_key_timeseries.header1", unix_ms, "row1_col1"), + ("prefix.test_key_timeseries.header1", unix_ms + 1, "row2_col1"), + ] + ) + + self.store._ts.madd.assert_any_call( + [ + ("prefix.test_key_timeseries.header2", unix_ms, "row1_col2"), + ("prefix.test_key_timeseries.header2", unix_ms + 1, "row2_col2"), + ] + ) + self.store._connection.expire.assert_any_call( "test_key.header1:" + date, timedelta(days=1) ) @@ -122,14 +154,14 @@ def test_handle_message_no_expiration(self): self.assertTrue(result) self.assertEqual(len(self.store._write_queue), 2) - self.assertEqual(self.store._write_queue[0].redis_key, "test_key") + self.assertEqual(self.store._write_queue[0].store_config.key, "test_key") self.assertEqual( self.store._write_queue[0].data["header1"], ["row1_col1", "row2_col1"] ) self.assertEqual( self.store._write_queue[0].data["header2"], ["row1_col2", "row2_col2"] ) - self.assertIsNone(self.store._write_queue[0].expiration) + self.assertIsNone(self.store._write_queue[0].store_config.key_expiration_days) def test_handle_message_empty_data(self): message = MagicMock(spec=DAQJobMessageStore)