diff --git a/parsers/monarchkg/src/loadMonarchKG.py b/parsers/monarchkg/src/loadMonarchKG.py index 2417cb2f..73b4ee10 100644 --- a/parsers/monarchkg/src/loadMonarchKG.py +++ b/parsers/monarchkg/src/loadMonarchKG.py @@ -2,11 +2,12 @@ import os import tarfile import orjson +import requests from Common.loader_interface import SourceDataLoader from Common.kgxmodel import kgxedge from Common.biolink_constants import * -from Common.utils import GetData +from Common.utils import GetData, GetDataPullError ############## @@ -29,7 +30,7 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): # there is a /latest/ for this url, but without a valid get_latest_source_version function, # it could create a mismatch, pin to this version for now - self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/2024-03-18/' + self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/latest/' self.monarch_graph_archive = 'monarch-kg.jsonl.tar.gz' self.monarch_edge_file_archive_path = 'monarch-kg_edges.jsonl' self.data_files = [self.monarch_graph_archive] @@ -63,9 +64,17 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None): } def get_latest_source_version(self) -> str: - # possible to retrieve from /latest/index.html with beautifulsoup or some html parser but not ideal, - # planning to try to set up a better method with owners - latest_version = '2024-03-18' + """ + Gets the name of latest monarch kg version from metadata. + """ + latest_version = None + try: + metadata_yaml : requests.Response = requests.get("https://data.monarchinitiative.org/monarch-kg-dev/latest/metadata.yaml") + for line in metadata_yaml.text.split('\n'): + if("kg-version:" in line): latest_version = line.replace("kg-version:","").strip() + if(latest_version==None):raise ValueError("Cannot find 'kg-version' in Monarch KG metadata yaml.") + except Exception as e: + raise GetDataPullError(error_message=f'Unable to determine latest version for Monarch KG: {e}') return latest_version def get_data(self) -> bool: @@ -85,6 +94,10 @@ def parse_data(self) -> dict: skipped_ignore_knowledge_source = 0 skipped_undesired_predicate = 0 full_tar_path = os.path.join(self.data_path, self.monarch_graph_archive) + protected_edge_labels = [SUBJECT_ID, OBJECT_ID, PREDICATE,PRIMARY_KNOWLEDGE_SOURCE, + AGGREGATOR_KNOWLEDGE_SOURCES, KNOWLEDGE_LEVEL, AGENT_TYPE, + PUBLICATIONS, "biolink:primary_knowledge_source", "biolink:aggregator_knowledge_source"] + with tarfile.open(full_tar_path, 'r') as tar_files: with tar_files.extractfile(self.monarch_edge_file_archive_path) as edges_file: for line in edges_file: @@ -116,13 +129,14 @@ def parse_data(self) -> dict: KNOWLEDGE_LEVEL: monarch_edge[KNOWLEDGE_LEVEL] if KNOWLEDGE_LEVEL in monarch_edge else NOT_PROVIDED, AGENT_TYPE: monarch_edge[AGENT_TYPE] if AGENT_TYPE in monarch_edge else NOT_PROVIDED } + if monarch_edge[PUBLICATIONS]: edge_properties[PUBLICATIONS] = monarch_edge[PUBLICATIONS] + for edge_attribute in monarch_edge: - if '_qualifier' in edge_attribute and monarch_edge[edge_attribute]: + if edge_attribute not in protected_edge_labels and monarch_edge[edge_attribute]: edge_properties[edge_attribute] = monarch_edge[edge_attribute] - elif edge_attribute == QUALIFIED_PREDICATE and monarch_edge[QUALIFIED_PREDICATE]: - edge_properties[QUALIFIED_PREDICATE] = monarch_edge[QUALIFIED_PREDICATE] + output_edge = kgxedge( subject_id=subject_id, predicate=predicate,