core: add core__observation_component_* tables (#215)
* core: add core__observation_component_* tables

New features:
- The headline features are new core__observation_component_* tables,
  holding Observation.component rows (with code, dataAbsentReason,
  interpretation, valueCodeableConcept, and valueQuantity tables)
- In core__observation, add a valueQuantity_code_system alias (but
  keep the old-name field of valueQuantity_system around to avoid
  breaking consumers)
- In core__observation_vital_signs, add valueQuantity_* fields
- In core__condition, add category_code_system
- For database backends, add proper table schema parsing, to allow
  deeper field schema checks (i.e. we can now ask if field.sub.sub
  exists, rather than just field.sub)
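  As a sketch of the deeper checks this enables, an expected-columns
  dictionary can now nest another level. This is illustrative only; the
  field names are drawn from the component tables above:

      expected_table_cols = {
          "observation": {
              "id": [],
              "component": {
                  "valueQuantity": ["code", "system", "unit", "value"],
              },
          },
      }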

Bug fixes:
- For core__documentreference, the "date" field was always NULL.
  Now it should be filled in, as a day-resolution datetime field.
- For core__condition, don't leave out rows which don't have all
  three of category, clinicalStatus, and verificationStatus.
  Instead, show them with null values.
- Fix CountsBuilder.write_counts() to not raise an exception

* Convert concept & coding builders to new schema approach

- Remove a lot of the codeable concept expected values passed to jinja
  files; the concept builders handle that themselves.
- Move validate_schema() from core_templates to sql_utils
- Have the concept & coding builders use validate_schema to check for
  field existence, and pass down the full expected field schema in
  places that need it (a minimal call sketch follows this list).
- Remove deprecated valueQuantity_system, as no one should be using it
  right now.
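  A minimal sketch of the new call pattern, mirroring the
  builder_condition.py diff further down this page (the import paths and
  the exact prepare_queries signature are assumptions, since they are not
  fully visible in this excerpt):

      from cumulus_library import base_table_builder
      from cumulus_library.template_sql import sql_utils  # import path assumed

      expected_table_cols = {"condition": {"id": [], "subject": ["reference"]}}

      class ExampleBuilder(base_table_builder.BaseTableBuilder):
          def prepare_queries(self, cursor, schema, *args, parser=None, **kwargs):
              # Ask the database's parser which expected fields really exist,
              # then branch the generated SQL on the validated flags.
              validated_schema = sql_utils.validate_schema(
                  cursor, schema, expected_table_cols, parser
              )
              ...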
mikix authored Apr 23, 2024
1 parent 7dedf01 commit c578855
Showing 50 changed files with 1,714 additions and 615 deletions.
5 changes: 5 additions & 0 deletions cumulus_library/.sqlfluff
@@ -170,6 +170,11 @@ schema =
'category': {
'coding': True, 'code': True, 'display': True, 'system': True, 'text': True
},
'component': {
'valueQuantity': {
'code': True, 'comparator': True, 'system': True, 'unit': True, 'value': True
},
},
'status': True,
'code': {
'coding': True, 'code': True, 'display': True, 'system': True, 'text': True
2 changes: 1 addition & 1 deletion cumulus_library/__init__.py
@@ -1,3 +1,3 @@
"""Package metadata"""

__version__ = "2.0.1"
__version__ = "2.1.0"
172 changes: 121 additions & 51 deletions cumulus_library/databases.py
@@ -14,7 +14,7 @@
import os
import sys
from pathlib import Path
from typing import Protocol
from typing import Any, Protocol

import cumulus_fhir_support
import duckdb
@@ -44,60 +44,78 @@ def fetchall(self) -> list[list] | None:
class DatabaseParser(abc.ABC):
"""Parses information_schema results from a database"""

def _parse_found_schema(
self, expected: dict[dict[list]], schema: dict[list]
) -> dict:
"""Checks for presence of field for each column in a table
@abc.abstractmethod
def parse_found_schema(self, schema: dict[str, str]) -> dict:
"""Parses a database-provided schema string.
:param expected: A nested dict describing the expected data format of
a table. Expected format is like this:
:param schema: the results of a query from the get_column_datatype method
of the template_sql.templates function. It looks like this (for athena at
least, but the values are opaque strings and database-provider-specific):
{
"object_col":["member_a", "member_b"]
"primitive_col": []
'object_col': 'row(member_a varchar, member_b date)',
'primitive_col': 'varchar',
}
:returns: a dictionary with an entry for every field present in the schema.
For the above example, this should return:
{
'object_col': {
'member_a': {},
'member_b': {},
},
'primitive_col': {},
}
:param schema: the results of a query from the get_column_datatype method
of the template_sql.templates function. It looks like this:
[
('object_col', 'row(member_a VARCHAR, member_b DATE)'),
('primitive_col', 'VARCHAR')
]
The actual contents of schema are database dependent. This naive method
is a first pass, ignoring complexities of differing database variable
types, just iterating through looking for column names.
TODO: on a per database instance, consider a more nuanced approach if needed
(compared to just checking if the schema contains the field name)
"""

def _recursively_validate(
self, expected: dict[str, Any], schema: dict[str, Any]
) -> dict[str, Any]:
schema = schema or {}
output = {}

for column, fields in expected.items():
column_lower = column.lower()

# is this an object column? (like: "subject": ["reference"])
if fields:
col_schema = schema.get(column_lower, "").lower()
output[column] = {
# TODO: make this check more robust
field: field.lower() in col_schema
for field in fields
}

# otherwise this is a primitive col (like: "recordedDate": None)
col_schema = schema.get(column.lower())

# Is `fields` falsy? (like: "recordedDate": None or [] or {})
# This means we just want to check existence of `column`
# otherwise this is a primitive col
if not fields:
output[column] = col_schema is not None

# Is `fields` a list? (like: "subject": ["reference"])
# This means we should check existence of all mentioned children.
elif isinstance(fields, list):
for field in fields:
subschema = self._recursively_validate({field: None}, col_schema)
output.setdefault(column, {}).update(subschema)

# Is `fields` a dict?
# Like: "component": {"valueQuantity": ["unit", "value"]}
# This means we should descend one level
elif isinstance(fields, dict):
subschema = self._recursively_validate(fields, col_schema)
output[column] = subschema

else:
output[column] = column_lower in schema
raise ValueError("Bad expected schema provided")

return output

@abc.abstractmethod
def validate_table_schema(
self, expected: dict[str, list[str]], schema: list[tuple]
self, expected: dict[str, list], schema: list[tuple]
) -> dict:
"""Public interface for investigating if fields are in a table schema.
expected is a dictionary of string column names to *something*:
- falsy (like None or []): just check that the column exists
- list of strings: check all the column children exist
- dict: a nested child 'expected' dictionary, following the same rules as above
This method should lightly format results and pass them to
_parse_found_schema(), or a more bespoke table analysis function if needed.
parse_found_schema(), or a more bespoke table analysis function if needed.
"""
parsed_schema = self.parse_found_schema(dict(schema))
return self._recursively_validate(expected, parsed_schema)


class DatabaseBackend(abc.ABC):
@@ -179,11 +197,53 @@ def close(self) -> None:


class AthenaParser(DatabaseParser):
def validate_table_schema(
self, expected: dict[dict[list]], schema: list[list]
) -> dict:
schema = dict(schema)
return self._parse_found_schema(expected, schema)
def _find_type_len(self, row: str) -> int:
"""Finds the end of a type string like row(...) or array(row(...))"""
# Note that this assumes the string is well formatted.
depth = 0
for i in range(len(row)):
match row[i]:
case ",":
if depth == 0:
break
case "(":
depth += 1
case ")":
depth -= 1
return i

def _split_row(self, row: str) -> dict[str, str]:
# Must handle "name type, name row(...), name type, name row(...)"
result = {}
# Real athena doesn't add extra spaces, but our unit tests do for
# readability, so let's strip out whitespace as we parse.
while row := row.strip():
name, remainder = row.split(" ", 1)
type_len = self._find_type_len(remainder)
result[name] = remainder[0:type_len]
row = remainder[type_len + 2 :] # skip comma and space
return result

def parse_found_schema(self, schema: dict[str, str]) -> dict:
# A sample response for table `observation`, column `component`:
# array(row(code varchar, display varchar)),
# text varchar, id varchar)
parsed = {}

for key, value in schema.items():
# Strip arrays out, they don't affect the shape of our schema.
while value.startswith("array("):
value = value.removeprefix("array(")
value = value.removesuffix(")")

if value.startswith("row("):
value = value.removeprefix("row(")
value = value.removesuffix(")")
parsed[key] = self.parse_found_schema(self._split_row(value))
else:
parsed[key] = {}

return parsed


class DuckDatabaseBackend(DatabaseBackend):
@@ -323,16 +383,26 @@ def close(self) -> None:
class DuckDbParser(DatabaseParser):
"""Table parser for DuckDB schemas"""

def validate_table_schema(
self, expected: dict[dict[list]], schema: list[tuple]
) -> dict:
def parse_found_schema(self, schema: dict[str, str]) -> dict:
"""parses a information_schema.tables query response for valid columns"""
schema = dict(schema)
# since we're defaulting to athena naming conventions elsewhere,
# we'll lower case all the keys
schema = {k.lower(): v for k, v in schema.items()}
parsed = {}

for key, value in schema.items():
if isinstance(value, str):
# DuckDB provides a parser to go from string -> type objects
value = duckdb.typing.DuckDBPyType(value)

# Collapse lists to the contained value
if value.id == "list":
value = value.children[0][1] # [('child', CONTAINED_TYPE)]

if value.id == "struct":
result = self.parse_found_schema(dict(value.children))
else:
result = {}
parsed[key.lower()] = result

return self._parse_found_schema(expected, schema)
return parsed


def _read_rows_from_table_dir(path: str) -> list[dict]:
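To make the parsed shapes concrete, here is a standalone sketch (not the
library's code) that turns an Athena-style type declaration like the
docstring example above into the nested field dict that the parsers
return; the real pipeline does this via AthenaParser or DuckDbParser.

    def parse_type(value: str) -> dict:
        # Turn an Athena type string into a nested dict of member names.
        while value.startswith("array("):  # arrays don't change the shape
            value = value[len("array("):-len(")")]
        if not value.startswith("row("):  # primitive column
            return {}
        value = value[len("row("):-len(")")]
        # Split "name type, name row(...)" on top-level commas only.
        parsed, depth, start, parts = {}, 0, 0, []
        for i, char in enumerate(value + ","):  # trailing comma flushes the last part
            if char == "(":
                depth += 1
            elif char == ")":
                depth -= 1
            elif char == "," and depth == 0:
                parts.append(value[start:i].strip())
                start = i + 1
        for part in parts:
            if part:
                name, type_str = part.split(" ", 1)
                parsed[name] = parse_type(type_str)
        return parsed

    schema = {
        "object_col": "row(member_a varchar, member_b date)",
        "primitive_col": "varchar",
    }
    print({column: parse_type(type_str) for column, type_str in schema.items()})
    # {'object_col': {'member_a': {}, 'member_b': {}}, 'primitive_col': {}}

On the DuckDB side, the committed parser leans on duckdb's own type
objects instead of string slicing; a quick check of that helper looks
roughly like this (assuming duckdb is installed; the exact type string
below is an assumption about what information_schema reports):

    import duckdb

    value = duckdb.typing.DuckDBPyType("STRUCT(member_a VARCHAR, member_b DATE)[]")
    print(value.id)                              # "list"
    inner = value.children[0][1]                 # [('child', CONTAINED_TYPE)]
    print(inner.id)                              # "struct"
    print([name for name, _ in inner.children])  # ['member_a', 'member_b']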
2 changes: 1 addition & 1 deletion cumulus_library/statistics/counts.py
@@ -262,7 +262,7 @@ def write_counts(self, filepath: str):
"""
self.prepare_queries(cursor=None, schema=None)
self.comment_queries()
self.write_queries(filename=filepath)
self.write_queries(path=Path(filepath))

def prepare_queries(self, cursor: object | None = None, schema: str | None = None):
"""Stub implementing abstract base class
121 changes: 59 additions & 62 deletions cumulus_library/studies/core/builder_condition.py
@@ -4,37 +4,10 @@

expected_table_cols = {
"condition": {
"category": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"clinicalstatus": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"id": [],
"recordedDate": [],
"verificationstatus": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"subject": ["reference", "display", "type"],
"encounter": ["reference", "display", "type"],
"subject": sql_utils.REFERENCE,
"encounter": sql_utils.REFERENCE,
}
}

@@ -43,38 +16,62 @@ class CoreConditionBuilder(base_table_builder.BaseTableBuilder):
display_text = "Creating Condition tables..."

def denormalize_codes(self):
preferred_config = sql_utils.CodeableConceptConfig(
source_table="condition",
source_id="id",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_display",
filter_priority=True,
code_systems=[
"http://snomed.info/sct",
"http://hl7.org/fhir/sid/icd-10-cm",
"http://hl7.org/fhir/sid/icd-9-cm",
"http://hl7.org/fhir/sid/icd-9-cm/diagnosis",
# EPIC specific systems
"urn:oid:1.2.840.114350.1.13.71.2.7.2.728286",
"urn:oid:1.2.840.114350.1.13.71.2.7.4.698084.10375",
# Spec allowed code of last resort
"http://terminology.hl7.org/CodeSystem/data-absent-reason",
],
)
self.queries.append(
base_templates.get_codeable_concept_denormalize_query(preferred_config)
)

all_config = sql_utils.CodeableConceptConfig(
source_table="condition",
source_id="id",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_all",
filter_priority=False,
)
self.queries.append(
base_templates.get_codeable_concept_denormalize_query(all_config)
)
configs = [
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("category", list)],
target_table="core__condition_dn_category",
# This is an extensible binding, and US Core already suggests three
# different code systems to pull its recommended four values from.
# So let's not filter by system here.
),
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("clinicalStatus", dict)],
target_table="core__condition_dn_clinical_status",
filter_priority=True,
code_systems=[
# Restrict to just this required binding system
"http://terminology.hl7.org/CodeSystem/condition-clinical",
],
),
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_display",
filter_priority=True,
code_systems=[
"http://snomed.info/sct",
"http://hl7.org/fhir/sid/icd-10-cm",
"http://hl7.org/fhir/sid/icd-9-cm",
"http://hl7.org/fhir/sid/icd-9-cm/diagnosis",
# EPIC specific systems
"urn:oid:1.2.840.114350.1.13.71.2.7.2.728286",
"urn:oid:1.2.840.114350.1.13.71.2.7.4.698084.10375",
# Spec allowed code of last resort
"http://terminology.hl7.org/CodeSystem/data-absent-reason",
],
),
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("code", dict)],
target_table="core__condition_codable_concepts_all",
),
sql_utils.CodeableConceptConfig(
source_table="condition",
column_hierarchy=[("verificationStatus", dict)],
target_table="core__condition_dn_verification_status",
filter_priority=True,
code_systems=[
# Restrict to just this required binding system
"http://terminology.hl7.org/CodeSystem/condition-ver-status",
],
),
]
self.queries += [
base_templates.get_codeable_concept_denormalize_query(config)
for config in configs
]

def prepare_queries(
self,
@@ -85,7 +82,7 @@ def prepare_queries(
**kwargs,
):
self.denormalize_codes()
validated_schema = core_templates.validate_schema(
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.queries.append(
(Diffs for the remaining changed files are not shown.)
