
Commit

Merge branch 'master' into branding-orion
EvanDietzMorris authored Feb 14, 2024
2 parents 2269f42 + 2c777a2 commit 2507270
Showing 13 changed files with 202 additions and 227 deletions.
77 changes: 45 additions & 32 deletions Common/normalization.py
@@ -1,12 +1,13 @@
import os
import logging
import requests
import time

from robokop_genetics.genetics_normalization import GeneticsNormalizer
from Common.node_types import *
from Common.utils import LoggingUtil

NORMALIZATION_CODE_VERSION = '1.0'
NORMALIZATION_CODE_VERSION = '1.1'


class NodeNormalizer:
@@ -63,6 +64,37 @@ def __init__(self,
self.sequence_variant_normalizer = None
self.variant_node_types = None

def hit_node_norm_service(self, curies, retries=0):
resp: requests.models.Response = requests.post(f'{self.node_norm_endpoint}get_normalized_nodes',
json={'curies': curies,
'conflate': self.conflate_node_types,
'drug_chemical_conflate': self.conflate_node_types,
'description': True})
if resp.status_code == 200:
# if successful return the json as an object
return resp.json()
else:
error_message = f'Node norm response code: {resp.status_code}'
if resp.status_code >= 500:
# if 5xx retry 3 times
retries += 1
if retries == 4:
error_message += ', retried 3 times, giving up..'
self.logger.error(error_message)
resp.raise_for_status()
else:
error_message += f', retrying.. (attempt {retries})'
time.sleep(retries * 3)
self.logger.error(error_message)
return self.hit_node_norm_service(curies, retries)
else:
# we should never get a legitimate 4xx response from node norm,
# crash with an error for troubleshooting
if resp.status_code == 422:
error_message += f'(curies: {curies})'
self.logger.error(error_message)
resp.raise_for_status()

def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:
"""
This method calls the NodeNormalization web service to get the normalized identifier and name of the node.
@@ -76,12 +108,9 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:

self.logger.debug(f'Start of normalize_node_data. items: {len(node_list)}')

# init the cache list - this accumulates all of the results from the node norm service
# init the cache - this accumulates all the results from the node norm service
cached_node_norms: dict = {}

# save the node list count to avoid grabbing it over and over
node_count: int = len(node_list)

# create a unique set of node ids
tmp_normalize: set = set([node['id'] for node in node_list])

@@ -111,32 +140,16 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:
# collect a slice of records from the data frame
data_chunk: list = to_normalize[start_index: end_index]

# get the data
resp: requests.models.Response = requests.post(f'{self.node_norm_endpoint}get_normalized_nodes',
json={'curies': data_chunk,
'conflate': self.conflate_node_types,
'description': True})

# did we get a good status code
if resp.status_code == 200:
# convert json to dict
rvs: dict = resp.json()

if rvs:
# merge this list with what we have gotten so far
cached_node_norms.update(**rvs)
else:
# this is a quick fix for the API returning empty dict instead of nulls when
# none of the curies normalize
empty_responses = {curie: None for curie in data_chunk}
cached_node_norms.update(empty_responses)
# hit the node norm api
normalization_json = self.hit_node_norm_service(curies=data_chunk)
if normalization_json:
# merge the normalization results with what we have gotten so far
cached_node_norms.update(**normalization_json)
else:
# we should never get a legitimate non-200 response from node norm here, just crash with an error
error_message = f'Node norm response code: {resp.status_code}'
if resp.status_code == 422:
error_message += f'(curies: {data_chunk})'
self.logger.error(error_message)
resp.raise_for_status()
# this shouldn't happen but if the API returns an empty dict instead of nulls,
# assume none of the curies normalize
empty_responses = {curie: None for curie in data_chunk}
cached_node_norms.update(empty_responses)

# move on down the list
start_index += block_size
@@ -155,8 +168,8 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:
self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types()

# for each node update the node with normalized information
# store the normalized IDs for later look up
while node_idx < node_count:
# store the normalized IDs in self.node_normalization_lookup for later look up
while node_idx < len(node_list):

# get the next node list item by index
current_node = node_list[node_idx]
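The new hit_node_norm_service method retries 5xx responses up to three times with a linear backoff (sleeping 3, 6, then 9 seconds between attempts) and fails immediately on 4xx. A minimal standalone sketch of that pattern, assuming only the endpoint path and (trimmed) payload keys shown in the diff; the function name and defaults here are illustrative, not part of the commit:

import time
import requests

def post_with_retries(endpoint: str, curies: list, max_retries: int = 3) -> dict:
    for attempt in range(1, max_retries + 2):
        resp = requests.post(f'{endpoint}get_normalized_nodes',
                             json={'curies': curies, 'conflate': True, 'description': True})
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code >= 500 and attempt <= max_retries:
            # linear backoff: 3s, 6s, 9s between attempts
            time.sleep(attempt * 3)
            continue
        # 4xx, or a 5xx after the last retry: surface the HTTP error
        resp.raise_for_status()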
12 changes: 9 additions & 3 deletions Common/utils.py
@@ -157,13 +157,14 @@ def pull_via_ftp_binary(ftp_site, ftp_dir, ftp_file):
# return the data stream
return binary

def get_ftp_file_date(self, ftp_site, ftp_dir, ftp_file) -> str:
def get_ftp_file_date(self, ftp_site, ftp_dir, ftp_file, exclude_day=False) -> str:
"""
gets the modified date of the file from the ftp site
:param ftp_site:
:param ftp_dir:
:param ftp_file:
:param exclude_day:
:return:
"""
# init the return value
@@ -180,8 +181,13 @@ def get_ftp_file_date(self, ftp_site, ftp_dir, ftp_file) -> str:

# did we get something
if len(date_val) > 0:
# return the parsed date
return dp.parse(date_val[1]).strftime('%-m_%-d_%Y')
if exclude_day:
# if exclude_day format to month_year
file_date = dp.parse(date_val[1]).strftime('%-m_%Y')
else:
# otherwise return month_day_year
file_date = dp.parse(date_val[1]).strftime('%-m_%-d_%Y')
return file_date
except Exception as e:
error_message = f'Error getting modification date for ftp file: {ftp_site}{ftp_dir}{ftp_file}. {e}'
self.logger.error(error_message)
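For reference, the two strftime formats used by get_ftp_file_date produce strings like these (an illustrative example; the date value is made up, and dp is assumed to be the dateutil parser alias used in utils.py):

from dateutil import parser as dp

date_val = 'Mar 7 2024'                    # hypothetical FTP modification date
dp.parse(date_val).strftime('%-m_%Y')      # '3_2024'   (exclude_day=True)
dp.parse(date_val).strftime('%-m_%-d_%Y')  # '3_7_2024' (default)
# note: the %-m / %-d flags are glibc-style and not available on Windows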
2 changes: 1 addition & 1 deletion graph_specs/yeast-graph-spec.yml
@@ -9,7 +9,7 @@ graphs:
# graph_description: 'The baseline graph from which RobokopKG and other graphs are built.'
# conflation: True # (whether to conflate node types like Genes and Proteins)
graph_name: ROBOKOP Baseline
graph_description: 'ROBOKOP (KG) is an open-source biomedical KG that supports the ROBOKOP application. This is the baseline version of that graph, which does not include knowledge sources with specific genetic variants.'
graph_description: 'The ROBOKOP Knowledge Graph (ROBOKOP KG) is an open-source biomedical KG that supports the ROBOKOP application and currently contains millions of biomedical relationships derived from dozens of integrated and harmonized biological knowledge sources and bio-ontologies. The ROBOKOP KG includes curated components of most of the Automat KGs, as well as other knowledge sources. Most of the ROBOKOP knowledge sources are curated. However, the ROBOKOP KG also includes text-mined assertions from PubMed and PubMed Central that have been derived from natural language processing (NLP). Note that text-based assertions, while providing valuable information, must be interpreted with caution, as NLP algorithms may introduce false assertions.'
graph_url: http://robokopkg.renci.org/browser/
conflation: True
output_format: neo4j
39 changes: 23 additions & 16 deletions parsers/Reactome/src/loadReactome.py
@@ -26,8 +26,8 @@
"output": "biolink:has_output",
"input": "biolink:has_input",
"hasEvent": "biolink:contains_process",
"normalPathway":"biolink:contains_process", #TODO Choose better biolink predicate for normalPathways/Reactions/Etc.
"normalReaction":"biolink:contains_process", #TODO Choose better biolink predicate for normalPathways/Reactions/Etc.
"normalPathway": "biolink:contains_process", #TODO Choose better biolink predicate for normalPathways/Reactions/Etc.
"normalReaction": "biolink:contains_process", #TODO Choose better biolink predicate for normalPathways/Reactions/Etc.
#"normalEntity":"biolink:contains_process", #TODO Choose better biolink predicate for normalPathways/Reactions/Etc.
"precedingEvent": "biolink:precedes",
"activeUnit": "biolink:actively_involves",
Expand All @@ -36,9 +36,9 @@
"cellType": "biolink:located_in",
"goBiologicalProcess": "biolink:subclass_of",
"disease": "biolink:disease_has_basis_in",
"regulator":"biolink:regulates",
"species":"biolink:in_taxon",
"includedLocation":"biolink:located_in"}
"regulator": "biolink:affects",
"species": "biolink:in_taxon",
"includedLocation": "biolink:located_in"}


# TODO - use something like this instead of manipulating strings for individual cases
@@ -99,7 +99,7 @@ class ReactomeLoader(SourceDataLoader):
source_data_url = "https://reactome.org/"
license = "https://reactome.org/license"
attribution = "https://academic.oup.com/nar/article/50/D1/D687/6426058?login=false"
parsing_version = '1.1'
parsing_version = '1.2'

def __init__(self, test_mode: bool = False, source_data_dir: str = None):
"""
@@ -301,7 +301,6 @@ def write_neo4j_result_to_file(self, result: neo4j.Result, reference_entity_mapp
skipped_record_count = 0
for record in result:
record_data = record.data()

node_a_id = self.process_node_from_neo4j(reference_entity_mapping, record_data['a_id'], record_data['a'], record_data['a_labels'])
node_b_id = self.process_node_from_neo4j(reference_entity_mapping, record_data['b_id'], record_data['b'], record_data['b_labels'])
if node_a_id and node_b_id:
@@ -310,19 +309,19 @@ def write_neo4j_result_to_file(self, result: neo4j.Result, reference_entity_mapp
self.process_edge_from_neo4j(node_a_id,
record_data['r_type'],
node_b_id,
regulationType='positive',
regulation_type='positive',
complex_context=record_data.get('complex_context', None))
elif any("negative" in x.lower() for x in record_data['regulationType']):
self.process_edge_from_neo4j(node_a_id,
record_data['r_type'],
node_b_id,
regulationType='negative',
regulation_type='negative',
complex_context=record_data.get('complex_context', None))
else:
self.process_edge_from_neo4j(node_a_id,
record_data['r_type'],
node_b_id,
regulationType=None,
regulation_type=None,
complex_context=record_data.get('complex_context', None))
record_count += 1
else:
@@ -417,10 +416,15 @@ def process_node_from_neo4j(self, reference_entity_mapping, node_identity, node:
self.dbid_to_node_id_lookup[node['dbId']] = node_id
"""

def process_edge_from_neo4j(self, subject_id: str, relationship_type: str, object_id: str, regulationType=None, complex_context=None):
def process_edge_from_neo4j(self,
subject_id: str,
relationship_type: str,
object_id: str,
regulation_type=None,
complex_context=None):
predicate = PREDICATE_MAPPING.get(relationship_type, None)
if predicate:
if regulationType is None:
if not regulation_type:
output_edge = kgxedge(
subject_id=subject_id,
object_id=object_id,
@@ -429,14 +433,17 @@ def process_edge_from_neo4j(self, subject_id: str, relationship_type: str, objec
primary_knowledge_source=self.provenance_id
)
else:
if regulationType == "positive":
if regulation_type == 'positive':
direction = 'increased'
elif regulationType == "negative":
elif regulation_type == 'negative':
direction = 'decreased'
else:
self.logger.warning(f'Unexpected regulation type encountered: {regulation_type}')
return
edge_props = {
'qualified_predicate': 'biolink:causes',
'object_direction_qualifier': direction,
'qualified_predicate': 'causes',
'object_aspect_qualifier': 'expression',
'object_direction_qualifier': direction,
}
if complex_context:
edge_props['complex_context'] = complex_context
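The renamed regulation_type argument now drives a Biolink-style qualifier set on the edge: 'positive' maps to an increased expression qualifier, 'negative' to decreased, and anything else is logged and skipped. Roughly, the new edge-property logic behaves like this (a simplified sketch; the helper name is made up and the kgxedge construction is omitted):

def regulation_edge_props(regulation_type):
    # Map a Reactome regulation direction onto Biolink qualifier properties.
    direction = {'positive': 'increased', 'negative': 'decreased'}.get(regulation_type)
    if direction is None:
        return {}
    return {
        'qualified_predicate': 'causes',
        'object_aspect_qualifier': 'expression',
        'object_direction_qualifier': direction,
    }

# regulation_edge_props('negative') ->
# {'qualified_predicate': 'causes',
#  'object_aspect_qualifier': 'expression',
#  'object_direction_qualifier': 'decreased'}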