Skip to content

Commit

Permalink
Merge pull request #260 from RobokopU24/Issue257
Browse files Browse the repository at this point in the history
Add in all Monarch KG edge properties on ingest.
  • Loading branch information
beasleyjonm authored Nov 21, 2024
2 parents 52149ee + 224d0b5 commit 1867684
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions parsers/monarchkg/src/loadMonarchKG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import os
import tarfile
import orjson
import requests

from Common.loader_interface import SourceDataLoader
from Common.kgxmodel import kgxedge
from Common.biolink_constants import *
from Common.utils import GetData
from Common.utils import GetData, GetDataPullError


##############
Expand All @@ -29,7 +30,7 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):

# there is a /latest/ for this url, but without a valid get_latest_source_version function,
# it could create a mismatch, pin to this version for now
self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/2024-03-18/'
self.data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/latest/'
self.monarch_graph_archive = 'monarch-kg.jsonl.tar.gz'
self.monarch_edge_file_archive_path = 'monarch-kg_edges.jsonl'
self.data_files = [self.monarch_graph_archive]
Expand Down Expand Up @@ -63,9 +64,17 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
}

def get_latest_source_version(self) -> str:
# possible to retrieve from /latest/index.html with beautifulsoup or some html parser but not ideal,
# planning to try to set up a better method with owners
latest_version = '2024-03-18'
"""
Gets the name of latest monarch kg version from metadata.
"""
latest_version = None
try:
metadata_yaml : requests.Response = requests.get("https://data.monarchinitiative.org/monarch-kg-dev/latest/metadata.yaml")
for line in metadata_yaml.text.split('\n'):
if("kg-version:" in line): latest_version = line.replace("kg-version:","").strip()
if(latest_version==None):raise ValueError("Cannot find 'kg-version' in Monarch KG metadata yaml.")
except Exception as e:
raise GetDataPullError(error_message=f'Unable to determine latest version for Monarch KG: {e}')
return latest_version

def get_data(self) -> bool:
Expand All @@ -85,6 +94,10 @@ def parse_data(self) -> dict:
skipped_ignore_knowledge_source = 0
skipped_undesired_predicate = 0
full_tar_path = os.path.join(self.data_path, self.monarch_graph_archive)
protected_edge_labels = [SUBJECT_ID, OBJECT_ID, PREDICATE,PRIMARY_KNOWLEDGE_SOURCE,
AGGREGATOR_KNOWLEDGE_SOURCES, KNOWLEDGE_LEVEL, AGENT_TYPE,
PUBLICATIONS, "biolink:primary_knowledge_source", "biolink:aggregator_knowledge_source"]

with tarfile.open(full_tar_path, 'r') as tar_files:
with tar_files.extractfile(self.monarch_edge_file_archive_path) as edges_file:
for line in edges_file:
Expand Down Expand Up @@ -116,13 +129,14 @@ def parse_data(self) -> dict:
KNOWLEDGE_LEVEL: monarch_edge[KNOWLEDGE_LEVEL] if KNOWLEDGE_LEVEL in monarch_edge else NOT_PROVIDED,
AGENT_TYPE: monarch_edge[AGENT_TYPE] if AGENT_TYPE in monarch_edge else NOT_PROVIDED
}

if monarch_edge[PUBLICATIONS]:
edge_properties[PUBLICATIONS] = monarch_edge[PUBLICATIONS]

for edge_attribute in monarch_edge:
if '_qualifier' in edge_attribute and monarch_edge[edge_attribute]:
if edge_attribute not in protected_edge_labels and monarch_edge[edge_attribute]:
edge_properties[edge_attribute] = monarch_edge[edge_attribute]
elif edge_attribute == QUALIFIED_PREDICATE and monarch_edge[QUALIFIED_PREDICATE]:
edge_properties[QUALIFIED_PREDICATE] = monarch_edge[QUALIFIED_PREDICATE]

output_edge = kgxedge(
subject_id=subject_id,
predicate=predicate,
Expand Down

0 comments on commit 1867684

Please sign in to comment.