core: add core__observation_component_* tables #215

Merged · 2 commits · Apr 23, 2024
Changes from 1 commit
5 changes: 5 additions & 0 deletions cumulus_library/.sqlfluff
@@ -170,6 +170,11 @@ schema =
'category': {
'coding': True, 'code': True, 'display': True, 'system': True, 'text': True
},
'component': {
'valueQuantity': {
'code': True, 'comparator': True, 'system': True, 'unit': True, 'value': True
},
},
'status': True,
'code': {
'coding': True, 'code': True, 'display': True, 'system': True, 'text': True
2 changes: 1 addition & 1 deletion cumulus_library/__init__.py
@@ -1,3 +1,3 @@
"""Package metadata"""

__version__ = "2.0.1"
__version__ = "2.1.0"
172 changes: 121 additions & 51 deletions cumulus_library/databases.py
@@ -14,7 +14,7 @@
import os
import sys
from pathlib import Path
from typing import Protocol
from typing import Any, Protocol

import cumulus_fhir_support
import duckdb
@@ -44,60 +44,78 @@ def fetchall(self) -> list[list] | None:
class DatabaseParser(abc.ABC):
"""Parses information_schema results from a database"""

def _parse_found_schema(
self, expected: dict[dict[list]], schema: dict[list]
) -> dict:
"""Checks for presence of field for each column in a table
@abc.abstractmethod
def parse_found_schema(self, schema: dict[str, str]) -> dict:
"""Parses a database-provided schema string.

:param expected: A nested dict describing the expected data format of
a table. Expected format is like this:
:param schema: the results of a query from the get_column_datatype method
of the template_sql.templates function. It looks like this (for athena at
least, but the values are opaque strings and database-provider-specific):
{
"object_col":["member_a", "member_b"]
"primitive_col": []
'object_col': 'row(member_a varchar, member_b date)',
'primitive_col': 'varchar',
}

:returns: a dictionary with an entry for every field present in the schema.
For the above example, this should return:
{
'object_col': {
'member_a': {},
'member_b': {},
},
'primitive_col': {},
}
:param schema: the results of a query from the get_column_datatype method
of the template_sql.templates function. It looks like this:
[
('object_col', 'row(member_a VARCHAR, member_b DATE)'),
('primitive_col', 'VARCHAR')
]

The actual contents of schema are database dependent. This naive method
is a first pass, ignoring complexities of differing database variable
types, just iterating through looking for column names.

TODO: on a per database instance, consider a more nuanced approach if needed
(compared to just checking if the schema contains the field name)
"""

def _recursively_validate(
self, expected: dict[str, Any], schema: dict[str, Any]
) -> dict[str, Any]:
schema = schema or {}
output = {}

for column, fields in expected.items():
column_lower = column.lower()

# is this an object column? (like: "subject": ["reference"])
if fields:
col_schema = schema.get(column_lower, "").lower()
output[column] = {
# TODO: make this check more robust
field: field.lower() in col_schema
for field in fields
}

# otherwise this is a primitive col (like: "recordedDate": None)
col_schema = schema.get(column.lower())

            # Is `fields` falsy? (like: "recordedDate": None or [] or {})
            # This means we just want to check existence of `column`
# otherwise this is a primitive col
if not fields:
output[column] = col_schema is not None

# Is `fields` a list? (like: "subject": ["reference"])
            # This means we should check existence of all mentioned children.
elif isinstance(fields, list):
for field in fields:
subschema = self._recursively_validate({field: None}, col_schema)
output.setdefault(column, {}).update(subschema)

# Is `fields` a dict?
# Like: "component": {"valueQuantity": ["unit", "value"]}
# This means we should descend one level
elif isinstance(fields, dict):
subschema = self._recursively_validate(fields, col_schema)
output[column] = subschema

else:
output[column] = column_lower in schema
raise ValueError("Bad expected schema provided")

return output
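
To make the three branches above concrete, here is an illustrative walkthrough; the data is invented for this sketch and is not part of the diff:

# Illustrative only: how _recursively_validate() resolves the three
# `expected` value shapes against a parsed schema.
parsed = {
    "id": {},
    "subject": {"reference": {}},
    "component": {"valuequantity": {"unit": {}, "value": {}}},
}
expected = {
    "id": [],                                           # falsy: existence check
    "subject": ["reference", "display"],                # list: check each child
    "component": {"valueQuantity": ["unit", "value"]},  # dict: descend a level
}
# parser._recursively_validate(expected, parsed) returns:
# {
#     "id": True,
#     "subject": {"reference": True, "display": False},
#     "component": {"valueQuantity": {"unit": True, "value": True}},
# }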

@abc.abstractmethod
def validate_table_schema(
self, expected: dict[str, list[str]], schema: list[tuple]
self, expected: dict[str, list], schema: list[tuple]
) -> dict:
"""Public interface for investigating if fields are in a table schema.

expected is a dictionary of string column names to *something*:
- falsy (like None or []): just check that the column exists
- list of strings: check all the column children exist
- dict of a new child 'expected' dictionary, with same above rules

This method should lightly format results and pass them to
_parse_found_schema(), or a more bespoke table analysis function if needed.
parse_found_schema(), or a more bespoke table analysis function if needed.
"""
parsed_schema = self.parse_found_schema(dict(schema))
return self._recursively_validate(expected, parsed_schema)


class DatabaseBackend(abc.ABC):
@@ -179,11 +197,53 @@ def close(self) -> None:


class AthenaParser(DatabaseParser):
def validate_table_schema(
self, expected: dict[dict[list]], schema: list[list]
) -> dict:
schema = dict(schema)
return self._parse_found_schema(expected, schema)
def _find_type_len(self, row: str) -> int:
"""Finds the end of a type string like row(...) or array(row(...))"""
# Note that this assumes the string is well formatted.
depth = 0
for i in range(len(row)):
match row[i]:
case ",":
if depth == 0:
break
case "(":
depth += 1
case ")":
depth -= 1
return i

def _split_row(self, row: str) -> dict[str, str]:
# Must handle "name type, name row(...), name type, name row(...)"
result = {}
# Real athena doesn't add extra spaces, but our unit tests do for
# readability, so let's strip out whitespace as we parse.
while row := row.strip():
name, remainder = row.split(" ", 1)
type_len = self._find_type_len(remainder)
result[name] = remainder[0:type_len]
row = remainder[type_len + 2 :] # skip comma and space
return result
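
For example (the type string is invented for illustration), the splitter behaves like this; note that the scan clips the final character of the last field, which the prefix/suffix stripping in parse_found_schema tolerates:

# Illustrative only: _split_row() on an Athena-style row body.
parser = AthenaParser()
parser._split_row("id varchar, code row(coding varchar, text varchar), status varchar")
# -> {
#     "id": "varchar",
#     "code": "row(coding varchar, text varchar)",
#     "status": "varcha",  # last char clipped; only startswith()/removesuffix()
# }                        # ever look at this value, so nothing breaks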

def parse_found_schema(self, schema: dict[str, str]) -> dict:
# A sample response for table `observation`, column `component`:
# array(row(code varchar, display varchar)),
# text varchar, id varchar)
parsed = {}

for key, value in schema.items():
            # Strip arrays out; they don't affect the shape of our schema.
while value.startswith("array("):
value = value.removeprefix("array(")
value = value.removesuffix(")")

if value.startswith("row("):
value = value.removeprefix("row(")
value = value.removesuffix(")")
parsed[key] = self.parse_found_schema(self._split_row(value))
else:
parsed[key] = {}

return parsed
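
An end-to-end sketch of the Athena parser (the type string is invented, shaped like the observation.component case this PR targets):

# Illustrative only: AthenaParser.parse_found_schema() on a nested type.
schema = {
    "id": "varchar",
    "component": "array(row(code row(text varchar), "
    "valuequantity row(unit varchar, value double)))",
}
parsed = AthenaParser().parse_found_schema(schema)
# parsed == {
#     "id": {},
#     "component": {
#         "code": {"text": {}},
#         "valuequantity": {"unit": {}, "value": {}},
#     },
# }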


class DuckDatabaseBackend(DatabaseBackend):
@@ -323,16 +383,26 @@ def close(self) -> None:
class DuckDbParser(DatabaseParser):
"""Table parser for DuckDB schemas"""

def validate_table_schema(
self, expected: dict[dict[list]], schema: list[tuple]
) -> dict:
def parse_found_schema(self, schema: dict[str, str]) -> dict:
"""parses a information_schema.tables query response for valid columns"""
schema = dict(schema)
# since we're defaulting to athena naming conventions elsewhere,
# we'll lower case all the keys
schema = {k.lower(): v for k, v in schema.items()}
parsed = {}

for key, value in schema.items():
if isinstance(value, str):
# DuckDB provides a parser to go from string -> type objects
value = duckdb.typing.DuckDBPyType(value)

# Collapse lists to the contained value
if value.id == "list":
value = value.children[0][1] # [('child', CONTAINED_TYPE)]

if value.id == "struct":
result = self.parse_found_schema(dict(value.children))
else:
result = {}
parsed[key.lower()] = result

return self._parse_found_schema(expected, schema)
return parsed
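
A matching sketch for DuckDB, assuming its type parser accepts this spelling (DuckDB writes nested types as STRUCT(...) and lists as a [] suffix):

# Illustrative only: DuckDbParser.parse_found_schema() on DuckDB types.
schema = {
    "id": "VARCHAR",
    "component": "STRUCT(code STRUCT(display VARCHAR), "
    "valueQuantity STRUCT(unit VARCHAR, value DOUBLE))[]",
}
parsed = DuckDbParser().parse_found_schema(schema)
# parsed == {
#     "id": {},
#     "component": {
#         "code": {"display": {}},
#         "valuequantity": {"unit": {}, "value": {}},
#     },
# }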


def _read_rows_from_table_dir(path: str) -> list[dict]:
2 changes: 1 addition & 1 deletion cumulus_library/statistics/counts.py
@@ -262,7 +262,7 @@ def write_counts(self, filepath: str):
"""
self.prepare_queries(cursor=None, schema=None)
self.comment_queries()
self.write_queries(filename=filepath)
self.write_queries(path=Path(filepath))

def prepare_queries(self, cursor: object | None = None, schema: str | None = None):
"""Stub implementing abstract base class
118 changes: 59 additions & 59 deletions cumulus_library/studies/core/builder_condition.py
@@ -4,35 +4,11 @@

expected_table_cols = {
"condition": {
"category": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"clinicalstatus": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"category": [],
"clinicalstatus": [],
"id": [],
"recordedDate": [],
"verificationstatus": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"verificationstatus": [],
"subject": ["reference", "display", "type"],
"encounter": ["reference", "display", "type"],
}
@@ -43,38 +43,62 @@ class CoreConditionBuilder(base_table_builder.BaseTableBuilder):
display_text = "Creating Condition tables..."

def denormalize_codes(self):
preferred_config = sql_utils.CodeableConceptConfig(
source_table="condition",
source_id="id",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_display",
filter_priority=True,
code_systems=[
"http://snomed.info/sct",
"http://hl7.org/fhir/sid/icd-10-cm",
"http://hl7.org/fhir/sid/icd-9-cm",
"http://hl7.org/fhir/sid/icd-9-cm/diagnosis",
# EPIC specific systems
"urn:oid:1.2.840.114350.1.13.71.2.7.2.728286",
"urn:oid:1.2.840.114350.1.13.71.2.7.4.698084.10375",
# Spec allowed code of last resort
"http://terminology.hl7.org/CodeSystem/data-absent-reason",
],
)
self.queries.append(
base_templates.get_codeable_concept_denormalize_query(preferred_config)
)

all_config = sql_utils.CodeableConceptConfig(
source_table="condition",
source_id="id",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_all",
filter_priority=False,
)
self.queries.append(
base_templates.get_codeable_concept_denormalize_query(all_config)
)
configs = [
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("category", list)],
target_table="core__condition_dn_category",
# This is an extensible binding, and US Core already suggests three
# different code systems to pull its recommended four values from.
# So let's not filter by system here.
),
Comment on lines +20 to +27 (Contributor Author):
I added three new denormalization tables for condition, because in testing the hypertension upgrade, I noticed my unit test conditions weren't appearing anymore. The old SQL was unnesting directly on the condition table, and so if a row didn't have any entries to unnest, it would be skipped and not make it into core__condition.

So I switched us to separate denormalization here.

sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("clinicalStatus", dict)],
target_table="core__condition_dn_clinical_status",
filter_priority=True,
code_systems=[
# Restrict to just this required binding system
"http://terminology.hl7.org/CodeSystem/condition-clinical",
],
),
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_display",
filter_priority=True,
code_systems=[
"http://snomed.info/sct",
"http://hl7.org/fhir/sid/icd-10-cm",
"http://hl7.org/fhir/sid/icd-9-cm",
"http://hl7.org/fhir/sid/icd-9-cm/diagnosis",
# EPIC specific systems
"urn:oid:1.2.840.114350.1.13.71.2.7.2.728286",
"urn:oid:1.2.840.114350.1.13.71.2.7.4.698084.10375",
# Spec allowed code of last resort
"http://terminology.hl7.org/CodeSystem/data-absent-reason",
],
),
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_all",
),
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("verificationStatus", dict)],
target_table="core__condition_dn_verification_status",
filter_priority=True,
code_systems=[
# Restrict to just this required binding system
"http://terminology.hl7.org/CodeSystem/condition-ver-status",
],
),
]
self.queries += [
base_templates.get_codeable_concept_denormalize_query(config)
for config in configs
]

def prepare_queries(
self,
3 changes: 2 additions & 1 deletion cumulus_library/studies/core/builder_documentreference.py
@@ -7,9 +7,10 @@
"id": [],
"type": [],
"status": [],
"date": [],
"docStatus": [],
"subject": ["reference"],
"context": ["encounter", "period", "start"],
"context": {"encounter": ["reference"], "period": ["start"]},
"category": [],
}
}
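
The new nested form of "context" above exercises the dict branch of the schema validator; a sketch of the result shape it yields (`parser` and `schema` are stand-ins, not names from this PR):

# Illustrative only: validating the nested "context" spec above.
result = parser.validate_table_schema(
    expected_table_cols["documentreference"], schema
)
# With a schema containing context.encounter.reference and
# context.period.start, this yields:
# result["context"] == {
#     "encounter": {"reference": True},
#     "period": {"start": True},
# }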