diff --git a/src/daq/jobs/store/redis.py b/src/daq/jobs/store/redis.py index 4a9c84b..a009ee1 100644 --- a/src/daq/jobs/store/redis.py +++ b/src/daq/jobs/store/redis.py @@ -23,6 +23,7 @@ class RedisWriteQueueItem: redis_key: str data: dict[str, list[Any]] expiration: Optional[timedelta] + prefix: Optional[str] class DAQJobStoreRedis(DAQJobStore): @@ -72,6 +73,7 @@ def handle_message(self, message: DAQJobMessageStore) -> bool: store_config.key, data, key_expiration, + message.prefix, ) ) @@ -88,6 +90,8 @@ def store_loop(self): # Append item to key in redis for key, values in item.data.items(): item_key = f"{item.redis_key}.{key}" + if item.prefix is not None: + item_key = f"{item.prefix}.{item_key}" # Add date to key if expiration is set if item.expiration is not None: diff --git a/src/tests/test_redis.py b/src/tests/test_redis.py index 3d38e00..487e927 100644 --- a/src/tests/test_redis.py +++ b/src/tests/test_redis.py @@ -70,6 +70,7 @@ def test_store_loop(self): "header2": ["row1_col2", "row2_col2"], }, timedelta(days=1), + None, ), RedisWriteQueueItem( "test_key_no_expiration", @@ -78,6 +79,7 @@ def test_store_loop(self): "header2": ["row1_col2", "row2_col2"], }, None, + "prefix", ), ] ) @@ -94,10 +96,10 @@ def test_store_loop(self): ) self.store._connection.rpush.assert_any_call( - "test_key_no_expiration.header1", "row1_col1", "row2_col1" + "prefix.test_key_no_expiration.header1", "row1_col1", "row2_col1" ) self.store._connection.rpush.assert_any_call( - "test_key_no_expiration.header2", "row1_col2", "row2_col2" + "prefix.test_key_no_expiration.header2", "row1_col2", "row2_col2" ) self.store._connection.expire.assert_any_call(