core: add core__observation_component_* tables
New features:
- The headline features are new core__observation_component_* tables,
  holding Observation.component rows (with code, dataAbsentReason,
  interpretation, valueCodeableConcept, and valueQuantity tables)
- In core__observation, add a valueQuantity_code_system alias (but
  keep the old valueQuantity_system field around to avoid breaking
  consumers)
- In core__observation_vital_signs, add valueQuantity_* fields
- For database backends, add proper table schema parsing, to allow
  deeper field schema checks (i.e. we can now ask if field.sub.sub
  exists, rather than just field.sub)

Bug fixes:
- For core__documentreference, the "date" field was always NULL.
  Now it should be filled in as a day-resolution datetime field.
mikix committed Apr 19, 2024
1 parent 2041d21 commit 9cd73e5
Showing 31 changed files with 1,290 additions and 394 deletions.
5 changes: 5 additions & 0 deletions cumulus_library/.sqlfluff
@@ -170,6 +170,11 @@ schema =
'category': {
'coding': True, 'code': True, 'display': True, 'system': True, 'text': True
},
'component': {
'valueQuantity': {
'code': True, 'comparator': True, 'system': True, 'unit': True, 'value': True
},
},
'status': True,
'code': {
'coding': True, 'code': True, 'display': True, 'system': True, 'text': True
171 changes: 120 additions & 51 deletions cumulus_library/databases.py
@@ -14,7 +14,7 @@
import os
import sys
from pathlib import Path
from typing import Protocol
from typing import Any, Protocol

import cumulus_fhir_support
import duckdb
@@ -44,60 +44,77 @@ def fetchall(self) -> list[list] | None:
class DatabaseParser(abc.ABC):
"""Parses information_schema results from a database"""

def _parse_found_schema(
self, expected: dict[dict[list]], schema: dict[list]
) -> dict:
"""Checks for presence of field for each column in a table
@abc.abstractmethod
def parse_found_schema(self, schema: dict[str, str]) -> dict:
"""Parses a database-provided schema string.
:param expected: A nested dict describing the expected data format of
a table. Expected format is like this:
:param schema: the results of a query from the get_column_datatype method
of the template_sql.templates function. It looks like this (for athena at
least, but the values are opaque strings and database-provider-specific):
{
"object_col":["member_a", "member_b"]
"primitive_col": []
'object_col': 'row(member_a varchar, member_b date)',
'primitive_col': 'varchar',
}
:returns: a dictionary with an entry for every field present in the schema.
For the above example, this should return:
{
'object_col': {
'member_a': {},
'member_b': {},
},
'primitive_col': {},
}
:param schema: the results of a query from the get_column_datatype method
of the template_sql.templates function. It looks like this:
[
('object_col', 'row(member_a VARCHAR, member_b DATE)'),
('primitive_col', 'VARCHAR')
]
The actual contents of schema are database dependent. This naive method
is a first pass, ignoring complexities of differing database variable
types, just iterating through looking for column names.
TODO: on a per database instance, consider a more nuanced approach if needed
(compared to just checking if the schema contains the field name)
"""

def _recursively_validate(
self, expected: dict[str, Any], schema: dict[str, Any]
) -> dict[str, Any]:
schema = schema or {}
output = {}

for column, fields in expected.items():
column_lower = column.lower()

# is this an object column? (like: "subject": ["reference"])
if fields:
col_schema = schema.get(column_lower, "").lower()
output[column] = {
# TODO: make this check more robust
field: field.lower() in col_schema
for field in fields
}

# otherwise this is a primitive col (like: "recordedDate": None)
col_schema = schema.get(column.lower())

            # Is `fields` falsy? (like: "recordedDate": None or [] or {})
            # This means we just want to check the existence of `column`
# otherwise this is a primitive col
if not fields:
output[column] = col_schema is not None

# Is `fields` a list? (like: "subject": ["reference"])
            # This means we should check the existence of all mentioned children.
elif isinstance(fields, list):
for field in fields:
subschema = self._recursively_validate({field: None}, col_schema)
output.setdefault(column, {}).update(subschema)

# Is `fields` a dict? (like: "component": {"valueQuantity": ["unit", "value"]})
# This means we should descend one level
elif isinstance(fields, dict):
subschema = self._recursively_validate(fields, col_schema)
output[column] = subschema

else:
output[column] = column_lower in schema
raise ValueError("Bad expected schema provided")

return output

@abc.abstractmethod
def validate_table_schema(
self, expected: dict[str, list[str]], schema: list[tuple]
self, expected: dict[str, list], schema: list[tuple]
) -> dict:
"""Public interface for investigating if fields are in a table schema.
expected is a dictionary of string column names to *something*:
- falsy (like None or []): just check that the column exists
- list of strings: check all the column children exist
        - dict: a nested child 'expected' dictionary, following the same rules
This method should lightly format results and pass them to
_parse_found_schema(), or a more bespoke table analysis function if needed.
parse_found_schema(), or a more bespoke table analysis function if needed.
"""
parsed_schema = self.parse_found_schema(dict(schema))
return self._recursively_validate(expected, parsed_schema)
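
For orientation, here is a minimal data-only sketch of the three `expected` shapes described above and the result shape validate_table_schema() would produce for them (the column names are invented for illustration):

    # Sketch only: hypothetical columns, mirroring the three rules above.
    expected = {
        "id": [],                                  # falsy -> just check the column exists
        "subject": ["reference"],                  # list  -> check each named child exists
        "component": {"valueQuantity": ["code"]},  # dict  -> descend another level
    }

    # Assuming the parsed table schema contains subject.reference and
    # component.valueQuantity.code, the validation result would look like:
    result = {
        "id": True,
        "subject": {"reference": True},
        "component": {"valueQuantity": {"code": True}},
    }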


class DatabaseBackend(abc.ABC):
@@ -179,11 +196,53 @@ def close(self) -> None:


class AthenaParser(DatabaseParser):
def validate_table_schema(
self, expected: dict[dict[list]], schema: list[list]
) -> dict:
schema = dict(schema)
return self._parse_found_schema(expected, schema)
def _find_type_len(self, row: str) -> int:
"""Finds the end of a type string like row(...) or array(row(...))"""
# Note that this assumes the string is well formatted.
depth = 0
for i in range(len(row)):
match row[i]:
case ",":
if depth == 0:
break
case "(":
depth += 1
case ")":
depth -= 1
return i

def _split_row(self, row: str) -> dict[str, str]:
# Must handle "name type, name row(...), name type, name row(...)"
result = {}
# Real athena doesn't add extra spaces, but our unit tests do for
# readability, so let's strip out whitespace as we parse.
while row := row.strip():
name, remainder = row.split(" ", 1)
type_len = self._find_type_len(remainder)
result[name] = remainder[0:type_len]
row = remainder[type_len + 2 :] # skip comma and space
return result

def parse_found_schema(self, schema: dict[str, str]) -> dict:
# A sample response for table `observation`, column `component`:
# array(row(code varchar, display varchar)),
# text varchar, id varchar)
parsed = {}

for key, value in schema.items():
# Strip arrays out, they don't affect the shape of our schema.
while value.startswith("array("):
value = value.removeprefix("array(")
value = value.removesuffix(")")

if value.startswith("row("):
value = value.removeprefix("row(")
value = value.removesuffix(")")
parsed[key] = self.parse_found_schema(self._split_row(value))
else:
parsed[key] = {}

return parsed
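
A rough end-to-end sketch of the Athena path (the type strings below are invented, but follow the row(...)/array(...) shape this parser handles):

    from cumulus_library.databases import AthenaParser

    # Hypothetical get_column_datatype() rows: (column name, Athena type string).
    schema = [
        ("id", "varchar"),
        ("subject", "row(reference varchar, display varchar)"),
        ("component", "array(row(valuequantity row(code varchar, value double)))"),
    ]
    expected = {
        "id": [],
        "subject": ["reference"],
        "component": {"valueQuantity": ["code"]},
    }
    print(AthenaParser().validate_table_schema(expected, schema))
    # {'id': True, 'subject': {'reference': True},
    #  'component': {'valueQuantity': {'code': True}}}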


class DuckDatabaseBackend(DatabaseBackend):
@@ -323,16 +382,26 @@ def close(self) -> None:
class DuckDbParser(DatabaseParser):
"""Table parser for DuckDB schemas"""

def validate_table_schema(
self, expected: dict[dict[list]], schema: list[tuple]
) -> dict:
def parse_found_schema(self, schema: dict[str, str]) -> dict:
"""parses a information_schema.tables query response for valid columns"""
schema = dict(schema)
# since we're defaulting to athena naming conventions elsewhere,
# we'll lower case all the keys
schema = {k.lower(): v for k, v in schema.items()}
parsed = {}

for key, value in schema.items():
if isinstance(value, str):
# DuckDB provides a parser to go from string -> type objects
value = duckdb.typing.DuckDBPyType(value)

# Collapse lists to the contained value
if value.id == "list":
value = value.children[0][1] # [('child', CONTAINED_TYPE)]

if value.id == "struct":
result = self.parse_found_schema(dict(value.children))
else:
result = {}
parsed[key.lower()] = result

return self._parse_found_schema(expected, schema)
return parsed
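
And a comparable sketch for DuckDB; parse_found_schema() here leans on duckdb.typing.DuckDBPyType to interpret the type strings reported by information_schema (the exact strings below are assumptions):

    from cumulus_library.databases import DuckDbParser

    # Hypothetical information_schema rows: (column name, DuckDB data type string).
    schema = [
        ("id", "VARCHAR"),
        ("subject", "STRUCT(reference VARCHAR, display VARCHAR)"),
        ("component", "STRUCT(valueQuantity STRUCT(code VARCHAR, value DOUBLE))[]"),
    ]
    print(DuckDbParser().parse_found_schema(dict(schema)))
    # Keys come back lower-cased to match the Athena naming conventions:
    # {'id': {}, 'subject': {'reference': {}, 'display': {}},
    #  'component': {'valuequantity': {'code': {}, 'value': {}}}}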


def _read_rows_from_table_dir(path: str) -> list[dict]:
3 changes: 2 additions & 1 deletion cumulus_library/studies/core/builder_documentreference.py
@@ -7,9 +7,10 @@
"id": [],
"type": [],
"status": [],
"date": [],
"docStatus": [],
"subject": ["reference"],
"context": ["encounter", "period", "start"],
"context": {"encounter": ["reference"], "period": ["start"]},
"category": [],
}
}
46 changes: 38 additions & 8 deletions cumulus_library/studies/core/builder_observation.py
@@ -9,10 +9,15 @@
expected_table_cols = {
"observation": {
"id": [],
"category": ["coding", "code", "display", "system", "text"],
"category": [],
"component": {
"code": [],
"dataAbsentReason": [],
"valueQuantity": ["code", "comparator", "system", "unit", "value"],
},
"status": [],
"code": ["coding", "code", "display", "system", "text"],
"interpretation": ["coding", "code", "display", "system", "text"],
"code": [],
"interpretation": [],
"referenceRange": [
"low",
"high",
@@ -24,7 +29,7 @@
],
"effectiveDateTime": [],
"valueQuantity": ["value", "comparator", "unit", "system", "code"],
"valueCodeableConcept": ["coding", "code", "display", "system"],
"valueCodeableConcept": [],
"valueString": [],
"subject": ["reference"],
"encounter": ["reference"],
@@ -35,9 +40,15 @@
@dataclass(kw_only=True)
class ObsConfig(sql_utils.CodeableConceptConfig):
source_table: str = "observation"
is_public: bool = False

def __post_init__(self):
self.target_table = f"core__observation_dn_{self.column_hierarchy[-1][0]}"
# Consideration for future: should all denormalized tables be public?
# For now, we'll mark the ones we want to encourage use of,
# and for those, remove the maybe-confusing denormalization tag.
table_suffix = "" if self.is_public else "dn_"
table_suffix += "_".join(c[0] for c in self.column_hierarchy)
self.target_table = f"core__observation_{table_suffix}"
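
A quick sketch of the resulting table names (the constructor calls are copied from the configs further down; the printed names follow the naming rule just above):

    from cumulus_library.studies.core.builder_observation import ObsConfig

    # Public component tables drop the "dn_" (denormalized) tag from the name:
    public = ObsConfig(
        is_public=True, column_hierarchy=[("component", list), ("code", dict)]
    )
    print(public.target_table)  # core__observation_component_code

    # Non-public configs keep it:
    internal = ObsConfig(column_hierarchy=[("category", list)], filter_priority=False)
    print(internal.target_table)  # core__observation_dn_category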


class ObservationBuilder(base_table_builder.BaseTableBuilder):
@@ -59,6 +70,22 @@ def prepare_queries(
code_sources = [
ObsConfig(column_hierarchy=[("category", list)], filter_priority=False),
ObsConfig(column_hierarchy=[("code", dict)], filter_priority=False),
ObsConfig(
is_public=True,
column_hierarchy=[("component", list), ("code", dict)],
),
ObsConfig(
is_public=True,
column_hierarchy=[("component", list), ("dataabsentreason", dict)],
),
ObsConfig(
is_public=True,
column_hierarchy=[("component", list), ("interpretation", list)],
),
ObsConfig(
is_public=True,
column_hierarchy=[("component", list), ("valuecodeableconcept", dict)],
),
ObsConfig(
column_hierarchy=[("interpretation", list)], filter_priority=False
),
@@ -78,6 +105,9 @@ def prepare_queries(
validated_schema = core_templates.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.queries.append(
core_templates.get_core_template("observation", validated_schema)
)
self.queries += [
core_templates.get_core_template("observation", validated_schema),
core_templates.get_core_template(
"observation_component_valuequantity", validated_schema
),
]
24 changes: 12 additions & 12 deletions cumulus_library/studies/core/core_templates/core_utils.jinja
@@ -27,26 +27,26 @@ targets is an array expecting a data type of the following:
{#- depth one nested column-#}
{%- if col is not string and col|length ==3%}
{%- if schema[table][col[0]][col[1]] %}
{{ alias }}.{{ col[0] }}.{{ col[1] }} AS {{col[2].lower()}}
{{ alias }}.{{ col[0] }}.{{ col[1] }} AS {{ col[2] }}
{%- else %}
cast(NULL as varchar) AS {{ col[2].lower() }}
cast(NULL as varchar) AS {{ col[2] }}
{%- endif %}
{#- depth two nested column -#}
{%- elif col is not string and col|length ==4%}
{%- if schema[table][col[0]][col[2]] %}
{{ alias }}.{{ col[0] }}.{{ col[1] }}.{{ col[2] }} AS {{col[3].lower()}}
{%- if schema[table][col[0]][col[1]][col[2]] %}
{{ alias }}.{{ col[0] }}.{{ col[1] }}.{{ col[2] }} AS {{ col[3] }}
{%- else %}
cast(NULL as varchar) AS {{ col[3].lower()}}
cast(NULL as varchar) AS {{ col[3] }}
{%- endif %}
{#- SQL primitive column -#}
{%- elif schema[table][col] %}
{{ alias }}.{{ col }}
{%- else %}
{#- workaround for docref date-#}
{%- if col == "date"%}
cast(NULL as timestamp) AS "{{ col.lower() }}"
cast(NULL as timestamp) AS "{{ col }}"
{%- else %}
cast(NULL as varchar) AS {{ col.lower() }}
cast(NULL as varchar) AS {{ col }}
{%- endif %}
{%- endif %}
{{- syntax.comma_delineate(loop) }}
@@ -66,20 +66,20 @@
{%- if schema[table][col[0]][col[1]] %}
date(from_iso8601_timestamp({{ alias }}.{{ col[0] }}.{{ col[1] }})) AS {{ col[2] }}
{%- else %}
cast(NULL AS date) AS {{ col[1].lower() }}
cast(NULL AS date) AS {{ col[1] }}
{% endif %}
{#- depth two nested column -#}
{%- elif col is not string and col|length ==4%}
{%- if schema[table][col[0]][col[2]] %}
{%- if schema[table][col[0]][col[1]][col[2]] %}
date(from_iso8601_timestamp({{ alias }}.{{ col[0] }}.{{ col[1] }}.{{ col[2] }})) AS {{col[3]}}
{%- else %}
cast(NULL AS date) AS {{ col[3].lower() }}
cast(NULL AS date) AS {{ col[3] }}
{%- endif %}
{#- SQL primitive column -#}
{%- elif schema[table][col] %}
date(from_iso8601_timestamp({{ alias }}.{{ col }})) AS {{ col }}
{%- else %}
cast(NULL AS date) AS {{ col.lower() }}
cast(NULL AS date) AS {{ col }}
{%- endif %}
{{- syntax.comma_delineate(loop) }}
{%- endfor %}
@@ -103,7 +103,7 @@ targets is assumed to be a list of tuples of one of the following format:
{% endif %}
{#- depth two nested column -#}
{%- elif col is not string and col|length ==5%}
{%- if schema[table][col[0]][col[2]] %}
{%- if schema[table][col[0]][col[1]][col[2]] %}
date_trunc('{{ col[4] }}', date(from_iso8601_timestamp({{ alias }}."{{ col[0] }}"."{{ col[1] }}"."{{col[2]}}")))
AS {{ col[3] }}
{%- else %}
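
For reference, a small sketch of the depth-two target tuples these macros consume and the SQL they render (the names and the "o" alias are illustrative, not taken from the study):

    # A depth-two nested-column target, as handled by the length-4 branch above:
    col = ("component", "valueQuantity", "code", "valueQuantity_code")

    # When schema[table]["component"]["valueQuantity"]["code"] is truthy, the macro
    # renders roughly:
    #     o.component.valueQuantity.code AS valueQuantity_code
    # and otherwise falls back to:
    #     cast(NULL as varchar) AS valueQuantity_code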