Commit

Table infrastructure, condition refactor (#163)
* Table infrastructure, condition refactor

* Redo PR feedback & regression updates

* template utils cleanup

* updated regression data

* import style cleanup, some doc improvements
dogversioning authored Jan 10, 2024
1 parent 4c66973 commit 660800f
Showing 62 changed files with 1,735 additions and 1,216 deletions.
6 changes: 6 additions & 0 deletions cumulus_library/.sqlfluff
@@ -15,6 +15,10 @@ ignore_comment_lines = true
[sqlfluff:rules:capitalisation.keywords]
capitalisation_policy = upper


[sqlfluff:templater:jinja]
load_macros_from_path = cumulus_library/studies/core/core_templates

[sqlfluff:templater:jinja:context]
code_systems = ["http://snomed.info/sct", "http://hl7.org/fhir/sid/icd-10-cm"]
col_type_list = ["a string","b string"]
@@ -23,6 +27,7 @@ cc_columns = [{"name": "baz", "is_array": True}, {"name": "foobar", "is_array":
cc_column = 'code'
code_system_tables = [{"table_name":"hasarray","column_name":"acol","is_bare_coding":False,"is_array":True, "has_data": True},{"table_name":"noarray","column_name":"col","is_bare_coding":False,"is_array":False, "has_data": True},{"table_name":"bare","column_name":"bcol","is_bare_coding":True,"is_array":False, "has_data": True},{"table_name":"empty","column_name":"empty","is_bare_coding":False,"is_array":False, "has_data": False}]
column_name = 'bar'
column_names = ['foo', 'bar']
conditions = ["1 > 0", "1 < 2"]
count_ref = count_ref
count_table = count_table
@@ -43,6 +48,7 @@ prefix = Test
primary_ref = encounter_ref
pos_source_table = pos_source_table
schema_name = test_schema
schema = {'condition': {'category': {'coding': True, 'code': True, 'display': True, 'system': True, 'userSelected': True, 'version': True, 'text': True}, 'clinicalstatus': {'coding': True, 'code': True, 'display': True, 'system': True, 'userSelected': True, 'version': True, 'text': True}, 'id': True, 'recordeddate': True, 'verificationstatus': {'coding': True, 'code': True, 'display': True, 'system': True, 'userSelected': True, 'version': True, 'text': True}, 'subject': {'reference': True, 'display': False, 'type': True}, 'encounter': {'reference': True, 'display': False, 'type': True}}}
source_table = source_table
source_id = source_id
table_cols = ["a","b"]
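For context, these jinja variables exist so sqlfluff can render the study's templated SQL while linting. A rough sketch of that substitution (illustrative only, not repo code; assumes the jinja2 package is installed):

    # Sketch: roughly how a jinja templater fills in the context above.
    from jinja2 import Template

    context = {
        "column_name": "bar",
        "source_table": "source_table",
        "conditions": ["1 > 0", "1 < 2"],
    }
    sql = Template(
        "SELECT {{ column_name }} FROM {{ source_table }} "
        "WHERE {{ conditions | join(' AND ') }}"
    ).render(**context)
    print(sql)  # SELECT bar FROM source_table WHERE 1 > 0 AND 1 < 2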
2 changes: 1 addition & 1 deletion cumulus_library/__init__.py
@@ -1,2 +1,2 @@
"""Package metadata"""
__version__ = "1.5.0"
__version__ = "2.0.0"
14 changes: 10 additions & 4 deletions cumulus_library/cli.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
"""Utility for building/retrieving data views in AWS Athena"""

import datetime
import json
import os
import sys
@@ -131,7 +130,10 @@ def clean_and_build_study(
if len(cleaned_tables) == 0:
stats_build = True
studyparser.run_table_builder(
self.cursor, self.schema_name, verbose=self.verbose
self.cursor,
self.schema_name,
verbose=self.verbose,
parser=self.db.parser(),
)
else:
self.update_transactions(studyparser.get_study_prefix(), "resumed")
@@ -166,7 +168,11 @@ def run_single_table_builder(
"""
studyparser = StudyManifestParser(target)
studyparser.run_single_table_builder(
self.cursor, self.schema_name, table_builder_name, self.verbose
self.cursor,
self.schema_name,
table_builder_name,
self.verbose,
parser=self.db.parser(),
)

def clean_and_build_all(self, study_dict: Dict, stats_build: bool) -> None:
@@ -282,8 +288,8 @@ def run_cli(args: Dict):

# all other actions require connecting to the database
else:
db_backend = create_db_backend(args)
try:
db_backend = create_db_backend(args)
builder = StudyBuilder(db_backend, data_path=args.get("data_path"))
if args["verbose"]:
builder.verbose = True
93 changes: 93 additions & 0 deletions cumulus_library/databases.py
@@ -41,6 +41,66 @@ def fetchall(self) -> Optional[list[list]]:
pass


class DatabaseParser(abc.ABC):
"""Parses information_schema results from a database"""

def _parse_found_schema(self, expected: dict[dict[list]], schema: list[tuple]):
"""Checks for presence of field for each column in a table
:param expected: A nested dict describing the expected data format of
a table. Expected format is like this:
{
"object_col":["member_a", "member_b"]
"primitive_col": []
}
:param schema: the results of a query from the get_column_datatype method
of the template_sql.templates function. It looks like this:
[
('object_col', 'row(member_a VARCHAR, member_b DATE)'),
('primitive_col', 'VARCHAR')
]
The actual contents of schema are database dependent. This naive method
is a first pass: it ignores the complexities of differing database
variable types and simply iterates through, looking for column names.
TODO: consider a more nuanced per-database approach if needed
"""
output = {}
for column, _ in expected.items():
output[column] = {}
if col_schema := schema.get(column.lower()):
# is this an object column?
if len(expected[column]) > 0:
for field in expected[column]:
col_schema = col_schema.split(field, 1)
if len(col_schema) != 2:
output[column][field] = False
col_schema = col_schema[0]
else:
output[column][field] = True
col_schema = col_schema[1]
# otherwise this is a primitive col
else:
output[column] = True
else:
for field in expected[column]:
output[column][field] = False
return output

@abc.abstractmethod
def validate_table_schema(
self, expected: dict[dict[list]], schema: list[tuple]
) -> dict[bool]:
"""Public interface for investigating if fields are in a table schema.
This method should lightly format results and pass them to
_parse_found_schema(), or a more bespoke table analysis function if needed.
"""


class DatabaseBackend(abc.ABC):
"""A generic database backend, supporting basic cursor operations"""

@@ -67,6 +127,10 @@ def pandas_cursor(self) -> DatabaseCursor:
def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
"""Returns a pandas.DataFrame version of the results from the provided SQL"""

@abc.abstractmethod
def parser(self) -> DatabaseParser:
"""Returns parser object for interrogating DB schemas"""

def close(self) -> None:
"""Clean up any resources necessary"""

@@ -107,6 +171,17 @@ def pandas_cursor(self) -> AthenaPandasCursor:
def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
return self.pandas_cursor.execute(sql).as_pandas()

def parser(self) -> DatabaseParser:
return AthenaParser()


class AthenaParser(DatabaseParser):
def validate_table_schema(
self, expected: dict[dict[list]], schema: list[tuple]
) -> dict[bool]:
schema = dict(schema)
return self._parse_found_schema(expected, schema)


class DuckDatabaseBackend(DatabaseBackend):
"""Database backend that uses local files via duckdb"""
@@ -194,10 +269,28 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
# PyAthena seems to do this correctly for us, but not DuckDB.
return self.connection.execute(sql).df().convert_dtypes()

def parser(self) -> DatabaseParser:
return DuckDbParser()

def close(self) -> None:
self.connection.close()


class DuckDbParser(DatabaseParser):
"""Table parser for DuckDB schemas"""

def validate_table_schema(
self, expected: dict[dict[list]], schema: list[tuple]
) -> dict:
"""parses a information_schema.tables query response for valid columns"""
schema = dict(schema)
# since we're defaulting to athena naming conventions elsewhere,
# we'll lower case all the keys
schema = {k.lower(): v for k, v in schema.items()}

return self._parse_found_schema(expected, schema)


def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
"""Loads a directory tree of raw ndjson into schema-ful tables.
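To make the parser contract concrete, here is a sketch of a round trip through DuckDbParser.validate_table_schema, using made-up column data shaped like the docstring's examples:

    # Sketch: hypothetical inputs showing the shape of the parser round trip.
    from cumulus_library.databases import DuckDbParser

    expected = {
        "subject": ["reference", "type"],  # object column with two members
        "id": [],  # primitive column
    }
    # Shaped like a get_column_datatype query result: (name, type) tuples.
    schema = [
        ("subject", "row(reference varchar, display varchar)"),
        ("id", "varchar"),
    ]

    result = DuckDbParser().validate_table_schema(expected, schema)
    # result == {"subject": {"reference": True, "type": False}, "id": True}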
8 changes: 7 additions & 1 deletion cumulus_library/protected_table_builder.py
@@ -33,7 +33,13 @@ class ProtectedTableBuilder(BaseTableBuilder):
display_text = "Creating/updating system tables..."

def prepare_queries(
self, cursor: object, schema: str, study_name: str, study_stats: dict
self,
cursor: object,
schema: str,
study_name: str,
study_stats: dict,
*args,
**kwargs,
):
self.queries.append(
get_ctas_empty_query(
10 changes: 8 additions & 2 deletions cumulus_library/statistics/counts.py
@@ -72,14 +72,19 @@ def get_count_query(
:keyword min_subject: An integer setting the minimum bin size for inclusion
(default: 10)
:keyword fhir_resource: The type of FHIR resource to count (see
template_sql/templates.CountableFhirResource)
template_sql/templates.statistics.CountableFhirResource)
"""
if not table_name or not source_table or not table_cols:
raise CountsBuilderError(
"count_query missing required arguments. " f"output table: {table_name}"
)
for key in kwargs:
if key not in ["min_subject", "where_clauses", "fhir_resource"]:
if key not in [
"min_subject",
"where_clauses",
"fhir_resource",
"filter_resource",
]:
raise CountsBuilderError(f"count_query received unexpected key: {key}")
return counts_templates.get_count_query(
table_name, source_table, table_cols, **kwargs
Expand Down Expand Up @@ -114,6 +119,7 @@ def count_condition(
where_clauses=where_clauses,
min_subject=min_subject,
fhir_resource="condition",
filter_resource="encounter",
)

def count_document(
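For reference, study code reaches this new filter_resource path through count_condition; a sketch of such a builder (study and table names are hypothetical, and the CountsBuilder class name and argument order are assumed from this module):

    # Sketch: a hypothetical study counts builder hitting count_condition.
    from cumulus_library.statistics.counts import CountsBuilder

    class ExampleCounts(CountsBuilder):
        def prepare_queries(self, *args, **kwargs):
            self.queries.append(
                self.count_condition(
                    "mystudy__count_condition_month",  # output table name
                    "mystudy__condition",  # source table
                    ["category_code", "recordeddate_month"],  # columns to bin by
                    min_subject=10,
                )
            )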
97 changes: 97 additions & 0 deletions cumulus_library/studies/core/builder_condition.py
@@ -0,0 +1,97 @@
from cumulus_library import base_table_builder
from cumulus_library import databases
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import templates


expected_table_cols = {
"condition": {
"category": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"clinicalstatus": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"id": [],
"recordeddate": [],
"verificationstatus": [
"coding",
"code",
"display",
"system",
"userSelected",
"version",
"text",
],
"subject": ["reference", "display", "type"],
"encounter": ["reference", "display", "type"],
}
}


class CoreConditionBuilder(base_table_builder.BaseTableBuilder):
def denormalize_codes(self):
preferred_config = templates.CodeableConceptConfig(
source_table="condition",
source_id="id",
column_name="code",
is_array=False,
target_table="core__condition_codable_concepts_display",
filter_priority=True,
code_systems=[
"http://snomed.info/sct",
"http://hl7.org/fhir/sid/icd-10-cm",
"http://hl7.org/fhir/sid/icd-9-cm",
],
)
self.queries.append(
templates.get_codeable_concept_denormalize_query(preferred_config)
)

all_config = templates.CodeableConceptConfig(
source_table="condition",
source_id="id",
column_name="code",
is_array=False,
target_table="core__condition_codable_concepts_all",
filter_priority=False,
)
self.queries.append(
templates.get_codeable_concept_denormalize_query(all_config)
)

def validate_schema(self, cursor: object, schema: str, expected_table_cols, parser):
validated_schema = {}
for table, cols in expected_table_cols.items():
query = templates.get_column_datatype_query(schema, table, cols.keys())
table_schema = cursor.execute(query).fetchall()
validated_schema[table] = parser.validate_table_schema(cols, table_schema)
return validated_schema

def prepare_queries(
self,
cursor: object,
schema: str,
*args,
parser: databases.DatabaseParser = None,
**kwargs,
):
self.denormalize_codes()
validated_schema = self.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.queries.append(
core_templates.get_core_template("condition", validated_schema)
)
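Putting the pieces together, validate_schema runs a live schema query and feeds the parsed result into the core template. A sketch with a stub cursor (stub values are illustrative; CoreConditionBuilder is the class defined above):

    # Sketch: exercising CoreConditionBuilder.validate_schema with a stub cursor.
    from cumulus_library import databases

    class StubCursor:
        def execute(self, query):
            return self

        def fetchall(self):
            # Shaped like an information_schema column/type listing.
            return [("id", "varchar"), ("recordeddate", "date")]

    builder = CoreConditionBuilder()
    validated = builder.validate_schema(
        StubCursor(),
        "test_schema",
        {"condition": {"id": [], "recordeddate": []}},
        databases.DuckDbParser(),
    )
    # validated == {"condition": {"id": True, "recordeddate": True}}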
