From e3de1bbfaf0b9f2771eb9e31dee3401953019aad Mon Sep 17 00:00:00 2001
From: uhbrar
Date: Mon, 25 Nov 2024 13:04:21 -0500
Subject: [PATCH] update documentation

---
 README.md                          |  5 +++--
 src/pathfinder/get_cooccurrence.py |  2 ++
 src/service_aggregator.py          | 29 ++++++++++++++++++-----------
 src/shadowfax.py                   | 19 +++++++++++++++++--
 4 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index 4ac7e0e..2d73901 100644
--- a/README.md
+++ b/README.md
@@ -8,13 +8,14 @@ A tool to query Knowledge Providers (KPs) and synthesize highly ranked answers r
 * Bridges the precision mismatch between data specificity in KPs and more abstract levels of user queries.
 * Generalizes answer ranking.
 * Normalizes data to use preferred and equivalent identifiers.
+* Uses mined rules to perform inference and provide predicted answers for certain queries.
+* Provides a way to run pathfinder queries, using literature co-occurrence to find multi-hop paths between any two nodes.
 
 The ARAGORN tool relies on a number of external services to perform a standardized ranking of a user-specified question.
 
   - Strider - Accepts a query and provides knowledge-provider querying, answer generation and ranking.
-  - Answer Coalesce - Accepts a query containing Strider answers and returns answers that have been coalesced by property, graph and/or ontology analysis.
   - Node normalization - A Translator SRI service that provides the preferred CURIE and equivalent identifiers for data in the query.
-  - ARAGORN Ranker - Accepts a query and provides Omnicorp overlays, score and weight-correctness rankings of coalesced answers.
+  - ARAGORN Ranker - Accepts a query and provides Omnicorp overlays and scores of answers.
 
 ## Demonstration
 
diff --git a/src/pathfinder/get_cooccurrence.py b/src/pathfinder/get_cooccurrence.py
index f24da98..3283af5 100644
--- a/src/pathfinder/get_cooccurrence.py
+++ b/src/pathfinder/get_cooccurrence.py
@@ -13,6 +13,7 @@ REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
 
 
 def get_the_pmids(curies: List[str]):
+    """Returns a list of PMIDs for papers that mention all curies in the list."""
     r = redis.Redis(
         host=REDIS_HOST,
         port=REDIS_PORT,
@@ -31,6 +32,7 @@
     return answer
 
 def get_the_curies(pmid: str):
+    """Returns a list of all curies in the paper corresponding to the given PMID."""
     r = redis.Redis(
         host=REDIS_HOST,
         port=REDIS_PORT,
diff --git a/src/service_aggregator.py b/src/service_aggregator.py
index 5cd57b2..1d19a8f 100644
--- a/src/service_aggregator.py
+++ b/src/service_aggregator.py
@@ -771,6 +771,7 @@ async def aragorn_lookup(input_message, params, guid, infer, pathfinder, answer_
     timeout_seconds = (input_message.get("parameters") or {}).get("timeout_seconds")
     if timeout_seconds:
         params["timeout_seconds"] = timeout_seconds if type(timeout_seconds) is int else 3 * 60
+    # If this is a pathfinder query, run shadowfax
     if pathfinder:
         return await shadowfax(input_message, guid, logger)
     if not infer:
@@ -824,6 +825,7 @@ def merge_results_by_node_op(message, params, guid) -> (dict, int):
 
 
 async def strider(message, params, guid, bypass_cache) -> (dict, int):
+    """Runs a single Strider query."""
     strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/")
 
     # select the type of query post. "test" will come from the tester
@@ -841,6 +843,7 @@ async def strider(message, params, guid, bypass_cache) -> (dict, int):
 
 
 async def normalize_qgraph_ids(m):
+    """Normalizes the qgraph so its ids are the preferred identifiers from node norm."""
     url = f'{os.environ.get("NODENORM_URL", "https://nodenormalization-sri.renci.org/")}get_normalized_nodes'
     qnodes = m["message"]["query_graph"]["nodes"]
     qnode_ids = set()
@@ -880,7 +883,9 @@ def get_infer_parameters(input_message):
     qualifiers: the qualifiers of the inferred edge
     source: the query node id of the source node
     target: the query node id of the target node
-    source_input: True if the source node is the input node, False if the target node is the input node"""
+    source_input: True if the source node is the input node, False if the target node is the input node
+    mcq: True if this is a set input (MCQ) query, False otherwise
+    member_ids: the members of the input set"""
     for edge_id, edge in input_message["message"]["query_graph"]["edges"].items():
         source = edge["subject"]
         target = edge["object"]
@@ -917,22 +922,23 @@ def get_rule_key(predicate, qualifiers, mcq):
     return json.dumps(keydict,sort_keys=True)
 
 def expand_query(input_message, params, guid):
-    #Contract: 1. there is a single edge in the query graph 2. The edge is marked inferred. 3. Either the source
+    """Expands the query based on mined rules."""
+    # Contract: 1. there is a single edge in the query graph 2. The edge is marked inferred. 3. Either the source
     # or the target has IDs, but not both. 4. The number of ids on the query node is 1.
     input_id, predicate, qualifiers, source, source_input, target, qedge_id, mcq, member_ids = get_infer_parameters(input_message)
     key = get_rule_key(predicate, qualifiers, mcq)
-    #We want to run the non-inferred version of the query as well
+    # We want to run the non-inferred version of the query as well
     qg = deepcopy(input_message["message"]["query_graph"])
     for eid,edge in qg["edges"].items():
         del edge["knowledge_type"]
     messages = [{"message": {"query_graph":qg}, "parameters": input_message.get("parameters") or {}}]
-    #If it's an MCQ, then we also copy the KG which has the member_of edges
+    # If it's an MCQ, then we also copy the KG which has the member_of edges
     if mcq:
         messages[0]["message"]["knowledge_graph"] = deepcopy(input_message["message"]["knowledge_graph"])
-    #If we don't have any AMIE expansions, this will just generate the direct query
+    # If we don't have any AMIE expansions, this will just generate the direct query
     for rule_def in AMIE_EXPANSIONS.get(key,[]):
         query_template = Template(json.dumps(rule_def["template"]))
-        #need to do a bit of surgery depending on what the input is.
+        # need to do a bit of surgery depending on what the input is.
         if source_input:
             qs = query_template.substitute(source=source,target=target,source_id = input_id, target_id='')
         else:
@@ -961,12 +967,13 @@
 
 
 async def multi_strider(messages, params, guid, bypass_cache):
+    """Runs a Strider multiquery."""
     strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/")
     strider_url += "multiquery"
-    #We don't want to do subservice_post, because that assumes TRAPI in and out.
-    #it leads to confusion.
-    #response, status_code = await subservice_post("strider", strider_url, messages, guid, asyncquery=True)
+    # We don't want to do subservice_post, because that assumes TRAPI in and out;
+    # it leads to confusion.
+    # response, status_code = await subservice_post("strider", strider_url, messages, guid, asyncquery=True)
     responses = await post_with_callback(strider_url,messages,guid,params,bypass_cache)
 
     return responses
 
@@ -988,9 +995,9 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer, robokop):
     """Create a new knowledge edge in the result message, with the aux graph ids as support."""
     # Find the subject, object, and predicate of the original query
     query_graph = result_message["message"]["query_graph"]
-    #get the first key and value from the edges
+    # get the first key and value from the edges
     qedge_id, qedge = next(iter(query_graph["edges"].items()))
-    #For the nodes, if there is an id, then use it in the knowledge edge. If there is not, then use the answer
+    # For the nodes, if there is an id, then use it in the knowledge edge. If there is not, then use the answer
     qnode_subject_id = qedge["subject"]
     qnode_object_id = qedge["object"]
     if "ids" in query_graph["nodes"][qnode_subject_id] and query_graph["nodes"][qnode_subject_id]["ids"] is not None:
diff --git a/src/shadowfax.py b/src/shadowfax.py
index 5b3aeaa..3bc5f44 100644
--- a/src/shadowfax.py
+++ b/src/shadowfax.py
@@ -1,6 +1,7 @@
 import asyncio
 from collections import defaultdict
 import copy
+import json
 import uuid
 import hashlib
 import os
@@ -17,6 +18,7 @@ TOTAL_PUBS = 27840000
 
 
 async def generate_from_strider(message):
+    """Generates knowledge graphs from Strider."""
     try:
         async with httpx.AsyncClient(timeout=3600) as client:
             lookup_response = await client.post(
@@ -31,6 +33,8 @@
 
 
 async def get_normalized_curies(curies, guid, logger):
+    """Returns normalized curies that we can look up in our database, assuming
+    the database is also properly normalized."""
     async with httpx.AsyncClient(timeout=900) as client:
         try:
             normalizer_response = await client.post(
@@ -49,6 +53,9 @@
 
 
 async def shadowfax(message, guid, logger):
+    """Processes pathfinder queries. This is done by using literature co-occurrence
+    to find nodes that occur in publications with our input nodes, then finding
+    paths that connect our input nodes through these intermediate nodes."""
     qgraph = message["message"]["query_graph"]
     pinned_node_ids = []
     for node in qgraph["nodes"].values():
@@ -69,13 +76,15 @@ async def shadowfax(message, guid, logger):
     target_equivalent_ids = [i["identifier"] for i in normalized_pinned_ids.get(pinned_node_ids[1], {"equivalent_identifiers": []})["equivalent_identifiers"]]
     target_category = normalized_pinned_ids.get(pinned_node_ids[1], {"type": ["biolink:NamedThing"]})["type"][0]
 
+    # Find shared publications between the input nodes
     source_pubs = len(get_the_pmids([source_node]))
     target_pubs = len(get_the_pmids([target_node]))
     pairwise_pubs = get_the_pmids([source_node, target_node])
     if source_pubs == 0 or target_pubs == 0 or len(pairwise_pubs) == 0:
         logger.info(f"{guid}: No publications found.")
         return message, 200
-
+
+    # Find other nodes from those shared publications
     curies = set()
     for pub in pairwise_pubs:
         curie_list = get_the_curies(pub)
@@ -105,6 +114,7 @@ async def shadowfax(message, guid, logger):
                 )
                 / 10000))
 
+    # Find the nodes with the most significant co-occurrence
    pruned_curies = []
    while len(pruned_curies) < 50 and len(pruned_curies) < len(curies):
        max_cov = 0
@@ -174,6 +184,7 @@ async def shadowfax(message, guid, logger):
     lookup_knowledge_graph = merged_lookup_message_dict.get("knowledge_graph", {"nodes": {}, "edges": {}})
     lookup_aux_graphs = merged_lookup_message_dict.get("auxiliary_graphs", {})
 
+    # Build a large knowledge graph and find paths
     kg = networkx.Graph()
     kg.add_nodes_from([source_node, target_node])
     for lookup_result in lookup_results:
@@ -199,8 +210,8 @@ async def shadowfax(message, guid, logger):
                     # Handles constraints
                     if unpinned_node_category in curie_info[curie]["categories"]:
                         curie_path_mapping[curie].append(path)
-
     logger.debug(f"{guid}: Found {num_paths} paths.")
+    # Build the knowledge graph from the paths
     results = []
     aux_graphs = {}
     knowledge_graph = {"nodes": copy.deepcopy(lookup_knowledge_graph["nodes"]), "edges": {}}
@@ -220,6 +231,8 @@ async def shadowfax(message, guid, logger):
                 after_edge_key = edge_key
             else:
                 main_edge_key = edge_key
+
+    # Build results from the paths according to the query graph structure
     for curie, curie_paths in curie_path_mapping.items():
         aux_edges_list = []
         before_curie_edges_list = []
@@ -247,8 +260,10 @@ async def shadowfax(message, guid, logger):
                 if kedge_key not in aux_edges:
                     aux_edges.append(kedge_key)
                 if before_curie and kedge_key not in before_curie_edges:
+                    # These edges come before the intermediate node
                     before_curie_edges.append(kedge_key)
                 elif kedge_key not in after_curie_edges:
+                    # These edges come after the intermediate node
                     after_curie_edges.append(kedge_key)
             aux_edges_list.append(aux_edges)
             before_curie_edges_list.append(before_curie_edges)
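
A note on the rule expansion that the new expand_query docstring describes: each
mined rule is stored as a JSON query-graph template, and string.Template
substitution pins the query's ids into it. The sketch below is a minimal
illustration of those mechanics only; the rule body here is invented (real rules
come from AMIE_EXPANSIONS), and just the Template/substitute pattern mirrors the
function in this patch.

    import json
    from string import Template

    # Hypothetical mined rule: a query-graph template with $-placeholders.
    rule_template = {
        "nodes": {
            "$source": {"ids": ["$source_id"], "categories": ["biolink:ChemicalEntity"]},
            "$target": {"categories": ["biolink:Disease"]},
        },
        "edges": {
            "e0": {"subject": "$source", "object": "$target", "predicates": ["biolink:related_to"]},
        },
    }

    # Serialize, substitute the query's ids, and parse back, as expand_query() does.
    # For a source-input query the target_id is left empty.
    qs = Template(json.dumps(rule_template)).substitute(
        source="n0", target="n1", source_id="MONDO:0005148", target_id=""
    )
    query_graph = json.loads(qs)
    assert query_graph["edges"]["e0"]["subject"] == "n0"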
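
The shadowfax docstring above summarizes the pathfinder flow: find publications
shared by the two input nodes, collect the other curies mentioned in those
publications, keep the most significant co-occurrences (the patched loop caps
this at 50), and then search for paths through those intermediates. Below is a
small self-contained sketch of that candidate-ranking step. The helper names
follow src/pathfinder/get_cooccurrence.py, but the toy corpus, the PMI-style
score, and rank_intermediates are illustrative assumptions, not the patched
implementation.

    import math
    from typing import Dict, List

    TOTAL_PUBS = 27840000  # corpus-size constant, as in src/shadowfax.py

    # Toy in-memory stand-in for the Redis-backed corpus behind
    # src/pathfinder/get_cooccurrence.py, so this sketch runs as-is.
    PAPERS: Dict[str, List[str]] = {
        "PMID:1": ["MONDO:0005148", "NCBIGene:3630", "CHEBI:6801"],
        "PMID:2": ["MONDO:0005148", "NCBIGene:3630"],
        "PMID:3": ["NCBIGene:3630"],
    }

    def get_the_pmids(curies: List[str]) -> List[str]:
        """Returns the PMIDs of papers that mention all curies in the list."""
        return [p for p, cs in PAPERS.items() if all(c in cs for c in curies)]

    def get_the_curies(pmid: str) -> List[str]:
        """Returns all curies mentioned in the given paper."""
        return PAPERS.get(pmid, [])

    def rank_intermediates(source: str, target: str, top_n: int = 50) -> List[str]:
        """Ranks candidate intermediate nodes by co-occurrence with both inputs."""
        shared_pubs = get_the_pmids([source, target])
        if not shared_pubs:
            return []  # shadowfax likewise returns early with no shared publications
        # Every other curie mentioned in a shared publication is a candidate.
        candidates = set()
        for pmid in shared_pubs:
            candidates.update(get_the_curies(pmid))
        candidates -= {source, target}
        scores: Dict[str, float] = {}
        for curie in candidates:
            triple_pubs = len(get_the_pmids([source, target, curie]))
            curie_pubs = len(get_the_pmids([curie]))
            if triple_pubs == 0 or curie_pubs == 0:
                continue
            # Observed co-mentions vs. what corpus frequency alone would predict.
            expected = curie_pubs * len(shared_pubs) / TOTAL_PUBS
            scores[curie] = math.log(triple_pubs / expected)
        return sorted(scores, key=scores.get, reverse=True)[:top_n]

    print(rank_intermediates("MONDO:0005148", "CHEBI:6801"))  # ['NCBIGene:3630']

In the real module these lookups hit Redis, and shadowfax then builds a networkx
graph over the Strider lookup results and enumerates paths through the
top-ranked intermediate nodes.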