diff --git a/src/results_cache.py b/src/results_cache.py
index 7f6ddf0..a0c9188 100644
--- a/src/results_cache.py
+++ b/src/results_cache.py
@@ -35,6 +35,21 @@ def set_result(self, input_id, predicate, qualifiers, source_input, caller, workflow, final_answer):
         key = self.get_query_key(input_id, predicate, qualifiers, source_input, caller, workflow)
         self.redis.set(key, gzip.compress(json.dumps(final_answer).encode()))
 
+    def get_lookup_query_key(self, workflow, query_graph):
+        keydict = {'workflow': workflow, 'query_graph': query_graph}
+        return json.dumps(keydict, sort_keys=True)
+
+    def get_lookup_result(self, workflow, query_graph):
+        key = self.get_lookup_query_key(workflow, query_graph)
+        result = self.redis.get(key)
+        if result is not None:
+            result = json.loads(gzip.decompress(result))
+        return result
+
+    def set_lookup_result(self, workflow, query_graph, final_answer):
+        key = self.get_lookup_query_key(workflow, query_graph)
+        self.redis.set(key, gzip.compress(json.dumps(final_answer).encode()))
+
     def clear_cache(self):
         self.redis.flushdb()
 
diff --git a/src/service_aggregator.py b/src/service_aggregator.py
index 6439d92..f38f697 100644
--- a/src/service_aggregator.py
+++ b/src/service_aggregator.py
@@ -168,6 +168,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
     results_cache = ResultsCache()
     override_cache = (message.get("parameters") or {}).get("override_cache")
     override_cache = override_cache if type(override_cache) is bool else False
+    results = None
     if infer:
         # We're going to cache infer queries, and we need to do that even if we're overriding the cache
         # because we need these values to post to the cache at the end.
@@ -182,6 +183,16 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
             return results, 200
         else:
             logger.info(f"{guid}: Results cache miss")
+    else:
+        if not override_cache:
+            try:
+                query_graph = message["query_graph"]
+            except KeyError:
+                return "No query graph", 422
+            results = results_cache.get_lookup_result(workflow_def, query_graph)
+            if results is not None:
+                logger.info(f"{guid}: Returning results cache lookup")
+                return results, 200
 
     workflow = []
 
@@ -199,6 +210,9 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
 
     if infer:
         results_cache.set_result(input_id, predicate, qualifiers, source_input, caller, workflow_def, final_answer)
+    else:
+        query_graph = message["query_graph"]
+        results_cache.set_lookup_result(workflow_def, query_graph, final_answer)
 
     # return the answer
     return final_answer, status_code
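
For reviewers: a minimal round-trip sketch of the new lookup-cache scheme. It assumes the `fakeredis` package as a stand-in for the real Redis connection (the `ResultsCache` constructor is outside this diff), and the sample `workflow`/`query_graph` values are hypothetical; the key construction and gzip/JSON encoding mirror `get_lookup_query_key()`, `set_lookup_result()`, and `get_lookup_result()`.

```python
# Round-trip sketch for the lookup cache (assumes `pip install fakeredis`).
# ResultsCache's constructor is not part of this diff, so the key-building
# and compress/decompress steps from the new methods are exercised directly.
import gzip
import json

import fakeredis

redis_client = fakeredis.FakeRedis()

workflow = [{"id": "lookup"}]  # hypothetical workflow definition
query_graph = {"nodes": {"n0": {"ids": ["MONDO:0005148"]}, "n1": {}}, "edges": {}}
final_answer = {"message": {"results": []}}

# get_lookup_query_key(): deterministic key via sorted JSON serialization.
key = json.dumps({"workflow": workflow, "query_graph": query_graph}, sort_keys=True)

# set_lookup_result(): store the gzip-compressed, JSON-serialized answer.
redis_client.set(key, gzip.compress(json.dumps(final_answer).encode()))

# get_lookup_result(): decompress and deserialize on a cache hit.
cached = redis_client.get(key)
assert cached is not None
assert json.loads(gzip.decompress(cached)) == final_answer
```

Because `sort_keys=True` normalizes dict-key order, two structurally identical query graphs produce the same cache key; list order (e.g., within `workflow`) still matters, matching the behavior of the new methods.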