Skip to content

Commit

Permalink
Merge pull request #279 from ranking-agent/shadowfax
Browse files Browse the repository at this point in the history
Shadowfax
  • Loading branch information
uhbrar authored Sep 9, 2024
2 parents dabdf88 + 3565aab commit 0f6bc64
Show file tree
Hide file tree
Showing 7 changed files with 616 additions and 13 deletions.
56 changes: 56 additions & 0 deletions documentation/pathfinder_example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"message": {
"query_graph": {
"nodes": {
"n0": {
"ids": [
"PUBCHEM.COMPOUND:5291"
],
"name": "imatinib"
},
"n1": {
"ids": [
"MONDO:0004979"
],
"name": "asthma"
},
"un": {
"categories": [
"biolink:NamedThing"
]
}
},
"edges": {
"e0": {
"subject": "n0",
"object": "n1",
"predicates": [
"biolink:related_to"
],
"knowledge_type": "inferred"
},
"e1": {
"subject": "n0",
"object": "un",
"predicates": [
"biolink:related_to"
],
"knowledge_type": "inferred"
},
"e2": {
"subject": "n1",
"object": "un",
"predicates": [
"biolink:related_to"
],
"knowledge_type": "inferred"
}
}
},
"knowledge_graph": {
"nodes": {},
"edges": {}
},
"results": []
}
}
2 changes: 1 addition & 1 deletion openapi-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ servers:
# url: http://127.0.0.1:5000
termsOfService: http://robokop.renci.org:7055/tos?service_long=ARAGORN&provider_long=RENCI
title: ARAGORN
version: 2.8.4
version: 2.9.0
tags:
- name: translator
- name: ARA
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ opentelemetry-instrumentation-fastapi==0.37b0
opentelemetry-exporter-jaeger==1.16.0
opentelemetry-instrumentation-httpx==0.37b0
fakeredis<=2.10.2
networkx==3.3
46 changes: 46 additions & 0 deletions src/pathfinder/get_cooccurrence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Get more than pairwise literature cooccurence for a given list of curies."""
import json
import gzip
import os
import redis
import time
from typing import List

REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = os.environ.get("REDIS_PORT", 6379)
PMIDS_DB = os.environ.get("PMIDS_DB", 1)
CURIES_DB = os.environ.get("CURIES_DB", 2)
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")

def get_the_pmids(curies: List[str]) -> List[str]:
    """Return the PMIDs shared by every curie in ``curies``.

    Each curie maps (in the PMIDS_DB Redis database) to a gzip-compressed
    JSON list of PMIDs; the answer is the intersection of those lists.

    :param curies: curie identifiers to look up.
    :return: PMIDs common to all curies; empty if any curie has none.
    """
    # Guard: set.intersection(*[]) with zero arguments raises TypeError,
    # so an empty input would crash instead of returning [].
    if not curies:
        return []
    r = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=PMIDS_DB,
        password=REDIS_PASSWORD
    )
    try:
        curie_pmids = []
        for curie in curies:
            pmids = r.get(curie)
            if pmids is None:
                # Unknown curie -> no literature support; forces an
                # empty intersection below.
                pmids = []
            else:
                # Values are stored as gzip-compressed JSON arrays.
                pmids = json.loads(gzip.decompress(pmids))
            curie_pmids.append(pmids)
        answer = list(set.intersection(*map(set, curie_pmids)))
        return answer
    finally:
        # Don't leak a connection per call.
        r.close()

def get_the_curies(pmid: str) -> List[str]:
    """Return all curies that co-occur in the article identified by ``pmid``.

    The pmid maps (in the CURIES_DB Redis database) to a gzip-compressed
    JSON list of curies.

    :param pmid: PubMed identifier to look up.
    :return: curies mentioned in that article; empty if the pmid is unknown.
    """
    r = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=CURIES_DB,
        password=REDIS_PASSWORD
    )
    try:
        curies = r.get(pmid)
        if curies is None:
            return []
        # Values are stored as gzip-compressed JSON arrays.
        return list(json.loads(gzip.decompress(curies)))
    finally:
        # Don't leak a connection per call.
        r.close()
34 changes: 24 additions & 10 deletions src/service_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from asyncio.exceptions import TimeoutError
from reasoner_pydantic import Query, KnowledgeGraph, QueryGraph
from reasoner_pydantic import Response as PDResponse
from src.shadowfax import shadowfax
import uuid

DUMPTRUCK = False
Expand All @@ -47,6 +48,8 @@ def examine_query(message):
# queries that are any shape with all lookup edges
# OR
# A 1-hop infer query.
# OR
# Pathfinder query
try:
# this can still fail if the input looks like e.g.:
# "query_graph": None
Expand All @@ -57,13 +60,14 @@ def examine_query(message):
for edge_id in qedges:
if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred":
n_infer_edges += 1
if n_infer_edges > 1:
pathfinder = n_infer_edges == 3
if n_infer_edges > 1 and n_infer_edges and not pathfinder:
raise Exception("Only a single infer edge is supported", 400)
if (n_infer_edges > 0) and (n_infer_edges < len(qedges)):
raise Exception("Mixed infer and lookup queries not supported", 400)
infer = n_infer_edges == 1
if not infer:
return infer, None, None
return infer, None, None, pathfinder
qnodes = message.get("message", {}).get("query_graph", {}).get("nodes", {})
question_node = None
answer_node = None
Expand All @@ -76,7 +80,7 @@ def examine_query(message):
raise Exception("Both nodes of creative edge pinned", 400)
if question_node is None:
raise Exception("No nodes of creative edge pinned", 400)
return infer, question_node, answer_node
return infer, question_node, answer_node, pathfinder


def match_results_to_query(results, query_message, query_source, query_target, query_qedge_id):
Expand Down Expand Up @@ -111,7 +115,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
"""

try:
infer, question_qnode, answer_qnode = examine_query(message)
infer, question_qnode, answer_qnode, pathfinder = examine_query(message)
except Exception as e:
print(e)
return None, 500
Expand All @@ -127,7 +131,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
# e.g. our score operation will include both weighting and scoring for now.
# Also gives us a place to handle function specific logic
known_operations = {
"lookup": partial(lookup, caller=caller, infer=infer, answer_qnode=answer_qnode, question_qnode=question_qnode, bypass_cache=bypass_cache),
"lookup": partial(lookup, caller=caller, infer=infer, pathfinder=pathfinder, answer_qnode=answer_qnode, question_qnode=question_qnode, bypass_cache=bypass_cache),
"enrich_results": partial(answercoalesce, coalesce_type=coalesce_type),
"overlay_connect_knodes": omnicorp,
"score": score,
Expand All @@ -154,6 +158,13 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
{"id": "score"},
{"id": "filter_message_top_n", "parameters": {"max_results": 500}},
]
elif pathfinder:
workflow_def = [
{"id": "lookup"},
{"id": "overlay_connect_knodes"},
{"id": "score"},
{"id": "filter_message_top_n", "parameters": {"max_results": 500}}
]
else:
# TODO: if this is robokop, need to normalize.
workflow_def = [
Expand All @@ -171,7 +182,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
# We told the world what we can do!
# Workflow will be a list of the functions, and the parameters if there are any

read_from_cache = not (bypass_cache or overwrite_cache)
read_from_cache = not (bypass_cache or overwrite_cache) and not pathfinder

try:
query_graph = message["message"]["query_graph"]
Expand Down Expand Up @@ -221,7 +232,8 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
if overwrite_cache or (not bypass_cache):
if infer:
results_cache.set_result(input_id, predicate, qualifiers, source_input, caller, workflow_def, mcq, member_ids, final_answer)
elif {"id": "lookup"} in workflow_def:
elif {"id": "lookup"} in workflow_def and not pathfinder:
# We won't cache pathfinder results for now
results_cache.set_lookup_result(workflow_def, query_graph, final_answer)

# return the answer
Expand Down Expand Up @@ -708,7 +720,7 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params={})
return ret_val, status_code


async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, bypass_cache=False) -> (dict, int):
async def lookup(message, params, guid, infer=False, pathfinder=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, bypass_cache=False) -> (dict, int):
"""
Performs lookup, parameterized by ARAGORN/ROBOKOP and whether the query is an infer type query
Expand All @@ -720,7 +732,7 @@ async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qn
"""
message = await normalize_qgraph_ids(message)
if caller == "ARAGORN":
return await aragorn_lookup(message, params, guid, infer, answer_qnode, bypass_cache)
return await aragorn_lookup(message, params, guid, infer, pathfinder, answer_qnode, bypass_cache)
elif caller == "ROBOKOP":
robo_results, robo_status = await robokop_lookup(message, params, guid, infer, question_qnode, answer_qnode)
return await add_provenance(robo_results), robo_status
Expand Down Expand Up @@ -755,10 +767,12 @@ async def de_noneify(message):
await de_noneify(item)


async def aragorn_lookup(input_message, params, guid, infer, answer_qnode, bypass_cache):
async def aragorn_lookup(input_message, params, guid, infer, pathfinder, answer_qnode, bypass_cache):
timeout_seconds = (input_message.get("parameters") or {}).get("timeout_seconds")
if timeout_seconds:
params["timeout_seconds"] = timeout_seconds if type(timeout_seconds) is int else 3 * 60
if pathfinder:
return await shadowfax(input_message, guid, logger)
if not infer:
return await strider(input_message, params, guid, bypass_cache)
# Now it's an infer query.
Expand Down
Loading

0 comments on commit 0f6bc64

Please sign in to comment.