From 6c870d581f1e0938f46a9fd44e1ae4d8fa8292f8 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Tue, 14 May 2024 11:32:36 -0400 Subject: [PATCH] fix: if a table does not exist, use an empty schema (#236) * fix: if a table does not exist, use an empty schema (Instead of erroring out.) This helps in cases where some tables get optionally created (like the ETL completion tables). * feat: add DatabaseBackend.operational_errors() handling This adds a new backend method that classes can override to help catch certain operational exceptions (i.e. not syntax or similar). The first use is catching "table does not exist" errors that Athena throws when trying to get the schema for a missing table. --- cumulus_library/databases.py | 17 +++++ .../studies/core/builder_condition.py | 10 +-- .../studies/core/builder_documentreference.py | 14 ++-- .../studies/core/builder_encounter.py | 22 ++----- .../studies/core/builder_medication.py | 18 ++---- .../studies/core/builder_medicationrequest.py | 17 ++--- .../studies/core/builder_observation.py | 17 ++--- .../studies/core/builder_patient.py | 25 +++----- ...ervation_component_valuequantity.sql.jinja | 2 +- .../studies/discovery/code_detection.py | 24 +++---- cumulus_library/template_sql/sql_utils.py | 64 +++++++++---------- tests/test_template_sql_utils.py | 4 +- 12 files changed, 92 insertions(+), 142 deletions(-) diff --git a/cumulus_library/databases.py b/cumulus_library/databases.py index 73141e33..b27ef003 100644 --- a/cumulus_library/databases.py +++ b/cumulus_library/databases.py @@ -156,6 +156,17 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame: def parser(self) -> DatabaseParser: """Returns parser object for interrogating DB schemas""" + def operational_errors(self) -> tuple[Exception]: + """Returns a tuple of operational exception classes + + An operational error is something that went wrong while performing a database + query. So something like "table doesn't exist" but not like a network or + syntax error. + + This is designed to be used in an `except` clause. + """ + return () + def upload_file( self, *, @@ -219,6 +230,9 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame: def parser(self) -> DatabaseParser: return AthenaParser() + def operational_errors(self) -> tuple[Exception]: + return (pyathena.OperationalError,) + def upload_file( self, *, @@ -454,6 +468,9 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame: def parser(self) -> DatabaseParser: return DuckDbParser() + def operational_errors(self) -> tuple[Exception]: + return (duckdb.OperationalError,) + def close(self) -> None: self.connection.close() diff --git a/cumulus_library/studies/core/builder_condition.py b/cumulus_library/studies/core/builder_condition.py index 38d5f77f..8ab22e58 100644 --- a/cumulus_library/studies/core/builder_condition.py +++ b/cumulus_library/studies/core/builder_condition.py @@ -1,4 +1,4 @@ -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import base_templates, sql_utils @@ -75,16 +75,12 @@ def denormalize_codes(self): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): self.denormalize_codes() - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("condition", validated_schema) ) diff --git a/cumulus_library/studies/core/builder_documentreference.py b/cumulus_library/studies/core/builder_documentreference.py index ac63424f..94a2a531 100644 --- a/cumulus_library/studies/core/builder_documentreference.py +++ b/cumulus_library/studies/core/builder_documentreference.py @@ -1,4 +1,4 @@ -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -19,16 +19,12 @@ class CoreDocumentreferenceBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): self.queries = sql_utils.denormalize_complex_objects( - schema, - cursor, - parser, + config.db, [ sql_utils.CodeableConceptConfig( source_table="documentreference", @@ -62,9 +58,7 @@ def prepare_queries( ), ], ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("documentreference", validated_schema) ) diff --git a/cumulus_library/studies/core/builder_encounter.py b/cumulus_library/studies/core/builder_encounter.py index 8f383e66..1e7ab7dd 100644 --- a/cumulus_library/studies/core/builder_encounter.py +++ b/cumulus_library/studies/core/builder_encounter.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -37,7 +37,7 @@ def __post_init__(self): class CoreEncounterBuilder(base_table_builder.BaseTableBuilder): display_text = "Creating Encounter tables..." - def denormalize_codes(self, schema, cursor, parser): + def denormalize_codes(self, database): code_configs = [ EncConfig( column_hierarchy=[("type", list)], @@ -103,26 +103,16 @@ def denormalize_codes(self, schema, cursor, parser): expected={"dischargedisposition": sql_utils.CODEABLE_CONCEPT}, ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_configs - ) + self.queries += sql_utils.denormalize_complex_objects(database, code_configs) def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): - self.denormalize_codes( - schema, - cursor, - parser, - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.denormalize_codes(config.db) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries += [ core_templates.get_core_template("encounter", validated_schema), core_templates.get_core_template("incomplete_encounter", validated_schema), diff --git a/cumulus_library/studies/core/builder_medication.py b/cumulus_library/studies/core/builder_medication.py index 9251a2e6..ddbe87cb 100644 --- a/cumulus_library/studies/core/builder_medication.py +++ b/cumulus_library/studies/core/builder_medication.py @@ -1,6 +1,6 @@ """Module for generating core medication table""" -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -19,17 +19,13 @@ class MedicationBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: databases.DatabaseCursor, - schema: str, - parser: databases.DatabaseParser = None, *args, + config: base_utils.StudyConfig, **kwargs, ) -> None: """Constructs queries related to condition codeableConcept - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor - :param parser: A database parser + :param config: A study config object """ code_sources = [ sql_utils.CodeableConceptConfig( @@ -57,12 +53,8 @@ def prepare_queries( ], ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_sources - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries += [ core_templates.get_core_template("medication", validated_schema), ] diff --git a/cumulus_library/studies/core/builder_medicationrequest.py b/cumulus_library/studies/core/builder_medicationrequest.py index 9509fd78..ce5a3e17 100644 --- a/cumulus_library/studies/core/builder_medicationrequest.py +++ b/cumulus_library/studies/core/builder_medicationrequest.py @@ -3,7 +3,7 @@ Note: This module assumes that you have already run builder_medication, as it leverages the core__medication table for data population""" -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -26,16 +26,13 @@ class MedicationRequestBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): """constructs queries related to patient extensions of interest - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor + :param config: A study config object """ code_sources = [ sql_utils.CodeableConceptConfig( @@ -45,12 +42,8 @@ def prepare_queries( target_table="core__medicationrequest_dn_category", ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_sources - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("medicationrequest", validated_schema) ) diff --git a/cumulus_library/studies/core/builder_observation.py b/cumulus_library/studies/core/builder_observation.py index c1d77447..d1d50f98 100644 --- a/cumulus_library/studies/core/builder_observation.py +++ b/cumulus_library/studies/core/builder_observation.py @@ -2,7 +2,7 @@ from dataclasses import dataclass -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -41,16 +41,13 @@ class ObservationBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): """constructs queries related to patient extensions of interest - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor + :param config: A study config object """ code_sources = [ ObsConfig(column_hierarchy=[("category", list)], filter_priority=False), @@ -88,12 +85,8 @@ def prepare_queries( ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_sources - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries += [ core_templates.get_core_template("observation", validated_schema), core_templates.get_core_template( diff --git a/cumulus_library/studies/core/builder_patient.py b/cumulus_library/studies/core/builder_patient.py index ddb4511c..fc4b256e 100644 --- a/cumulus_library/studies/core/builder_patient.py +++ b/cumulus_library/studies/core/builder_patient.py @@ -1,6 +1,6 @@ """Module for extracting US core extensions from patient records""" -from cumulus_library import databases +from cumulus_library import base_utils, databases from cumulus_library.base_table_builder import BaseTableBuilder from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import base_templates, sql_utils @@ -20,16 +20,12 @@ class PatientBuilder(BaseTableBuilder): @staticmethod def make_extension_query( - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, name: str, url: str, ) -> str: has_extensions = sql_utils.is_field_present( - schema=schema, - cursor=cursor, - parser=parser, + database=database, source_table="patient", source_col="extension", expected={ @@ -53,23 +49,20 @@ def make_extension_query( return base_templates.get_extension_denormalize_query(config) else: return base_templates.get_ctas_empty_query( - schema_name=schema, + schema_name=database.schema_name, table_name=f"core__patient_ext_{name}", table_cols=["id", "system", f"{name}_code", f"{name}_display"], ) def prepare_queries( self, - cursor: databases.DatabaseCursor, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): """constructs queries related to patient extensions of interest - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor + :param config: A study config object """ extension_types = [ { @@ -84,13 +77,11 @@ def prepare_queries( for extension in extension_types: self.queries.append( self.make_extension_query( - schema, cursor, parser, extension["name"], extension["fhirpath"] + config.db, extension["name"], extension["fhirpath"] ) ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("patient", validated_schema) ) diff --git a/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja b/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja index fe64463d..cfb7fbc1 100644 --- a/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja +++ b/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja @@ -60,7 +60,7 @@ CREATE TABLE core__observation_component_valuequantity AS ( 'x' AS comparator, 'x' AS unit, 'x' AS code_system, - 'x' AS code, + 'x' AS code WHERE 1=0 -- empty table {%- endif %} ); diff --git a/cumulus_library/studies/discovery/code_detection.py b/cumulus_library/studies/discovery/code_detection.py index 9dc8b444..111164cc 100644 --- a/cumulus_library/studies/discovery/code_detection.py +++ b/cumulus_library/studies/discovery/code_detection.py @@ -1,6 +1,6 @@ """Module for generating encounter codeableConcept table""" -from cumulus_library import base_table_builder, base_utils, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.discovery import code_definitions from cumulus_library.studies.discovery.discovery_templates import discovery_templates from cumulus_library.template_sql import sql_utils @@ -9,21 +9,17 @@ class CodeDetectionBuilder(base_table_builder.BaseTableBuilder): display_text = "Selecting unique code systems..." - def _check_coding_against_db(self, code_source, schema, cursor, parser): + def _check_coding_against_db(self, code_source, database): """selects the appropriate DB query to run""" return sql_utils.is_field_populated( - schema=schema, + database=database, source_table=code_source["table_name"], hierarchy=code_source["column_hierarchy"], expected=code_source.get("expected"), - cursor=cursor, - parser=parser, ) - def _check_codes_in_fields( - self, code_sources: list[dict], schema, cursor, parser - ) -> dict: + def _check_codes_in_fields(self, code_sources: list[dict], database) -> dict: """checks if Coding/CodeableConcept fields are present and populated""" with base_utils.get_progress_bar() as progress: @@ -33,24 +29,20 @@ def _check_codes_in_fields( ) for code_source in code_sources: code_source["has_data"] = self._check_coding_against_db( - code_source, schema, cursor, parser + code_source, database ) progress.advance(task) return code_sources def prepare_queries( self, - cursor: databases.DatabaseCursor, - schema: str, - parser: databases.DatabaseParser = None, *args, + config: base_utils.StudyConfig, **kwargs, ): """Constructs queries related to condition codeableConcept - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor - + :param config: A study config object """ code_sources = [] @@ -67,7 +59,7 @@ def prepare_queries( for key in code_definition.keys(): code_source[key] = code_definition[key] code_sources.append(code_source) - code_sources = self._check_codes_in_fields(code_sources, schema, cursor, parser) + code_sources = self._check_codes_in_fields(code_sources, config.db) query = discovery_templates.get_code_system_pairs( "discovery__code_sources", code_sources ) diff --git a/cumulus_library/template_sql/sql_utils.py b/cumulus_library/template_sql/sql_utils.py index 31dd5e25..d1cbdf3b 100644 --- a/cumulus_library/template_sql/sql_utils.py +++ b/cumulus_library/template_sql/sql_utils.py @@ -90,9 +90,7 @@ class ExtensionConfig(BaseConfig): def _check_data_in_fields( - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, code_sources: list[CodeableConceptConfig], ) -> dict: """checks if CodeableConcept fields actually have data available @@ -116,16 +114,14 @@ def _check_data_in_fields( with base_utils.get_progress_bar(transient=True) as progress: task = progress.add_task( - "Detecting available encounter codeableConcepts...", + "Detecting available codeableConcepts...", # Each column in code_sources requires at most 3 queries to # detect valid data is in the DB total=len(code_sources), ) for code_source in code_sources: code_source.has_data = is_field_populated( - schema=schema, - cursor=cursor, - parser=parser, + database=database, source_table=code_source.source_table, hierarchy=code_source.column_hierarchy, expected=code_source.expected, @@ -135,13 +131,11 @@ def _check_data_in_fields( def denormalize_complex_objects( - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, code_sources: list[BaseConfig], ): queries = [] - code_sources = _check_data_in_fields(schema, cursor, parser, code_sources) + code_sources = _check_data_in_fields(database, code_sources) for code_source in code_sources: # TODO: This method of pairing classed config objects to # specific queries should be considered temporary. This should be @@ -177,7 +171,7 @@ def denormalize_complex_objects( col_types += ["varchar"] * len(code_source.extra_fields) queries.append( base_templates.get_ctas_empty_query( - schema_name=schema, + schema_name=database.schema_name, table_name=code_source.target_table, table_cols=table_cols, table_cols_types=col_types, @@ -191,7 +185,7 @@ def denormalize_complex_objects( else: queries.append( base_templates.get_ctas_empty_query( - schema_name=schema, + schema_name=database.schema_name, table_name=code_source.target_table, table_cols=["id", "code", "code_system", "display"], ) @@ -201,24 +195,31 @@ def denormalize_complex_objects( def validate_schema( - cursor: databases.DatabaseCursor, - schema: str, + database: databases.DatabaseBackend, expected_table_cols: dict, - parser: databases.DatabaseParser, ) -> dict: validated_schema = {} for table, cols in expected_table_cols.items(): - query = base_templates.get_column_datatype_query(schema, table, cols.keys()) - table_schema = cursor.execute(query).fetchall() - validated_schema[table] = parser.validate_table_schema(cols, table_schema) + query = base_templates.get_column_datatype_query( + database.schema_name, table, cols.keys() + ) + + try: + table_schema = database.cursor().execute(query).fetchall() + except database.operational_errors(): + # A database backend might reasonably raise an exception in cases like + # the table not existing (Athena does this). + table_schema = [] + + validated_schema[table] = database.parser().validate_table_schema( + cols, table_schema + ) return validated_schema def is_field_populated( *, - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, source_table: str, hierarchy: list[tuple], expected: list | dict | None = None, @@ -228,8 +229,7 @@ def is_field_populated( Non-core studies that rely on the core tables shouldn't need this method. This is just to examine the weird and wonderful world of the raw FHIR tables. - :keyword schema: The schema/database name - :keyword cursor: a PEP-249 compliant database cursor + :keyword database: The database backend :keyword source_table: The table to query against :keyword hierarchy: a list of tuples defining the FHIR path to the element. Each tuple should be of the form ('element_name', dict | list), where @@ -239,9 +239,7 @@ def is_field_populated( :returns: a boolean indicating if valid data is present. """ if not is_field_present( - schema=schema, - cursor=cursor, - parser=parser, + database=database, source_table=source_table, source_col=hierarchy[0][0], expected=expected, @@ -271,7 +269,7 @@ def is_field_populated( query = base_templates.get_is_table_not_empty_query( source_table=source_table, field=".".join(source_field), unnests=unnests ) - res = cursor.execute(query).fetchall() + res = database.cursor().execute(query).fetchall() if len(res) == 0: return False return True @@ -279,18 +277,14 @@ def is_field_populated( def is_field_present( *, - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, source_table: str, source_col: str, expected: list | dict | None = None, ) -> bool: """Validation check for a column existing, and having the expected schema - :keyword schema: The schema/database name - :keyword cursor: a PEP-249 compliant database cursor - :keyword parser: a database schema parser + :keyword database: The database backend :keyword source_table: The table to query against :keyword source_col: The column to check the schema against :keyword expected: a list of elements that should be present in source_col. @@ -301,7 +295,7 @@ def is_field_present( expected = CODEABLE_CONCEPT table_cols = {source_table: {source_col: expected}} - schema = validate_schema(cursor, schema, table_cols, parser) + schema = validate_schema(database, table_cols) def _get_all_values(d: dict) -> list: all_values = [] diff --git a/tests/test_template_sql_utils.py b/tests/test_template_sql_utils.py index ee14f72c..cd3881b8 100644 --- a/tests/test_template_sql_utils.py +++ b/tests/test_template_sql_utils.py @@ -54,11 +54,9 @@ def test_is_field_populated(mock_db, table, hierarchy, expected, returns, raises): with raises: res = sql_utils.is_field_populated( - schema="main", + database=mock_db, source_table=table, hierarchy=hierarchy, expected=expected, - cursor=mock_db.cursor(), - parser=mock_db.parser(), ) assert res == returns