
Commit

update documentation
uhbrar committed Nov 25, 2024
1 parent 44be292 commit e3de1bb
Showing 4 changed files with 40 additions and 15 deletions.
5 changes: 3 additions & 2 deletions README.md
@@ -8,13 +8,14 @@ A tool to query Knowledge Providers (KPs) and synthesize highly ranked answers r
* Bridges the precision mismatch between data specificity in KPs and more abstract levels of user queries.
* Generalizes answer ranking.
* Normalizes data to use preferred and equivalent identifiers.
* Uses mined rules to perform inference and provide predicted answers for certain queries.
* Provides a way to run pathfinder queries, using literature co-occurrence to find multi-hop paths between any two nodes.

The ARAGORN tool relies on a number of external services to produce standardized, ranked answers to a user-specified question; a sketch of a typical query follows the list below.

- Strider - Accepts a query and provides knowledge-provider querying, answer generation and ranking.
- Answer Coalesce - Accepts a query containing Strider answers and returns answers that have been coalesced by property, graph and/or ontology analysis.
- Node normalization - A Translator SRI service that provides the preferred CURIE and equivalent identifiers for data in the query.
- ARAGORN Ranker - Accepts a query and provides Omnicorp overlays, score and weight-correctness rankings of coalesced answers.
- ARAGORN Ranker - Accepts a query and provides Omnicorp overlays and scores of answers.
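To make the pipeline concrete, here is a minimal sketch of a one-hop query posted to an ARAGORN deployment. The endpoint URL and the identifiers are illustrative assumptions; the "knowledge_type": "inferred" marker on the edge is what signals ARAGORN to apply its mined rules.

import httpx

# Hypothetical one-hop TRAPI query: "what chemicals may treat this disease?"
query = {
    "message": {
        "query_graph": {
            "nodes": {
                "n0": {"ids": ["MONDO:0005148"]},
                "n1": {"categories": ["biolink:ChemicalEntity"]},
            },
            "edges": {
                "e0": {
                    "subject": "n1",
                    "object": "n0",
                    "predicates": ["biolink:treats"],
                    "knowledge_type": "inferred",
                }
            },
        }
    }
}
response = httpx.post("https://aragorn.renci.org/query", json=query)  # URL assumed
answers = response.json()["message"].get("results", [])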

## Demonstration

2 changes: 2 additions & 0 deletions src/pathfinder/get_cooccurrence.py
@@ -13,6 +13,7 @@
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")

def get_the_pmids(curies: List[str]):
"""Returns a list of pmids for papers that mention all curies in list"""
r = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
@@ -31,6 +32,7 @@ def get_the_pmids(curies: List[str]):
return answer

def get_the_curies(pmid: str):
"""Returns a list of all curies in the paper corresponding to the pmid."""
r = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
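Taken together, these two helpers support the pathfinder flow. A sketch of how they might be composed (the module path and the curies are illustrative assumptions):

from src.pathfinder.get_cooccurrence import get_the_pmids, get_the_curies

source, target = "MONDO:0005148", "CHEBI:6801"  # hypothetical input nodes

# Papers that mention both input nodes at once.
shared_pmids = get_the_pmids([source, target])

# Every other entity mentioned in those papers is a candidate intermediate node.
candidates = set()
for pmid in shared_pmids:
    candidates.update(get_the_curies(pmid))
candidates -= {source, target}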
29 changes: 18 additions & 11 deletions src/service_aggregator.py
@@ -771,6 +771,7 @@ async def aragorn_lookup(input_message, params, guid, infer, pathfinder, answer_
timeout_seconds = (input_message.get("parameters") or {}).get("timeout_seconds")
if timeout_seconds:
params["timeout_seconds"] = timeout_seconds if type(timeout_seconds) is int else 3 * 60
# If pathfinder, run shadowfax
if pathfinder:
return await shadowfax(input_message, guid, logger)
if not infer:
@@ -824,6 +825,7 @@ def merge_results_by_node_op(message, params, guid) -> (dict, int):


async def strider(message, params, guid, bypass_cache) -> (dict, int):
"""Runs a single strider query."""
strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/")

# select the type of query post. "test" will come from the tester
@@ -841,6 +843,7 @@ async def strider(message, params, guid, bypass_cache) -> (dict, int):


async def normalize_qgraph_ids(m):
"""Normalizes qgraph so ids are the ones referred to by node norm."""
url = f'{os.environ.get("NODENORM_URL", "https://nodenormalization-sri.renci.org/")}get_normalized_nodes'
qnodes = m["message"]["query_graph"]["nodes"]
qnode_ids = set()
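For reference, a minimal sketch of the node-norm round trip this function performs, with the payload and response shapes assumed from the public get_normalized_nodes endpoint:

import httpx

url = "https://nodenormalization-sri.renci.org/get_normalized_nodes"
resp = httpx.post(url, json={"curies": ["MESH:D014867"]})  # payload shape assumed
record = resp.json()["MESH:D014867"]
preferred_id = record["id"]["identifier"]
equivalents = [e["identifier"] for e in record["equivalent_identifiers"]]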
@@ -880,7 +883,9 @@ def get_infer_parameters(input_message):
qualifiers: the qualifiers of the inferred edge
source: the query node id of the source node
target: the query node id of the target node
source_input: True if the source node is the input node, False if the target node is the input node"""
source_input: True if the source node is the input node, False if the target node is the input node
mcq: True if this is a set input query, False otherwise
member_ids: The members of the set"""
for edge_id, edge in input_message["message"]["query_graph"]["edges"].items():
source = edge["subject"]
target = edge["object"]
@@ -917,22 +922,23 @@ def get_rule_key(predicate, qualifiers, mcq):
return json.dumps(keydict,sort_keys=True)
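As an illustration, the rule key is just the sorted-key JSON serialization of the predicate, qualifiers, and MCQ flag; the exact keydict fields shown here are assumptions:

import json

keydict = {"predicate": "biolink:treats", "qualifiers": {}, "mcq": False}  # hypothetical
key = json.dumps(keydict, sort_keys=True)
# key == '{"mcq": false, "predicate": "biolink:treats", "qualifiers": {}}'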

def expand_query(input_message, params, guid):
#Contract: 1. there is a single edge in the query graph 2. The edge is marked inferred. 3. Either the source
"""Expands query based on rules"""
# Contract: 1. there is a single edge in the query graph 2. The edge is marked inferred. 3. Either the source
# or the target has IDs, but not both. 4. The number of ids on the query node is 1.
input_id, predicate, qualifiers, source, source_input, target, qedge_id, mcq, member_ids = get_infer_parameters(input_message)
key = get_rule_key(predicate, qualifiers, mcq)
#We want to run the non-inferred version of the query as well
# We want to run the non-inferred version of the query as well
qg = deepcopy(input_message["message"]["query_graph"])
for eid,edge in qg["edges"].items():
del edge["knowledge_type"]
messages = [{"message": {"query_graph":qg}, "parameters": input_message.get("parameters") or {}}]
#If it's an MCQ, then we also copy the KG which has the member_of edges
# If it's an MCQ, then we also copy the KG which has the member_of edges
if mcq:
messages[0]["message"]["knowledge_graph"] = deepcopy(input_message["message"]["knowledge_graph"])
#If we don't have any AMIE expansions, this will just generate the direct query
# If we don't have any AMIE expansions, this will just generate the direct query
for rule_def in AMIE_EXPANSIONS.get(key,[]):
query_template = Template(json.dumps(rule_def["template"]))
#need to do a bit of surgery depending on what the input is.
# need to do a bit of surgery depending on what the input is.
if source_input:
qs = query_template.substitute(source=source,target=target,source_id = input_id, target_id='')
else:
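The expansion machinery above is plain string.Template substitution over a serialized query graph. A toy sketch of the mechanics (the rule itself is invented; real templates come from AMIE_EXPANSIONS):

from string import Template
import json

rule_template = Template(json.dumps({
    "nodes": {
        "$source": {"ids": ["$source_id"]},
        "$target": {"categories": ["biolink:NamedThing"]},
    },
    "edges": {
        "e0": {"subject": "$source", "object": "$target", "predicates": ["biolink:affects"]},
    },
}))
# source_input case: the source node carries the pinned id.
qs = rule_template.substitute(source="n0", target="n1", source_id="MONDO:0005148", target_id="")
expanded_qgraph = json.loads(qs)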
@@ -961,12 +967,13 @@ def expand_query(input_message, params, guid):


async def multi_strider(messages, params, guid, bypass_cache):
"""Runs multi strider"""
strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/")

strider_url += "multiquery"
#We don't want to do subservice_post, because that assumes TRAPI in and out.
#it leads to confusion.
#response, status_code = await subservice_post("strider", strider_url, messages, guid, asyncquery=True)
# We don't want to do subservice_post, because that assumes TRAPI in and out.
# it leads to confusion.
# response, status_code = await subservice_post("strider", strider_url, messages, guid, asyncquery=True)
responses = await post_with_callback(strider_url,messages,guid,params,bypass_cache)

return responses
@@ -988,9 +995,9 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer, robokop):
"""Create a new knowledge edge in the result message, with the aux graph ids as support."""
# Find the subject, object, and predicate of the original query
query_graph = result_message["message"]["query_graph"]
#get the first key and value from the edges
# get the first key and value from the edges
qedge_id, qedge = next(iter(query_graph["edges"].items()))
#For the nodes, if there is an id, then use it in the knowledge edge. If there is not, then use the answer
# For the nodes, if there is an id, then use it in the knowledge edge. If there is not, then use the answer
qnode_subject_id = qedge["subject"]
qnode_object_id = qedge["object"]
if "ids" in query_graph["nodes"][qnode_subject_id] and query_graph["nodes"][qnode_subject_id]["ids"] is not None:
19 changes: 17 additions & 2 deletions src/shadowfax.py
@@ -1,6 +1,7 @@
import asyncio
from collections import defaultdict
import copy
import json
import uuid
import hashlib
import os
@@ -17,6 +18,7 @@
TOTAL_PUBS = 27840000

async def generate_from_strider(message):
"""Generates knowledge graphs from strider."""
try:
async with httpx.AsyncClient(timeout=3600) as client:
lookup_response = await client.post(
@@ -31,6 +33,8 @@ async def generate_from_strider(message):


async def get_normalized_curies(curies, guid, logger):
"""Gives us normalized curies that we can look up in our database, assuming
the database is also properly normalized."""
async with httpx.AsyncClient(timeout=900) as client:
try:
normalizer_response = await client.post(
@@ -49,6 +53,9 @@ async def get_normalized_curies(curies, guid, logger):


async def shadowfax(message, guid, logger):
"""Processes pathfinder queries. This is done by using literature co-occurrence
to find nodes that occur in publications with our input nodes, then finding
paths that connect our input nodes through these intermediate nodes."""
qgraph = message["message"]["query_graph"]
pinned_node_ids = []
for node in qgraph["nodes"].values():
@@ -69,13 +76,15 @@ async def shadowfax(message, guid, logger):
target_equivalent_ids = [i["identifier"] for i in normalized_pinned_ids.get(pinned_node_ids[1], {"equivalent_identifiers": []})["equivalent_identifiers"]]
target_category = normalized_pinned_ids.get(pinned_node_ids[1], {"type": ["biolink:NamedThing"]})["type"][0]

# Find shared publications between input nodes
source_pubs = len(get_the_pmids([source_node]))
target_pubs = len(get_the_pmids([target_node]))
pairwise_pubs = get_the_pmids([source_node, target_node])
if source_pubs == 0 or target_pubs == 0 or len(pairwise_pubs) == 0:
logger.info(f"{guid}: No publications found.")
return message, 200


# Find other nodes from those shared publications
curies = set()
for pub in pairwise_pubs:
curie_list = get_the_curies(pub)
@@ -105,6 +114,7 @@ async def shadowfax(message, guid, logger):
)
/ 10000))

# Find the nodes with most significant co-occurrence
pruned_curies = []
while len(pruned_curies) < 50 and len(pruned_curies) < len(curies):
max_cov = 0
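The scoring expression feeding this pruning loop is truncated in the hunk above, but conceptually it measures how much more often a candidate co-occurs with the inputs than chance would predict, given TOTAL_PUBS. A hedged sketch of one such statistic (normalized pointwise mutual information; illustrative, not necessarily the exact formula used here):

import math

TOTAL_PUBS = 27840000

def npmi(source_pubs: int, target_pubs: int, pairwise_pubs: int) -> float:
    """Normalized pointwise mutual information over publication counts.
    Assumes nonzero counts; zero-publication cases are filtered out upstream."""
    p_x = source_pubs / TOTAL_PUBS
    p_y = target_pubs / TOTAL_PUBS
    p_xy = pairwise_pubs / TOTAL_PUBS
    return math.log(p_xy / (p_x * p_y)) / -math.log(p_xy)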
@@ -174,6 +184,7 @@ async def shadowfax(message, guid, logger):
lookup_knowledge_graph = merged_lookup_message_dict.get("knowledge_graph", {"nodes": {}, "edges": {}})
lookup_aux_graphs = merged_lookup_message_dict.get("auxiliary_graphs", {})

# Build a large kg and find paths
kg = networkx.Graph()
kg.add_nodes_from([source_node, target_node])
for lookup_result in lookup_results:
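The path search that follows is ordinary networkx traversal over the merged lookup graph; a self-contained sketch, with node names and the length cutoff as illustrative assumptions:

import networkx

kg = networkx.Graph()
kg.add_edges_from([
    ("source", "mid_a"), ("mid_a", "target"),
    ("source", "mid_b"), ("mid_b", "target"),
])

# Enumerate bounded-length simple paths between the two pinned nodes.
paths = list(networkx.all_simple_paths(kg, "source", "target", cutoff=3))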
@@ -199,8 +210,8 @@ async def shadowfax(message, guid, logger):
# Handles constraints
if unpinned_node_category in curie_info[curie]["categories"]:
curie_path_mapping[curie].append(path)
logger.debug(f"{guid}: Found {num_paths} paths.")

# Build knowledge graph from paths
results = []
aux_graphs = {}
knowledge_graph = {"nodes": copy.deepcopy(lookup_knowledge_graph["nodes"]), "edges": {}}
@@ -220,6 +231,8 @@ async def shadowfax(message, guid, logger):
after_edge_key = edge_key
else:
main_edge_key = edge_key

# Builds results from paths according to structure
for curie, curie_paths in curie_path_mapping.items():
aux_edges_list = []
before_curie_edges_list = []
@@ -247,8 +260,10 @@ async def shadowfax(message, guid, logger):
if kedge_key not in aux_edges:
aux_edges.append(kedge_key)
if before_curie and kedge_key not in before_curie_edges:
# these edges come before the intermediate node
before_curie_edges.append(kedge_key)
elif kedge_key not in after_curie_edges:
# these edges come after the intermediate node
after_curie_edges.append(kedge_key)
aux_edges_list.append(aux_edges)
before_curie_edges_list.append(before_curie_edges)
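For orientation, the aux graphs assembled from these edge lists are plain TRAPI auxiliary_graphs entries, i.e. named collections of knowledge-graph edge ids (identifiers below are hypothetical):

aux_graphs = {
    "aux_before_0": {"edges": ["kedge_a", "kedge_b"]},  # hops before the intermediate node
    "aux_after_0": {"edges": ["kedge_c"]},              # hops after the intermediate node
}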
