diff --git a/openapi-config.yaml b/openapi-config.yaml index 16a0edf..bd2beb9 100644 --- a/openapi-config.yaml +++ b/openapi-config.yaml @@ -11,7 +11,7 @@ servers: # url: http://127.0.0.1:5000 termsOfService: http://robokop.renci.org:7055/tos?service_long=ARAGORN&provider_long=RENCI title: ARAGORN -version: 2.6.3 +version: 2.7.0 tags: - name: translator - name: ARA @@ -23,7 +23,7 @@ x-translator: - Ranking Agent infores: infores:aragorn x-trapi: - version: 1.4.0 + version: 1.5.0 operations: - lookup - enrich_results diff --git a/requirements.txt b/requirements.txt index 8b0dffc..6191c6d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ aio-pika==8.1.1 fastapi==0.83.0 gunicorn==20.0.4 -httptools==0.5.0 +httptools==0.6.0 httpx==0.19.0 jsonschema==3.2.0 neo4j==5.0.1 @@ -10,14 +10,14 @@ pydantic==1.10.2 pytest==7.1.2 pytest-asyncio==0.15.1 pytest-dotenv==0.5.2 -pyyaml==6.0 -reasoner-pydantic==4.1.6 +pyyaml==6.0.1 +reasoner-pydantic==5.0.0 redis~=3.5.3 requests==2.28.1 uvicorn==0.17.6 -uvloop==0.17.0 +uvloop==0.19.0 opentelemetry-sdk==1.16.0 opentelemetry-instrumentation-fastapi==0.37b0 opentelemetry-exporter-jaeger==1.16.0 opentelemetry-instrumentation-httpx==0.37b0 -fakeredis<=2.10.2 \ No newline at end of file +fakeredis<=2.10.2 diff --git a/src/service_aggregator.py b/src/service_aggregator.py index 78106a3..329153a 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -170,7 +170,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): except KeyError: return f"No query graph", 422 results_cache = ResultsCache() - override_cache = (message.get("parameters") or {}).get("override_cache") + override_cache = message.get("bypass_cache", False) override_cache = override_cache if type(override_cache) is bool else False results = None if infer: @@ -380,6 +380,21 @@ def has_unique_nodes(result): seen.add(knode_ids) return True +async def filter_promiscuous_results(response,guid): + """We have some rules like 
A<-treats-B-part_of->C<-part_of-D. This is saying B treats A, and D is like + B (because they are both part of C). This isn't the worst rule in the world, we find it statistically + useful. But, there are Cs that contain lllllooooootttttssss of stuff, and it creates a lot of bad results. + Not only are they bad, but they are basically the same in terms of score, so we create a lot of ties. + We are taking some approaches to fixing this in ranking, but really the results are just terrible, let's + get rid of them, but distinguish cases where the rule is doing something interesting from when it is not. + And note that "part_of" is not the only rule that follows this similarity-style pattern. The difference is + basically how many times C occurs. + What we'd really like to do is not use promiscuous nodes in the C spot (or other places really). But we + don't have a promiscuity score for the nodes, and can't really get one. """ + # Hopefully we have the query graph at this point? I know we were losing them at some point... + print(response["message"]["query_graph"]) + print(len(response["message"]["results"])) + async def filter_repeated_nodes(response,guid): """We have some rules that include e.g. 2 chemicals. We don't want responses in which those two are the same. If you have A-B-A-C then what shows up in the ui is B-A-C which makes no sense.""" @@ -391,6 +406,8 @@ async def filter_repeated_nodes(response,guid): if len(results) != original_result_count: await filter_kgraph_orphans(response,{},guid) + + async def check_for_messages(guid, num_queries, num_previously_received=0): """Check for messages on the queue. Return a list of responses. This does not care if these are TRAPI or anything else. It does need to know whether to expect more than 1 query. 
@@ -690,6 +707,7 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode): if "knowledge_graph" not in rmessage["message"] or "results" not in rmessage["message"]: continue await filter_repeated_nodes(rmessage, guid) + #await filter_promiscuous_results(rmessage, guid) result_messages.append(rmessage) logger.info(f"{guid}: strider complete") #Clean out the repeat node stuff @@ -712,7 +730,7 @@ def merge_results_by_node_op(message, params, guid) -> (dict, int): async def strider(message, params, guid) -> (dict, int): # strider_url = os.environ.get("STRIDER_URL", "https://strider-dev.apps.renci.org/1.3/") - strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.4/") + strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.5/") #strider_url = os.environ.get("STRIDER_URL", "https://strider.transltr.io/1.3/") # select the type of query post. "test" will come from the tester @@ -756,7 +774,7 @@ async def robokop_lookup(message, params, guid, infer, question_qnode, answer_qn # For robokop, gotta normalize message = await normalize_qgraph_ids(message) if not infer: - kg_url = os.environ.get("ROBOKOPKG_URL", "https://automat.renci.org/robokopkg/1.4/") + kg_url = os.environ.get("ROBOKOPKG_URL", "https://automat.renci.org/robokopkg/1.5/") return await subservice_post("robokopkg", f"{kg_url}query", message, guid) # It's an infer, just look it up @@ -831,7 +849,7 @@ def expand_query(input_message, params, guid): async def multi_strider(messages, params, guid): #strider_url = os.environ.get("STRIDER_URL", "https://strider-dev.apps.renci.org/1.3/") - strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.4/") + strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.5/") strider_url += "multiquery" #We don't want to do subservice_post, because that assumes TRAPI in and out. 
@@ -847,7 +865,7 @@ def create_aux_graph(analysis): Look through the analysis edge bindings, get all the knowledge edges, and put them in an aux graph. Give it a random uuid as an id.""" aux_graph_id = str(uuid.uuid4()) - aux_graph = { "edges": [] } + aux_graph = { "edges": [] , "attributes": []} for edge_id, edgelist in analysis["edge_bindings"].items(): for edge in edgelist: aux_graph["edges"].append(edge["id"]) @@ -997,7 +1015,7 @@ def merge_answer(result_message, answer, results, qnode_ids, robokop=False): source = "infores:aragorn" analysis = { "resource_id": source, - "edge_bindings": {qedge_id:[ { "id":kid } for kid in knowledge_edge_ids ] } + "edge_bindings": {qedge_id:[ { "id":kid, "attributes": [] } for kid in knowledge_edge_ids ] } } mergedresult["analyses"].append(analysis) @@ -1052,7 +1070,7 @@ async def make_one_request(client, automat_url, message, sem): return r async def robokop_infer(input_message, guid, question_qnode, answer_qnode): - automat_url = os.environ.get("ROBOKOPKG_URL", "https://automat.transltr.io/robokopkg/1.4/") + automat_url = os.environ.get("ROBOKOPKG_URL", "https://automat.transltr.io/robokopkg/1.5/") max_conns = os.environ.get("MAX_CONNECTIONS", 5) nrules = int(os.environ.get("MAXIMUM_ROBOKOPKG_RULES", 101)) messages = expand_query(input_message, {}, guid) @@ -1182,7 +1200,7 @@ async def omnicorp(message, params, guid) -> (dict, int): with open("to_omni.json","w") as outf: json.dump(message, outf, indent=2) - url = f'{os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")}omnicorp_overlay' + url = f'{os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.5/")}omnicorp_overlay' rval, omni_status = await subservice_post("omnicorp", url, message, guid) @@ -1205,7 +1223,7 @@ async def score(message, params, guid) -> (dict, int): with open("to_score.json","w") as outf: json.dump(message, outf, indent=2) - ranker_url = os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/") + ranker_url = 
os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.5/") score_url = f"{ranker_url}score" return await subservice_post("score", score_url, message, guid) diff --git a/tests/test_operations.py b/tests/test_operations.py index c0bbfe6..46ef99e 100644 --- a/tests/test_operations.py +++ b/tests/test_operations.py @@ -8,13 +8,13 @@ def create_message(): "nodes": {"input": {"ids": ["MONDO:1234"]}, "output": {"categories": ["biolink:ChemicalEntity"]}}, "edges": {"e": {"subject": "output", "object": "input", "predicates": ["biolink:treats"]}}} result_message = {"query_graph": query_graph, - "knowledge_graph": {"nodes": {"MONDO:1234":{"categories":["biolink:Disease"]}}, "edges": {}}, + "knowledge_graph": {"nodes": {"MONDO:1234":{"categories":["biolink:Disease"], "attributes": []}}, "edges": {}}, "auxiliary_graphs": {}, "results": []} return result_message def add_new_node(message, new_nodes): newnode_id = f"node_{len(message['knowledge_graph']['nodes'])}" - node = {"categories": ["biolink:NamedThing"]} + node = {"categories": ["biolink:NamedThing"], "attributes": []} message["knowledge_graph"]["nodes"][newnode_id] = node new_nodes.add(newnode_id) return newnode_id @@ -22,14 +22,15 @@ def add_new_node(message, new_nodes): def add_new_edge(message, new_edges, subject, object): new_edge_id = f"edge_{len(message['knowledge_graph']['edges'])}" new_edge = {"subject": subject, "object": object, "predicate": "biolink:blahblah", - "sources": [{"resource_id": "infores:madeup", "resource_role":"primary_knowledge_source"}]} + "sources": [{"resource_id": "infores:madeup", "resource_role":"primary_knowledge_source"}], + "attributes": []} message["knowledge_graph"]["edges"][new_edge_id] = new_edge new_edges.add(new_edge_id) return new_edge_id def add_new_auxgraph(message, new_auxgraphs, edge1_id, edge2_id): new_auxgraph_id = f"auxgraph_{len(message['auxiliary_graphs'])}" - new_auxgraph = {"edges": [edge1_id, edge2_id]} + new_auxgraph = {"edges": [edge1_id, edge2_id], 
"attributes": []} message["auxiliary_graphs"][new_auxgraph_id] = new_auxgraph return new_auxgraph_id @@ -56,12 +57,12 @@ def add_result(message, scores=[0.5]): new_auxgraphs = set() #Add the result with a new answer node newnode_id = add_new_node(message, new_nodes) - result = {"node_bindings": {"input":[{"id":DISEASE}], "output": [{"id": newnode_id}]}, "analyses": []} + result = {"node_bindings": {"input":[{"id":DISEASE, "attributes": []}], "output": [{"id": newnode_id, "attributes":[]} ]}, "analyses": []} message["results"].append(result) for score in scores: #Make an analysis, add an edge_binding to a new creative edge new_edge_id = add_new_edge(message, new_edges, newnode_id, DISEASE) - analysis = {"edge_bindings": {"e": [{"id": new_edge_id}]}, "score": score, "resource_id": "infores:madeup"} + analysis = {"edge_bindings": {"e": [{"id": new_edge_id, "attributes": []}]}, "score": score, "resource_id": "infores:madeup"} #Add a support graph to the creative edge new_auxgraph_id = add_two_hop(message, new_edges, new_nodes, new_auxgraphs, DISEASE, newnode_id) add_support_graph_to_edge(message, new_auxgraph_id, new_edge_id) @@ -86,7 +87,7 @@ async def test_deorphaning(): #Now remove the first result message["results"] = message["results"][1:] #remove orphans - response = {"message": message} + response = {"message": message, "logs": []} pdresponse = Response(**response) await filter_kgraph_orphans(response,{}, "") #Make sure that the nodes and edges from the first result are gone diff --git a/tests/test_query_exam.py b/tests/test_query_exam.py index 1beebd5..43573f3 100644 --- a/tests/test_query_exam.py +++ b/tests/test_query_exam.py @@ -10,19 +10,19 @@ def create_result_graph(): query_graph = {"nodes": {"input": {"ids": ["MONDO:1234"]}, "output": {"categories": ["biolink:ChemicalEntity"]}}, "edges": {"e": {"subject": "output", "object": "input", "predicates": ["biolink:treats"]}}} result_message = {"query_graph": query_graph, "knowledge_graph": {"nodes":{}, 
"edges":{}}, "auxiliary_graphs": set(), "results": []} - pydantic_message = Response(**{"message":result_message}) + pydantic_message = Response(**{"message":result_message, "logs": []}) return pydantic_message def create_result(node_bindings: dict[str,str], edge_bindings: dict[str,str]) -> Result: """Pretend that we've had a creative mode query, and then broken it into rules. Those rules run as individual queries in strider or robokop, and this is a result.""" - analysis = Analysis(edge_bindings = {k:[EdgeBinding(id=v)] for k,v in edge_bindings.items()},resource_id="infores:KP") - result = Result(node_bindings = {k:[NodeBinding(id=v)] for k,v in node_bindings.items()}, analyses = set([analysis])) + analysis = Analysis(edge_bindings = {k:[EdgeBinding(id=v, attributes=[])] for k,v in edge_bindings.items()},resource_id="infores:KP") + result = Result(node_bindings = {k:[NodeBinding(id=v, attributes=[])] for k,v in node_bindings.items()}, analyses = set([analysis])) return result def create_pretend_knowledge_edge(subject, object, predicate, infores): """Create a pretend knowledge edge.""" - ke = {"subject":subject, "object":object, "predicate":predicate, "sources":[{"resource_id":infores, "resource_role": "primary_knowledge_source"}]} + ke = {"subject":subject, "object":object, "predicate":predicate, "sources":[{"resource_id":infores, "resource_role": "primary_knowledge_source"}], "attributes": []} return ke def test_merge_answer_creative_only(): @@ -85,7 +85,7 @@ def test_merge_answer_lookup_only(): assert len(result_message["message"]["results"][0]["analyses"][0]["edge_bindings"]) == 1 # e is the name of the query edge defined in create_result_graph() assert "e" in result_message["message"]["results"][0]["analyses"][0]["edge_bindings"] - assert result_message["message"]["results"][0]["analyses"][0]["edge_bindings"]["e"] == 
[{"id":"lookup:1", "attributes": []}, {"id":"lookup:2", "attributes": []}] assert "auxiliary_graphs" not in result_message Response.parse_obj(result_message) @@ -130,7 +130,7 @@ def create_3hop_query () -> Response: "edge_1": { "subject": "i", "object": "e", "predicates": [ "biolink:has_input" ] }, "edge_2": { "subject": "e", "object": "disease", "predicates": [ "biolink:treats" ] } } } result_message = {"query_graph": query_graph, "knowledge_graph": {"nodes": {}, "edges": {}}, "auxiliary_graphs": set(), "results": []} - pydantic_message = Response(**{"message": result_message}) + pydantic_message = Response(**{"message": result_message, "logs": []}) return pydantic_message @@ -162,8 +162,8 @@ async def test_filter_repeats(): def test_create_aux_graph(): """Given an analysis object with multiple edge bindings, test that create_aux_graph() returns a valid aux graph.""" - eb1 = EdgeBinding(id = 'eb1') - eb2 = EdgeBinding(id = 'eb2') + eb1 = EdgeBinding(id = 'eb1', attributes=[]) + eb2 = EdgeBinding(id = 'eb2', attributes=[]) analysis = Analysis(resource_id = "example.com", edge_bindings = {"qedge1":[eb1], "qedge2":[eb2]}) agid, aux_graph = create_aux_graph(analysis.to_dict()) assert len(agid) == 36