From f3f85077d63d79fef02e67fbfff500213fb6ec66 Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:11:44 -0400
Subject: [PATCH 01/14] shadowfax

---
 src/pathfinder/get_cooccurrence.py |  50 ++++
 src/shadowfax.py                   | 395 +++++++++++++++++++++++++++++
 2 files changed, 445 insertions(+)
 create mode 100644 src/pathfinder/get_cooccurrence.py
 create mode 100644 src/shadowfax.py

diff --git a/src/pathfinder/get_cooccurrence.py b/src/pathfinder/get_cooccurrence.py
new file mode 100644
index 0000000..8b4de53
--- /dev/null
+++ b/src/pathfinder/get_cooccurrence.py
@@ -0,0 +1,50 @@
+"""Get more than pairwise literature cooccurrence for a given list of curies."""
+import json
+import gzip
+import redis
+import time
+from typing import List
+
+
+def get_the_pmids(curies: List[str]):
+    r = redis.Redis(
+        host="localhost",
+        port=6379,
+        db=0
+    )
+    start = time.time()
+    curie_pmids = []
+    for curie in curies:
+        pmids = r.get(curie)
+        if pmids is None:
+            pmids = []
+        else:
+            pmids = json.loads(gzip.decompress(pmids))
+        # print(len(pmids))
+        curie_pmids.append(pmids)
+    answer = list(set.intersection(*map(set, curie_pmids)))
+    end = time.time()
+    # print(f"Got back {len(answer)} pmids in {end - start} seconds.")
+    return answer
+
+def get_the_curies(pmid: str):
+    r = redis.Redis(
+        host="localhost",
+        port=6379,
+        db=1
+    )
+    start = time.time()
+    curies = r.get(pmid)
+    if curies is None:
+        curies = []
+    else:
+        curies = json.loads(gzip.decompress(curies))
+    answer = list(curies)
+    end = time.time()
+    # print(f"Got back {len(answer)} curies in {end - start} seconds.")
+    return answer
+
+
+if __name__ == "__main__":
+    print(len(get_the_pmids(["MONDO:0004979"])))
+    print(len(get_the_pmids(["CHEBI:45783"])))
diff --git a/src/shadowfax.py b/src/shadowfax.py
new file mode 100644
index 0000000..1f7ec06
--- /dev/null
+++ b/src/shadowfax.py
@@ -0,0 +1,395 @@
+import asyncio
+from collections import defaultdict
+import copy
+import uuid
+import hashlib
+import os
+import networkx
+import httpx
+
+from reasoner_pydantic import Message
+
+from src.pathfinder.get_cooccurrence import get_the_curies, get_the_pmids
+
+# this should be an environment variable
+node_norm_url = "https://nodenormalization-sri.renci.org"
+strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/")
+num_intermediate_hops = 3
+TOTAL_PUBS = 27840000
+
+async def generate_from_strider(message):
+    try:
+        async with httpx.AsyncClient(timeout=3600) as client:
+            lookup_response = await client.post(
+                url=strider_url,
+                json=message,
+            )
+            lookup_response.raise_for_status()
+            lookup_response = lookup_response.json()
+    except:
+        lookup_response = {}
+    return lookup_response.get("message", {})
+
+
+async def shadowfax(message, guid, logger):
+    qgraph = message["message"]["query_graph"]
+    pinned_node_ids = []
+    for node in qgraph["nodes"].values():
+        if node.get("ids", None) is not None:
+            pinned_node_ids.append(node["ids"][0])
+        else:
+            unpinned_node_category = node.get("categories", ["biolink:NamedThing"])[0]
+    if len(pinned_node_ids) != 2:
+        logger.info(f"{guid}: Pathfinder queries require two pinned nodes.")
+        return message, 500
+
+    async with httpx.AsyncClient(timeout=900) as client:
+        normalized_pinned_ids = await client.post(
+            url=node_norm_url + "/get_normalized_nodes",
+            json={
+                "curies": list(pinned_node_ids),
+                "conflate": True,
+                "description": False,
+                "drug_chemical_conflate": True
+            }
+        )
+        normalized_pinned_ids.raise_for_status()
+        normalized_pinned_ids = normalized_pinned_ids.json()
+    source_node = normalized_pinned_ids.get(pinned_node_ids[0], {"id": {"identifier": pinned_node_ids[0]}})["id"]["identifier"]
+    source_category = normalized_pinned_ids.get(pinned_node_ids[0], {"type": ["biolink:NamedThing"]})["type"][0]
+    source_equivalent_ids = [i["identifier"] for i in normalized_pinned_ids.get(pinned_node_ids[0], {"equivalent_identifiers": []})["equivalent_identifiers"]]
+    target_node = normalized_pinned_ids.get(pinned_node_ids[1], {"id": {"identifier": pinned_node_ids[1]}})["id"]["identifier"]
+    target_equivalent_ids = [i["identifier"] for i in normalized_pinned_ids.get(pinned_node_ids[1], {"equivalent_identifiers": []})["equivalent_identifiers"]]
+    target_category = normalized_pinned_ids.get(pinned_node_ids[1], {"type": ["biolink:NamedThing"]})["type"][0]
+
+    source_pubs = len(get_the_pmids([source_node]))
+    target_pubs = len(get_the_pmids([target_node]))
+    pairwise_pubs = get_the_pmids([source_node, target_node])
+    if source_pubs == 0 or target_pubs == 0 or len(pairwise_pubs) == 0:
+        logger.info(f"{guid}: No publications found.")
+        return message, 200
+
+    curies = set()
+    for pub in pairwise_pubs:
+        curie_list = get_the_curies(pub)
+        for curie in curie_list:
+            if curie not in [source_node, target_node] and curie not in source_equivalent_ids and curie not in target_equivalent_ids:
+                curies.add(curie)
+
+    if len(curies) == 0:
+        logger.info(f"{guid}: No curies found.")
+        return message, 200
+
+    async with httpx.AsyncClient(timeout=900) as client:
+        normalizer_input = {
+            "curies": list(curies),
+            "conflate": True,
+            "description": False,
+            "drug_chemical_conflate": True
+        }
+        normalizer_response = await client.post(
+            url=node_norm_url + "/get_normalized_nodes",
+            json=normalizer_input
+        )
+        normalizer_response.raise_for_status()
+        normalizer_response = normalizer_response.json()
+    curie_info = defaultdict(dict)
+    for curie, normalizer_info in normalizer_response.items():
+        if normalizer_info:
+            curie_info[curie]["categories"] = normalizer_info.get("type", ["biolink:NamedThing"])
+            cooc = len(get_the_pmids([curie, source_node, target_node]))
+            num_pubs = len(get_the_pmids([curie]))
+            curie_info[curie]["pubs"] = num_pubs
+            curie_info[curie]["score"] = max(0, ((cooc / TOTAL_PUBS) -
+                                                 (source_pubs / TOTAL_PUBS) *
+                                                 (target_pubs / TOTAL_PUBS) *
+                                                 (num_pubs / TOTAL_PUBS)) *
+                                                (normalizer_info.get(
+                                                    "information_content", 1
+                                                )
+                                                 / 10000))
+
+    pruned_curies = []
+    while len(pruned_curies) < 50 and len(pruned_curies) < len(curies):
+        max_cov = 0
+        for info in curie_info.values():
+            max_cov = max(info["score"], max_cov)
+        for curie, info in curie_info.items():
+            if info["score"] == max_cov:
+                pruned_curies.append(curie)
+                info["score"] = -1
+
+    node_category_mapping = defaultdict(list)
+    node_category_mapping[source_category].append(source_node)
+    node_category_mapping[target_category].append(target_node)
+    for curie in pruned_curies:
+        node_category_mapping[curie_info[curie]["categories"][0]].append(curie)
+
+    lookup_nodes = {}
+    for category, category_curies in node_category_mapping.items():
+        lookup_nodes[category.removeprefix("biolink:")] = {
+            "ids": category_curies,
+            "categories": [category]
+        }
+
+    # Create queries matching each category to each other
+    strider_multiquery = []
+    for subject_index, subject_category in enumerate(lookup_nodes.keys()):
+        for object_category in list(lookup_nodes.keys())[(subject_index + 1):]:
+            lookup_edge = {
+                "subject": subject_category,
+                "object": object_category,
+                "predicates": [
+                    "biolink:related_to"
+                ]
+            }
+            m = {
+                "message": {
"query_graph": { + "nodes": { + subject_category: lookup_nodes[subject_category], + object_category: lookup_nodes[object_category] + }, + "edges": { + "e0": lookup_edge + } + } + } + } + strider_multiquery.append(m) + lookup_messages = [] + # We will want to use multistrider here eventually + for lookup_message in await asyncio.gather(*[generate_from_strider(lookup_query) for lookup_query in strider_multiquery]): + if lookup_message: + lookup_message["query_graph"] = {"nodes": {}, "edges": {}} + lookup_messages.append(lookup_message) + + merged_lookup_message = Message.parse_obj(lookup_messages[0]) + lookup_results = [] + for lookup_message in lookup_messages: + # Build graph from results to avoid subclass loops + # Results do not concatenate when they have different qnode ids + lookup_message_object = Message.parse_obj(lookup_message) + merged_lookup_message.update(lookup_message_object) + lookup_results.extend(lookup_message_object.dict().get("results", [])) + merged_lookup_message_dict = merged_lookup_message.dict() + lookup_knowledge_graph = merged_lookup_message_dict.get("knowledge_graph", {"nodes": {}, "edges": {}}) + lookup_aux_graphs = merged_lookup_message_dict.get("auxiliary_graphs", {}) + + kg = networkx.Graph() + kg.add_nodes_from([source_node, target_node]) + for lookup_result in lookup_results: + for edge_binding in lookup_result["analyses"][0]["edge_bindings"]["e0"]: + edge_key = edge_binding["id"] + edge = lookup_knowledge_graph["edges"][edge_key] + e_subject = edge["subject"] + e_object = edge["object"] + + if e_subject not in [source_node, target_node] or e_object not in [source_node, target_node]: + if kg.has_edge(e_subject, e_object): + kg[e_subject][e_object]["keys"].append(edge_key) + else: + kg.add_edge(e_subject, e_object, keys=[edge_key]) + + paths = networkx.all_simple_paths(kg, source_node, target_node, 4) + num_paths = 0 + curie_path_mapping = defaultdict(list) + for path in paths: + num_paths += 1 + for curie in path: + if curie not in [source_node, target_node] and curie in curie_info.keys(): + if unpinned_node_category in curie_info[curie]["categories"]: + curie_path_mapping[curie].append(path) + logger.debug(f"{guid}: Found {num_paths} paths.") + + results = [] + aux_graphs = {} + knowledge_graph = {"nodes": copy.deepcopy(lookup_knowledge_graph["nodes"]), "edges": {}} + for node_key, node in qgraph["nodes"].items(): + if node.get("ids", None) is not None: + if pinned_node_ids[0] in node["ids"]: + source_node_key = node_key + elif pinned_node_ids[1] in node["ids"]: + target_node_key = node_key + else: + unpinned_node_key = node_key + for edge_key, edge in qgraph["edges"].items(): + if unpinned_node_key == edge["subject"] or unpinned_node_key == edge["object"]: + if source_node_key == edge["subject"] or source_node_key == edge["object"]: + before_edge_key = edge_key + elif target_node_key == edge["subject"] or target_node_key == edge["object"]: + after_edge_key = edge_key + else: + main_edge_key = edge_key + for curie, curie_paths in curie_path_mapping.items(): + aux_edges_list = [] + before_curie_edges_list = [] + after_curie_edges_list = [] + for path in curie_paths: + aux_edges = [] + before_curie_edges = [] + after_curie_edges = [] + before_curie = True + for i, node in enumerate(path[:-1]): + next_node = path[i+1] + if node == curie: + before_curie = False + for kedge_key in kg[node][next_node]["keys"]: + edge = lookup_knowledge_graph["edges"][kedge_key] + if (node in edge["subject"] or node in edge["object"]) and (next_node in edge["subject"] or 
next_node in edge["object"]): + knowledge_graph["edges"][kedge_key] = edge + # Handles support graphs from subclassing + for attribute in edge["attributes"]: + if attribute.get("attribute_type_id") == "biolink:support_graphs": + for support_graph in attribute.get("value", []): + aux_graphs[support_graph] = lookup_aux_graphs[support_graph] + for support_edge in lookup_aux_graphs[support_graph].get("edges", []): + knowledge_graph["edges"][support_edge] = lookup_knowledge_graph["edges"][support_edge] + if kedge_key not in aux_edges: + aux_edges.append(kedge_key) + if before_curie and kedge_key not in before_curie_edges: + before_curie_edges.append(kedge_key) + elif kedge_key not in after_curie_edges: + after_curie_edges.append(kedge_key) + aux_edges_list.append(aux_edges) + before_curie_edges_list.append(before_curie_edges) + after_curie_edges_list.append(after_curie_edges) + aux_edges_keys = [] + before_curie_edges_keys = [] + after_curie_edges_keys = [] + for aux_edges in aux_edges_list: + sha256 = hashlib.sha256() + for x in set(aux_edges): + sha256.update(bytes(x, encoding="utf-8")) + aux_edges_key = sha256.hexdigest() + if aux_edges_key not in aux_edges_keys: + aux_graphs[aux_edges_key] = {"edges": list(aux_edges), "attributes": []} + aux_edges_keys.append(aux_edges_key) + for before_curie_edges in before_curie_edges_list: + sha256 = hashlib.sha256() + for x in set(before_curie_edges): + sha256.update(bytes(x, encoding="utf-8")) + before_curie_edges_key = sha256.hexdigest() + if before_curie_edges_key not in before_curie_edges_keys: + aux_graphs[before_curie_edges_key] = {"edges": list(before_curie_edges), "attributes": []} + before_curie_edges_keys.append(before_curie_edges_key) + for after_curie_edges in after_curie_edges_list: + sha256 = hashlib.sha256() + for x in set(after_curie_edges): + sha256.update(bytes(x, encoding="utf-8")) + after_curie_edges_key = sha256.hexdigest() + if after_curie_edges_key not in after_curie_edges_keys: + aux_graphs[after_curie_edges_key] = {"edges": list(after_curie_edges), "attributes": []} + after_curie_edges_keys.append(after_curie_edges_key) + main_edge = uuid.uuid1().hex + before_edge = uuid.uuid1().hex + after_edge = uuid.uuid1().hex + knowledge_graph["edges"][main_edge] = { + "subject": source_node, + "object": target_node, + "predicate": "biolink:related_to", + "sources": [ + { + "resource_id": "infores:shadowfax", + "resource_role": "primary_knowledge_source", + } + ], + "attributes": [ + { + "attribute_type_id": "biolink:support_graphs", + "value": aux_edges_keys + } + ] + } + knowledge_graph["edges"][before_edge] = { + "subject": source_node, + "object": curie, + "predicate": "biolink:related_to", + "sources": [ + { + "resource_id": "infores:shadowfax", + "resource_role": "primary_knowledge_source", + } + ], + "attributes": [ + { + "attribute_type_id": "biolink:support_graphs", + "value": before_curie_edges_keys + } + ] + } + knowledge_graph["edges"][after_edge] = { + "subject": curie, + "object": target_node, + "predicate": "biolink:related_to", + "sources": [ + { + "resource_id": "infores:shadowfax", + "resource_role": "primary_knowledge_source", + } + ], + "attributes": [ + { + "attribute_type_id": "biolink:support_graphs", + "value": after_curie_edges_keys + } + ] + } + result = { + "node_bindings": { + source_node_key: [ + { + "id": source_node, + "attributes": [] + } + ], + unpinned_node_key: [ + { + "id": curie, + "attributes": [] + } + ], + target_node_key: [ + { + "id": target_node, + "attributes": [] + } + ] + }, + "analyses": [ + { 
+ "resource_id": "infores:aragorn", + "edge_bindings": { + main_edge_key: [ + { + "id": main_edge, + "attributes": [] + } + ], + before_edge_key: [ + { + "id": before_edge, + "attributes": [] + } + ], + after_edge_key: [ + { + "id": after_edge, + "attributes": [] + } + ] + } + } + ] + } + results.append(result) + result_message = { + "message": { + "query_graph": message["message"]["query_graph"], + "knowledge_graph": knowledge_graph, + "results": results, + "auxiliary_graphs": aux_graphs + } + } + + return result_message, 200 From d29ff539848c2007bae5554a0c31c4cb19fd4b3d Mon Sep 17 00:00:00 2001 From: uhbrar Date: Fri, 6 Sep 2024 08:11:54 -0400 Subject: [PATCH 02/14] add in shadowfax --- src/service_aggregator.py | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/src/service_aggregator.py b/src/service_aggregator.py index 54fccfc..7947ec8 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -23,6 +23,7 @@ from asyncio.exceptions import TimeoutError from reasoner_pydantic import Query, KnowledgeGraph, QueryGraph from reasoner_pydantic import Response as PDResponse +from src.shadowfax import shadowfax import uuid DUMPTRUCK = False @@ -47,6 +48,8 @@ def examine_query(message): # queries that are any shape with all lookup edges # OR # A 1-hop infer query. + # OR + # Pathfinder query try: # this can still fail if the input looks like e.g.: # "query_graph": None @@ -57,13 +60,14 @@ def examine_query(message): for edge_id in qedges: if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred": n_infer_edges += 1 - if n_infer_edges > 1: + pathfinder = n_infer_edges == 3 + if n_infer_edges > 1 and n_infer_edges and not pathfinder: raise Exception("Only a single infer edge is supported", 400) if (n_infer_edges > 0) and (n_infer_edges < len(qedges)): raise Exception("Mixed infer and lookup queries not supported", 400) infer = n_infer_edges == 1 if not infer: - return infer, None, None + return infer, None, None, pathfinder qnodes = message.get("message", {}).get("query_graph", {}).get("nodes", {}) question_node = None answer_node = None @@ -76,7 +80,7 @@ def examine_query(message): raise Exception("Both nodes of creative edge pinned", 400) if question_node is None: raise Exception("No nodes of creative edge pinned", 400) - return infer, question_node, answer_node + return infer, question_node, answer_node, pathfinder def match_results_to_query(results, query_message, query_source, query_target, query_qedge_id): @@ -111,7 +115,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): """ try: - infer, question_qnode, answer_qnode = examine_query(message) + infer, question_qnode, answer_qnode, pathfinder = examine_query(message) except Exception as e: print(e) return None, 500 @@ -127,7 +131,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): # e.g. our score operation will include both weighting and scoring for now. 
From d29ff539848c2007bae5554a0c31c4cb19fd4b3d Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:11:54 -0400
Subject: [PATCH 02/14] add in shadowfax

---
 src/service_aggregator.py | 34 ++++++++++++++++++++++++----------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/src/service_aggregator.py b/src/service_aggregator.py
index 54fccfc..7947ec8 100644
--- a/src/service_aggregator.py
+++ b/src/service_aggregator.py
@@ -23,6 +23,7 @@
 from asyncio.exceptions import TimeoutError
 from reasoner_pydantic import Query, KnowledgeGraph, QueryGraph
 from reasoner_pydantic import Response as PDResponse
+from src.shadowfax import shadowfax
 import uuid
 
 DUMPTRUCK = False
@@ -47,6 +48,8 @@ def examine_query(message):
     # queries that are any shape with all lookup edges
     # OR
     # A 1-hop infer query.
+    # OR
+    # Pathfinder query
     try:
         # this can still fail if the input looks like e.g.:
         #   "query_graph": None
@@ -57,13 +60,14 @@ def examine_query(message):
     for edge_id in qedges:
         if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred":
             n_infer_edges += 1
-    if n_infer_edges > 1:
+    pathfinder = n_infer_edges == 3
+    if n_infer_edges > 1 and not pathfinder:
         raise Exception("Only a single infer edge is supported", 400)
     if (n_infer_edges > 0) and (n_infer_edges < len(qedges)):
         raise Exception("Mixed infer and lookup queries not supported", 400)
     infer = n_infer_edges == 1
     if not infer:
-        return infer, None, None
+        return infer, None, None, pathfinder
     qnodes = message.get("message", {}).get("query_graph", {}).get("nodes", {})
     question_node = None
     answer_node = None
@@ -76,7 +80,7 @@ def examine_query(message):
         raise Exception("Both nodes of creative edge pinned", 400)
     if question_node is None:
         raise Exception("No nodes of creative edge pinned", 400)
-    return infer, question_node, answer_node
+    return infer, question_node, answer_node, pathfinder
 
 
 def match_results_to_query(results, query_message, query_source, query_target, query_qedge_id):
@@ -111,7 +115,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
     """
 
     try:
-        infer, question_qnode, answer_qnode = examine_query(message)
+        infer, question_qnode, answer_qnode, pathfinder = examine_query(message)
     except Exception as e:
         print(e)
         return None, 500
@@ -127,7 +131,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
     # e.g. our score operation will include both weighting and scoring for now.
     # Also gives us a place to handle function specific logic
     known_operations = {
-        "lookup": partial(lookup, caller=caller, infer=infer, answer_qnode=answer_qnode, question_qnode=question_qnode, bypass_cache=bypass_cache),
+        "lookup": partial(lookup, caller=caller, infer=infer, pathfinder=pathfinder, answer_qnode=answer_qnode, question_qnode=question_qnode, bypass_cache=bypass_cache),
         "enrich_results": partial(answercoalesce, coalesce_type=coalesce_type),
         "overlay_connect_knodes": omnicorp,
         "score": score,
@@ -154,6 +158,13 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
             {"id": "score"},
             {"id": "filter_message_top_n", "parameters": {"max_results": 500}},
         ]
+    elif pathfinder:
+        workflow_def = [
+            {"id": "lookup"},
+            {"id": "overlay_connect_knodes"},
+            {"id": "score"},
+            {"id": "filter_message_top_n", "parameters": {"max_results": 500}}
+        ]
     else:
         # TODO: if this is robokop, need to normalize.
         workflow_def = [
@@ -171,7 +182,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
 
     # We told the world what we can do!
     # Workflow will be a list of the functions, and the parameters if there are any
-    read_from_cache = not (bypass_cache or overwrite_cache)
+    read_from_cache = not (bypass_cache or overwrite_cache) and not pathfinder
 
     try:
         query_graph = message["message"]["query_graph"]
@@ -221,7 +232,8 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
     if overwrite_cache or (not bypass_cache):
         if infer:
             results_cache.set_result(input_id, predicate, qualifiers, source_input, caller, workflow_def, mcq, member_ids, final_answer)
-        elif {"id": "lookup"} in workflow_def:
+        elif {"id": "lookup"} in workflow_def and not pathfinder:
+            # We won't cache pathfinder results for now
            results_cache.set_lookup_result(workflow_def, query_graph, final_answer)
 
     # return the answer
@@ -708,7 +720,7 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params={})
     return ret_val, status_code
 
 
-async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, bypass_cache=False) -> (dict, int):
+async def lookup(message, params, guid, infer=False, pathfinder=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, bypass_cache=False) -> (dict, int):
     """
     Performs lookup, parameterized by ARAGORN/ROBOKOP and whether the query
     is an infer type query
@@ -720,7 +732,7 @@ async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qn
     """
     message = await normalize_qgraph_ids(message)
     if caller == "ARAGORN":
-        return await aragorn_lookup(message, params, guid, infer, answer_qnode, bypass_cache)
+        return await aragorn_lookup(message, params, guid, infer, pathfinder, answer_qnode, bypass_cache)
     elif caller == "ROBOKOP":
         robo_results, robo_status = await robokop_lookup(message, params, guid, infer, question_qnode, answer_qnode)
         return await add_provenance(robo_results), robo_status
@@ -755,10 +767,12 @@ async def de_noneify(message):
                 await de_noneify(item)
 
 
-async def aragorn_lookup(input_message, params, guid, infer, answer_qnode, bypass_cache):
+async def aragorn_lookup(input_message, params, guid, infer, pathfinder, answer_qnode, bypass_cache):
     timeout_seconds = (input_message.get("parameters") or {}).get("timeout_seconds")
     if timeout_seconds:
         params["timeout_seconds"] = timeout_seconds if type(timeout_seconds) is int else 3 * 60
+    if pathfinder:
+        return await shadowfax(input_message, guid, logger)
     if not infer:
         return await strider(input_message, params, guid, bypass_cache)
     # Now it's an infer query.
From 214e3b712b59cefae1fc639cbda85253a7f5bd40 Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:12:06 -0400
Subject: [PATCH 03/14] update requirements

---
 requirements.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements.txt b/requirements.txt
index b8995ea..a57a474 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,3 +21,4 @@ opentelemetry-instrumentation-fastapi==0.37b0
 opentelemetry-exporter-jaeger==1.16.0
 opentelemetry-instrumentation-httpx==0.37b0
 fakeredis<=2.10.2
+networkx==3.3

From 393e17a5673bf5438da2372c4e7ef2b6c678f758 Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:15:49 -0400
Subject: [PATCH 04/14] remove debugging stuff

---
 src/pathfinder/get_cooccurrence.py | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/src/pathfinder/get_cooccurrence.py b/src/pathfinder/get_cooccurrence.py
index 8b4de53..829048e 100644
--- a/src/pathfinder/get_cooccurrence.py
+++ b/src/pathfinder/get_cooccurrence.py
@@ -12,7 +12,6 @@ def get_the_pmids(curies: List[str]):
         port=6379,
         db=0
     )
-    start = time.time()
     curie_pmids = []
     for curie in curies:
         pmids = r.get(curie)
@@ -20,11 +19,8 @@ def get_the_pmids(curies: List[str]):
             pmids = []
         else:
             pmids = json.loads(gzip.decompress(pmids))
-        # print(len(pmids))
         curie_pmids.append(pmids)
     answer = list(set.intersection(*map(set, curie_pmids)))
-    end = time.time()
-    # print(f"Got back {len(answer)} pmids in {end - start} seconds.")
     return answer
 
 def get_the_curies(pmid: str):
@@ -33,18 +29,10 @@ def get_the_curies(pmid: str):
         port=6379,
         db=1
     )
-    start = time.time()
     curies = r.get(pmid)
     if curies is None:
         curies = []
     else:
         curies = json.loads(gzip.decompress(curies))
     answer = list(curies)
-    end = time.time()
-    # print(f"Got back {len(answer)} curies in {end - start} seconds.")
     return answer
-
-
-if __name__ == "__main__":
-    print(len(get_the_pmids(["MONDO:0004979"])))
-    print(len(get_the_pmids(["CHEBI:45783"])))

From 4e76e9da50f0fb568912c746dfbb8fcf2259d8b5 Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:22:59 -0400
Subject: [PATCH 05/14] fix tests

---
 tests/test_result_merge.py | 66 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 64 insertions(+), 2 deletions(-)

diff --git a/tests/test_result_merge.py b/tests/test_result_merge.py
index a2149dc..1ae60ec 100644
--- a/tests/test_result_merge.py
+++ b/tests/test_result_merge.py
@@ -31,7 +31,69 @@ def test_query_examination():
             }
         }
     }
-    inferred, qnode, anode = examine_query(query)
+    inferred, qnode, anode, pathfinder = examine_query(query)
     assert inferred
     assert qnode == 'disease'
-    assert anode == 'chemical'
\ No newline at end of file
+    assert anode == 'chemical'
+    assert not pathfinder
+
+def test_query_examination_pathfinder():
+    query = {
+        "message": {
+            "query_graph": {
+                "nodes": {
+                    "n0": {
+                        "ids": [
+                            "PUBCHEM.COMPOUND:5291"
+                        ],
+                        "name": "imatinib"
+                    },
+                    "n1": {
+                        "ids": [
+                            "MONDO:0004979"
+                        ],
+                        "name": "asthma"
+                    },
+                    "un": {
+                        "categories": [
+                            "biolink:NamedThing"
+                        ]
+                    }
+                },
+                "edges": {
+                    "e0": {
+                        "subject": "n0",
+                        "object": "n1",
+                        "predicates": [
+                            "biolink:related_to"
+                        ],
+                        "knowledge_type": "inferred"
+                    },
+                    "e1": {
+                        "subject": "n0",
+                        "object": "un",
+                        "predicates": [
+                            "biolink:related_to"
+                        ],
+                        "knowledge_type": "inferred"
+                    },
+                    "e2": {
+                        "subject": "n1",
+                        "object": "un",
+                        "predicates": [
+                            "biolink:related_to"
+                        ],
+                        "knowledge_type": "inferred"
+                    }
+                }
+            },
+            "knowledge_graph": {
+                "nodes": {},
+                "edges": {}
+            },
+            "results": []
+        }
+    }
+    inferred, qnode, anode, pathfinder = examine_query(query)
+    assert not inferred
+    assert pathfinder
\ No newline at end of file

From bb67ae0882930da1a0d8cdd9b754e381330df89e Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:30:31 -0400
Subject: [PATCH 06/14] change node norm to env variable

---
 src/shadowfax.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/shadowfax.py b/src/shadowfax.py
index 1f7ec06..bc778f6 100644
--- a/src/shadowfax.py
+++ b/src/shadowfax.py
@@ -12,7 +12,7 @@
 from src.pathfinder.get_cooccurrence import get_the_curies, get_the_pmids
 
 # this should be an environment variable
-node_norm_url = "https://nodenormalization-sri.renci.org"
+node_norm_url = os.environ.get("NODENORM_URL", "https://nodenormalization-sri.renci.org/")
 strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/")
 num_intermediate_hops = 3
 TOTAL_PUBS = 27840000

From f51ea5ecce37286167c6866616d7cf1cd583f9d7 Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:30:46 -0400
Subject: [PATCH 07/14] remove comment

---
 src/shadowfax.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/shadowfax.py b/src/shadowfax.py
index bc778f6..7ad542c 100644
--- a/src/shadowfax.py
+++ b/src/shadowfax.py
@@ -11,7 +11,6 @@
 
 from src.pathfinder.get_cooccurrence import get_the_curies, get_the_pmids
 
-# this should be an environment variable
 node_norm_url = os.environ.get("NODENORM_URL", "https://nodenormalization-sri.renci.org/")
 strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/")
 num_intermediate_hops = 3

From 6374abb023bcca8cae00c4e02fe2b8d1e6c701cd Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 08:38:32 -0400
Subject: [PATCH 08/14] fix urls

---
 src/shadowfax.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/shadowfax.py b/src/shadowfax.py
index 7ad542c..a0076b1 100644
--- a/src/shadowfax.py
+++ b/src/shadowfax.py
@@ -20,7 +20,7 @@ async def generate_from_strider(message):
     try:
         async with httpx.AsyncClient(timeout=3600) as client:
             lookup_response = await client.post(
-                url=strider_url,
+                url=strider_url + "query",
                 json=message,
             )
             lookup_response.raise_for_status()
@@ -44,7 +44,7 @@ async def shadowfax(message, guid, logger):
 
     async with httpx.AsyncClient(timeout=900) as client:
         normalized_pinned_ids = await client.post(
-            url=node_norm_url + "/get_normalized_nodes",
+            url=node_norm_url + "get_normalized_nodes",
             json={
                 "curies": list(pinned_node_ids),
                 "conflate": True,
@@ -87,7 +87,7 @@ async def shadowfax(message, guid, logger):
             "drug_chemical_conflate": True
         }
         normalizer_response = await client.post(
-            url=node_norm_url + "/get_normalized_nodes",
+            url=node_norm_url + "get_normalized_nodes",
             json=normalizer_input
         )
         normalizer_response.raise_for_status()
@@ -157,11 +157,13 @@ async def shadowfax(message, guid, logger):
             strider_multiquery.append(m)
     lookup_messages = []
+    logger.debug(f"{guid}: Sending {len(strider_multiquery)} requests to strider.")
     # We will want to use multistrider here eventually
     for lookup_message in await asyncio.gather(*[generate_from_strider(lookup_query) for lookup_query in strider_multiquery]):
         if lookup_message:
             lookup_message["query_graph"] = {"nodes": {}, "edges": {}}
             lookup_messages.append(lookup_message)
+    logger.debug(f"{guid}: Received {len(lookup_messages)} responses from strider.")
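[Editor's note: the URL fix above matters because the code joins base URL and path by plain string concatenation rather than urljoin, so the defaults — and any NODENORM_URL or STRIDER_URL override — must keep their trailing slash. A quick illustration of the failure mode the patch avoids:]

```python
# Plain concatenation, as shadowfax.py does after this patch.
good = "https://nodenormalization-sri.renci.org/" + "get_normalized_nodes"
bad = "https://nodenormalization-sri.renci.org" + "get_normalized_nodes"
assert good.endswith("/get_normalized_nodes")
# 'bad' becomes ...orgget_normalized_nodes, which is not a valid endpoint.
```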
From 35004d89f36d54552412b8ace42c57eb4a90f16 Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 10:49:43 -0400
Subject: [PATCH 09/14] fix redis

---
 src/pathfinder/get_cooccurrence.py | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/src/pathfinder/get_cooccurrence.py b/src/pathfinder/get_cooccurrence.py
index 829048e..f24da98 100644
--- a/src/pathfinder/get_cooccurrence.py
+++ b/src/pathfinder/get_cooccurrence.py
@@ -1,16 +1,23 @@
 """Get more than pairwise literature cooccurrence for a given list of curies."""
 import json
 import gzip
+import os
 import redis
 import time
 from typing import List
 
+REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
+REDIS_PORT = os.environ.get("REDIS_PORT", 6379)
+PMIDS_DB = os.environ.get("PMIDS_DB", 1)
+CURIES_DB = os.environ.get("CURIES_DB", 2)
+REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
 
 def get_the_pmids(curies: List[str]):
     r = redis.Redis(
-        host="localhost",
-        port=6379,
-        db=0
+        host=REDIS_HOST,
+        port=REDIS_PORT,
+        db=PMIDS_DB,
+        password=REDIS_PASSWORD
     )
     curie_pmids = []
     for curie in curies:
         pmids = r.get(curie)
@@ -25,9 +32,10 @@ def get_the_pmids(curies: List[str]):
 
 def get_the_curies(pmid: str):
     r = redis.Redis(
-        host="localhost",
-        port=6379,
-        db=1
+        host=REDIS_HOST,
+        port=REDIS_PORT,
+        db=CURIES_DB,
+        password=REDIS_PASSWORD
     )
     curies = r.get(pmid)
     if curies is None:
         curies = []

From ead94cffa2f98ab19b7fb7c1a126830fde20ebb3 Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 10:52:34 -0400
Subject: [PATCH 10/14] add another comment

---
 src/shadowfax.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/shadowfax.py b/src/shadowfax.py
index a0076b1..3aef9e5 100644
--- a/src/shadowfax.py
+++ b/src/shadowfax.py
@@ -199,6 +199,7 @@ async def shadowfax(message, guid, logger):
         num_paths += 1
         for curie in path:
             if curie not in [source_node, target_node] and curie in curie_info.keys():
+                # Handles constraints
                 if unpinned_node_category in curie_info[curie]["categories"]:
                     curie_path_mapping[curie].append(path)
     logger.debug(f"{guid}: Found {num_paths} paths.")

From ae7fd884b347025619a5a8e9c79ef925a7236a1a Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 11:00:43 -0400
Subject: [PATCH 11/14] add example

---
 documentation/pathfinder_example.json | 56 +++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)
 create mode 100644 documentation/pathfinder_example.json

diff --git a/documentation/pathfinder_example.json b/documentation/pathfinder_example.json
new file mode 100644
index 0000000..eef2fbb
--- /dev/null
+++ b/documentation/pathfinder_example.json
@@ -0,0 +1,56 @@
+{
+    "message": {
+        "query_graph": {
+            "nodes": {
+                "n0": {
+                    "ids": [
+                        "PUBCHEM.COMPOUND:5291"
+                    ],
+                    "name": "imatinib"
+                },
+                "n1": {
+                    "ids": [
+                        "MONDO:0004979"
+                    ],
+                    "name": "asthma"
+                },
+                "un": {
+                    "categories": [
+                        "biolink:NamedThing"
+                    ]
+                }
+            },
+            "edges": {
+                "e0": {
+                    "subject": "n0",
+                    "object": "n1",
+                    "predicates": [
+                        "biolink:related_to"
+                    ],
+                    "knowledge_type": "inferred"
+                },
+                "e1": {
+                    "subject": "n0",
+                    "object": "un",
+                    "predicates": [
+                        "biolink:related_to"
+                    ],
+                    "knowledge_type": "inferred"
+                },
+                "e2": {
+                    "subject": "n1",
+                    "object": "un",
+                    "predicates": [
+                        "biolink:related_to"
+                    ],
+                    "knowledge_type": "inferred"
+                }
+            }
+        },
+        "knowledge_graph": {
+            "nodes": {},
+            "edges": {}
+        },
+        "results": []
+    }
+}
\ No newline at end of file
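[Editor's note: the example file above can be posted directly to a running ARAGORN instance. A minimal client sketch — the host/port and the synchronous /query endpoint are assumptions about the deployment, not part of this patch set:]

```python
import json

import httpx  # already a dependency of this project

with open("documentation/pathfinder_example.json") as f:
    query = json.load(f)

# Assumed local deployment; substitute your ARAGORN base URL.
response = httpx.post("http://localhost:4868/query", json=query, timeout=3600)
response.raise_for_status()
message = response.json()["message"]
print(f"{len(message.get('results', []))} pathfinder results returned")
```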
From 99671a3bf267cedc528d572255f6a5a3f1bc258c Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 13:04:00 -0400
Subject: [PATCH 12/14] wrap node norm call

---
 src/shadowfax.py | 54 +++++++++++++++++++++++++++---------------------
 1 file changed, 31 insertions(+), 23 deletions(-)

diff --git a/src/shadowfax.py b/src/shadowfax.py
index 3aef9e5..14660df 100644
--- a/src/shadowfax.py
+++ b/src/shadowfax.py
@@ -43,17 +43,21 @@ async def shadowfax(message, guid, logger):
         return message, 500
 
     async with httpx.AsyncClient(timeout=900) as client:
-        normalized_pinned_ids = await client.post(
-            url=node_norm_url + "get_normalized_nodes",
-            json={
-                "curies": list(pinned_node_ids),
-                "conflate": True,
-                "description": False,
-                "drug_chemical_conflate": True
-            }
-        )
-        normalized_pinned_ids.raise_for_status()
-        normalized_pinned_ids = normalized_pinned_ids.json()
+        try:
+            normalized_pinned_ids = await client.post(
+                url=node_norm_url + "get_normalized_nodes",
+                json={
+                    "curies": list(pinned_node_ids),
+                    "conflate": True,
+                    "description": False,
+                    "drug_chemical_conflate": True
+                }
+            )
+            normalized_pinned_ids.raise_for_status()
+            normalized_pinned_ids = normalized_pinned_ids.json()
+        except:
+            logger.info(f"{guid}: Failed to get a response from node norm")
+            return message, 500
     source_node = normalized_pinned_ids.get(pinned_node_ids[0], {"id": {"identifier": pinned_node_ids[0]}})["id"]["identifier"]
@@ -80,18 +84,22 @@ async def shadowfax(message, guid, logger):
         return message, 200
 
     async with httpx.AsyncClient(timeout=900) as client:
-        normalizer_input = {
-            "curies": list(curies),
-            "conflate": True,
-            "description": False,
-            "drug_chemical_conflate": True
-        }
-        normalizer_response = await client.post(
-            url=node_norm_url + "get_normalized_nodes",
-            json=normalizer_input
-        )
-        normalizer_response.raise_for_status()
-        normalizer_response = normalizer_response.json()
+        try:
+            normalizer_input = {
+                "curies": list(curies),
+                "conflate": True,
+                "description": False,
+                "drug_chemical_conflate": True
+            }
+            normalizer_response = await client.post(
+                url=node_norm_url + "get_normalized_nodes",
+                json=normalizer_input
+            )
+            normalizer_response.raise_for_status()
+            normalizer_response = normalizer_response.json()
+        except:
+            logger.info(f"{guid}: Failed to get a response from node norm")
+            return message, 500
     curie_info = defaultdict(dict)
     for curie, normalizer_info in normalizer_response.items():
         if normalizer_info:

From ee4df8f0781c13459b72328c577176ced16b2a2b Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Fri, 6 Sep 2024 18:29:54 -0400
Subject: [PATCH 13/14] kl/at and other minor fixes

---
 src/shadowfax.py | 99 +++++++++++++++++++++++++++++-------------------
 1 file changed, 59 insertions(+), 40 deletions(-)

diff --git a/src/shadowfax.py b/src/shadowfax.py
index 14660df..5b3aeaa 100644
--- a/src/shadowfax.py
+++ b/src/shadowfax.py
@@ -30,34 +30,38 @@ async def generate_from_strider(message):
     return lookup_response.get("message", {})
 
 
-async def shadowfax(message, guid, logger):
-    qgraph = message["message"]["query_graph"]
-    pinned_node_ids = []
-    for node in qgraph["nodes"].values():
-        if node.get("ids", None) is not None:
-            pinned_node_ids.append(node["ids"][0])
-        else:
-            unpinned_node_category = node.get("categories", ["biolink:NamedThing"])[0]
-    if len(pinned_node_ids) != 2:
-        logger.info(f"{guid}: Pathfinder queries require two pinned nodes.")
-        return message, 500
-
+async def get_normalized_curies(curies, guid, logger):
     async with httpx.AsyncClient(timeout=900) as client:
         try:
-            normalized_pinned_ids = await client.post(
+            normalizer_response = await client.post(
                 url=node_norm_url + "get_normalized_nodes",
                 json={
-                    "curies": list(pinned_node_ids),
+                    "curies": list(curies),
                     "conflate": True,
                     "description": False,
                     "drug_chemical_conflate": True
                 }
             )
-            normalized_pinned_ids.raise_for_status()
-            normalized_pinned_ids = normalized_pinned_ids.json()
+            normalizer_response.raise_for_status()
+            return normalizer_response.json()
         except:
             logger.info(f"{guid}: Failed to get a response from node norm")
-            return message, 500
+
+
+async def shadowfax(message, guid, logger):
+    qgraph = message["message"]["query_graph"]
+    pinned_node_ids = []
+    for node in qgraph["nodes"].values():
+        if node.get("ids", None) is not None:
+            pinned_node_ids.append(node["ids"][0])
+        else:
+            unpinned_node_category = node.get("categories", ["biolink:NamedThing"])[0]
+    if len(pinned_node_ids) != 2:
+        logger.info(f"{guid}: Pathfinder queries require two pinned nodes.")
+        return message, 500
+
+    normalized_pinned_ids = await get_normalized_curies(pinned_node_ids, guid, logger)
+
     source_node = normalized_pinned_ids.get(pinned_node_ids[0], {"id": {"identifier": pinned_node_ids[0]}})["id"]["identifier"]
     source_category = normalized_pinned_ids.get(pinned_node_ids[0], {"type": ["biolink:NamedThing"]})["type"][0]
     source_equivalent_ids = [i["identifier"] for i in normalized_pinned_ids.get(pinned_node_ids[0], {"equivalent_identifiers": []})["equivalent_identifiers"]]
@@ -83,23 +87,8 @@ async def shadowfax(message, guid, logger):
         logger.info(f"{guid}: No curies found.")
         return message, 200
 
-    async with httpx.AsyncClient(timeout=900) as client:
-        try:
-            normalizer_input = {
-                "curies": list(curies),
-                "conflate": True,
-                "description": False,
-                "drug_chemical_conflate": True
-            }
-            normalizer_response = await client.post(
-                url=node_norm_url + "get_normalized_nodes",
-                json=normalizer_input
-            )
-            normalizer_response.raise_for_status()
-            normalizer_response = normalizer_response.json()
-        except:
-            logger.info(f"{guid}: Failed to get a response from node norm")
-            return message, 500
+    normalizer_response = await get_normalized_curies(list(curies), guid, logger)
+
     curie_info = defaultdict(dict)
     for curie, normalizer_info in normalizer_response.items():
         if normalizer_info:
@@ -178,9 +167,9 @@ async def shadowfax(message, guid, logger):
     for lookup_message in lookup_messages:
         # Build graph from results to avoid subclass loops
         # Results do not concatenate when they have different qnode ids
-        lookup_message_object = Message.parse_obj(lookup_message)
-        merged_lookup_message.update(lookup_message_object)
-        lookup_results.extend(lookup_message_object.dict().get("results", []))
+        lookup_message_obj = Message.parse_obj(lookup_message)
+        merged_lookup_message.update(lookup_message_obj)
+        lookup_results.extend(lookup_message_obj.dict().get("results") or [])
     merged_lookup_message_dict = merged_lookup_message.dict()
     lookup_knowledge_graph = merged_lookup_message_dict.get("knowledge_graph", {"nodes": {}, "edges": {}})
     lookup_aux_graphs = merged_lookup_message_dict.get("auxiliary_graphs", {})
@@ -300,7 +289,7 @@ async def shadowfax(message, guid, logger):
             "predicate": "biolink:related_to",
             "sources": [
                 {
-                    "resource_id": "infores:shadowfax",
+                    "resource_id": "infores:aragorn",
                     "resource_role": "primary_knowledge_source",
                 }
             ],
@@ -308,6 +297,16 @@ async def shadowfax(message, guid, logger):
                 {
                     "attribute_type_id": "biolink:support_graphs",
                     "value": aux_edges_keys
+                },
+                {
+                    "attribute_type_id": "biolink:agent_type",
+                    "value": "computational_model",
+                    "attribute_source": "infores:aragorn"
+                },
+                {
"attribute_type_id": "biolink:knowledge_level", + "value": "prediction", + "attribute_source": "infores:aragorn" } ] } @@ -317,7 +316,7 @@ async def shadowfax(message, guid, logger): "predicate": "biolink:related_to", "sources": [ { - "resource_id": "infores:shadowfax", + "resource_id": "infores:aragorn", "resource_role": "primary_knowledge_source", } ], @@ -325,6 +324,16 @@ async def shadowfax(message, guid, logger): { "attribute_type_id": "biolink:support_graphs", "value": before_curie_edges_keys + }, + { + "attribute_type_id": "biolink:agent_type", + "value": "computational_model", + "attribute_source": "infores:aragorn" + }, + { + "attribute_type_id": "biolink:knowledge_level", + "value": "prediction", + "attribute_source": "infores:aragorn" } ] } @@ -334,7 +343,7 @@ async def shadowfax(message, guid, logger): "predicate": "biolink:related_to", "sources": [ { - "resource_id": "infores:shadowfax", + "resource_id": "infores:aragorn", "resource_role": "primary_knowledge_source", } ], @@ -342,6 +351,16 @@ async def shadowfax(message, guid, logger): { "attribute_type_id": "biolink:support_graphs", "value": after_curie_edges_keys + }, + { + "attribute_type_id": "biolink:agent_type", + "value": "computational_model", + "attribute_source": "infores:aragorn" + }, + { + "attribute_type_id": "biolink:knowledge_level", + "value": "prediction", + "attribute_source": "infores:aragorn" } ] } From 3565aab52bf62a669240395371b7fd0f6cff215b Mon Sep 17 00:00:00 2001 From: uhbrar Date: Fri, 6 Sep 2024 18:32:55 -0400 Subject: [PATCH 14/14] version bump --- openapi-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openapi-config.yaml b/openapi-config.yaml index 793e420..99e92a6 100644 --- a/openapi-config.yaml +++ b/openapi-config.yaml @@ -11,7 +11,7 @@ servers: # url: http://127.0.0.1:5000 termsOfService: http://robokop.renci.org:7055/tos?service_long=ARAGORN&provider_long=RENCI title: ARAGORN -version: 2.8.4 +version: 2.9.0 tags: - name: translator - name: ARA