diff --git a/cumulus_library/databases.py b/cumulus_library/databases.py index 73141e33..b27ef003 100644 --- a/cumulus_library/databases.py +++ b/cumulus_library/databases.py @@ -156,6 +156,17 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame: def parser(self) -> DatabaseParser: """Returns parser object for interrogating DB schemas""" + def operational_errors(self) -> tuple[Exception]: + """Returns a tuple of operational exception classes + + An operational error is something that went wrong while performing a database + query. So something like "table doesn't exist" but not like a network or + syntax error. + + This is designed to be used in an `except` clause. + """ + return () + def upload_file( self, *, @@ -219,6 +230,9 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame: def parser(self) -> DatabaseParser: return AthenaParser() + def operational_errors(self) -> tuple[Exception]: + return (pyathena.OperationalError,) + def upload_file( self, *, @@ -454,6 +468,9 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame: def parser(self) -> DatabaseParser: return DuckDbParser() + def operational_errors(self) -> tuple[Exception]: + return (duckdb.OperationalError,) + def close(self) -> None: self.connection.close() diff --git a/cumulus_library/studies/core/builder_condition.py b/cumulus_library/studies/core/builder_condition.py index 38d5f77f..8ab22e58 100644 --- a/cumulus_library/studies/core/builder_condition.py +++ b/cumulus_library/studies/core/builder_condition.py @@ -1,4 +1,4 @@ -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import base_templates, sql_utils @@ -75,16 +75,12 @@ def denormalize_codes(self): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): self.denormalize_codes() - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("condition", validated_schema) ) diff --git a/cumulus_library/studies/core/builder_documentreference.py b/cumulus_library/studies/core/builder_documentreference.py index ac63424f..94a2a531 100644 --- a/cumulus_library/studies/core/builder_documentreference.py +++ b/cumulus_library/studies/core/builder_documentreference.py @@ -1,4 +1,4 @@ -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -19,16 +19,12 @@ class CoreDocumentreferenceBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): self.queries = sql_utils.denormalize_complex_objects( - schema, - cursor, - parser, + config.db, [ sql_utils.CodeableConceptConfig( source_table="documentreference", @@ -62,9 +58,7 @@ def prepare_queries( ), ], ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("documentreference", validated_schema) ) diff --git a/cumulus_library/studies/core/builder_encounter.py b/cumulus_library/studies/core/builder_encounter.py index 8f383e66..1e7ab7dd 100644 --- a/cumulus_library/studies/core/builder_encounter.py +++ b/cumulus_library/studies/core/builder_encounter.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -37,7 +37,7 @@ def __post_init__(self): class CoreEncounterBuilder(base_table_builder.BaseTableBuilder): display_text = "Creating Encounter tables..." - def denormalize_codes(self, schema, cursor, parser): + def denormalize_codes(self, database): code_configs = [ EncConfig( column_hierarchy=[("type", list)], @@ -103,26 +103,16 @@ def denormalize_codes(self, schema, cursor, parser): expected={"dischargedisposition": sql_utils.CODEABLE_CONCEPT}, ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_configs - ) + self.queries += sql_utils.denormalize_complex_objects(database, code_configs) def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): - self.denormalize_codes( - schema, - cursor, - parser, - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.denormalize_codes(config.db) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries += [ core_templates.get_core_template("encounter", validated_schema), core_templates.get_core_template("incomplete_encounter", validated_schema), diff --git a/cumulus_library/studies/core/builder_medication.py b/cumulus_library/studies/core/builder_medication.py index 9251a2e6..ddbe87cb 100644 --- a/cumulus_library/studies/core/builder_medication.py +++ b/cumulus_library/studies/core/builder_medication.py @@ -1,6 +1,6 @@ """Module for generating core medication table""" -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -19,17 +19,13 @@ class MedicationBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: databases.DatabaseCursor, - schema: str, - parser: databases.DatabaseParser = None, *args, + config: base_utils.StudyConfig, **kwargs, ) -> None: """Constructs queries related to condition codeableConcept - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor - :param parser: A database parser + :param config: A study config object """ code_sources = [ sql_utils.CodeableConceptConfig( @@ -57,12 +53,8 @@ def prepare_queries( ], ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_sources - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries += [ core_templates.get_core_template("medication", validated_schema), ] diff --git a/cumulus_library/studies/core/builder_medicationrequest.py b/cumulus_library/studies/core/builder_medicationrequest.py index 9509fd78..ce5a3e17 100644 --- a/cumulus_library/studies/core/builder_medicationrequest.py +++ b/cumulus_library/studies/core/builder_medicationrequest.py @@ -3,7 +3,7 @@ Note: This module assumes that you have already run builder_medication, as it leverages the core__medication table for data population""" -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -26,16 +26,13 @@ class MedicationRequestBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): """constructs queries related to patient extensions of interest - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor + :param config: A study config object """ code_sources = [ sql_utils.CodeableConceptConfig( @@ -45,12 +42,8 @@ def prepare_queries( target_table="core__medicationrequest_dn_category", ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_sources - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("medicationrequest", validated_schema) ) diff --git a/cumulus_library/studies/core/builder_observation.py b/cumulus_library/studies/core/builder_observation.py index c1d77447..d1d50f98 100644 --- a/cumulus_library/studies/core/builder_observation.py +++ b/cumulus_library/studies/core/builder_observation.py @@ -2,7 +2,7 @@ from dataclasses import dataclass -from cumulus_library import base_table_builder, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import sql_utils @@ -41,16 +41,13 @@ class ObservationBuilder(base_table_builder.BaseTableBuilder): def prepare_queries( self, - cursor: object, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): """constructs queries related to patient extensions of interest - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor + :param config: A study config object """ code_sources = [ ObsConfig(column_hierarchy=[("category", list)], filter_priority=False), @@ -88,12 +85,8 @@ def prepare_queries( ), ] - self.queries += sql_utils.denormalize_complex_objects( - schema, cursor, parser, code_sources - ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries += [ core_templates.get_core_template("observation", validated_schema), core_templates.get_core_template( diff --git a/cumulus_library/studies/core/builder_patient.py b/cumulus_library/studies/core/builder_patient.py index ddb4511c..fc4b256e 100644 --- a/cumulus_library/studies/core/builder_patient.py +++ b/cumulus_library/studies/core/builder_patient.py @@ -1,6 +1,6 @@ """Module for extracting US core extensions from patient records""" -from cumulus_library import databases +from cumulus_library import base_utils, databases from cumulus_library.base_table_builder import BaseTableBuilder from cumulus_library.studies.core.core_templates import core_templates from cumulus_library.template_sql import base_templates, sql_utils @@ -20,16 +20,12 @@ class PatientBuilder(BaseTableBuilder): @staticmethod def make_extension_query( - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, name: str, url: str, ) -> str: has_extensions = sql_utils.is_field_present( - schema=schema, - cursor=cursor, - parser=parser, + database=database, source_table="patient", source_col="extension", expected={ @@ -53,23 +49,20 @@ def make_extension_query( return base_templates.get_extension_denormalize_query(config) else: return base_templates.get_ctas_empty_query( - schema_name=schema, + schema_name=database.schema_name, table_name=f"core__patient_ext_{name}", table_cols=["id", "system", f"{name}_code", f"{name}_display"], ) def prepare_queries( self, - cursor: databases.DatabaseCursor, - schema: str, *args, - parser: databases.DatabaseParser = None, + config: base_utils.StudyConfig, **kwargs, ): """constructs queries related to patient extensions of interest - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor + :param config: A study config object """ extension_types = [ { @@ -84,13 +77,11 @@ def prepare_queries( for extension in extension_types: self.queries.append( self.make_extension_query( - schema, cursor, parser, extension["name"], extension["fhirpath"] + config.db, extension["name"], extension["fhirpath"] ) ) - validated_schema = sql_utils.validate_schema( - cursor, schema, expected_table_cols, parser - ) + validated_schema = sql_utils.validate_schema(config.db, expected_table_cols) self.queries.append( core_templates.get_core_template("patient", validated_schema) ) diff --git a/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja b/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja index fe64463d..cfb7fbc1 100644 --- a/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja +++ b/cumulus_library/studies/core/core_templates/observation_component_valuequantity.sql.jinja @@ -60,7 +60,7 @@ CREATE TABLE core__observation_component_valuequantity AS ( 'x' AS comparator, 'x' AS unit, 'x' AS code_system, - 'x' AS code, + 'x' AS code WHERE 1=0 -- empty table {%- endif %} ); diff --git a/cumulus_library/studies/discovery/code_detection.py b/cumulus_library/studies/discovery/code_detection.py index 9dc8b444..111164cc 100644 --- a/cumulus_library/studies/discovery/code_detection.py +++ b/cumulus_library/studies/discovery/code_detection.py @@ -1,6 +1,6 @@ """Module for generating encounter codeableConcept table""" -from cumulus_library import base_table_builder, base_utils, databases +from cumulus_library import base_table_builder, base_utils from cumulus_library.studies.discovery import code_definitions from cumulus_library.studies.discovery.discovery_templates import discovery_templates from cumulus_library.template_sql import sql_utils @@ -9,21 +9,17 @@ class CodeDetectionBuilder(base_table_builder.BaseTableBuilder): display_text = "Selecting unique code systems..." - def _check_coding_against_db(self, code_source, schema, cursor, parser): + def _check_coding_against_db(self, code_source, database): """selects the appropriate DB query to run""" return sql_utils.is_field_populated( - schema=schema, + database=database, source_table=code_source["table_name"], hierarchy=code_source["column_hierarchy"], expected=code_source.get("expected"), - cursor=cursor, - parser=parser, ) - def _check_codes_in_fields( - self, code_sources: list[dict], schema, cursor, parser - ) -> dict: + def _check_codes_in_fields(self, code_sources: list[dict], database) -> dict: """checks if Coding/CodeableConcept fields are present and populated""" with base_utils.get_progress_bar() as progress: @@ -33,24 +29,20 @@ def _check_codes_in_fields( ) for code_source in code_sources: code_source["has_data"] = self._check_coding_against_db( - code_source, schema, cursor, parser + code_source, database ) progress.advance(task) return code_sources def prepare_queries( self, - cursor: databases.DatabaseCursor, - schema: str, - parser: databases.DatabaseParser = None, *args, + config: base_utils.StudyConfig, **kwargs, ): """Constructs queries related to condition codeableConcept - :param cursor: A database cursor object - :param schema: the schema/db name, matching the cursor - + :param config: A study config object """ code_sources = [] @@ -67,7 +59,7 @@ def prepare_queries( for key in code_definition.keys(): code_source[key] = code_definition[key] code_sources.append(code_source) - code_sources = self._check_codes_in_fields(code_sources, schema, cursor, parser) + code_sources = self._check_codes_in_fields(code_sources, config.db) query = discovery_templates.get_code_system_pairs( "discovery__code_sources", code_sources ) diff --git a/cumulus_library/template_sql/sql_utils.py b/cumulus_library/template_sql/sql_utils.py index 31dd5e25..d1cbdf3b 100644 --- a/cumulus_library/template_sql/sql_utils.py +++ b/cumulus_library/template_sql/sql_utils.py @@ -90,9 +90,7 @@ class ExtensionConfig(BaseConfig): def _check_data_in_fields( - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, code_sources: list[CodeableConceptConfig], ) -> dict: """checks if CodeableConcept fields actually have data available @@ -116,16 +114,14 @@ def _check_data_in_fields( with base_utils.get_progress_bar(transient=True) as progress: task = progress.add_task( - "Detecting available encounter codeableConcepts...", + "Detecting available codeableConcepts...", # Each column in code_sources requires at most 3 queries to # detect valid data is in the DB total=len(code_sources), ) for code_source in code_sources: code_source.has_data = is_field_populated( - schema=schema, - cursor=cursor, - parser=parser, + database=database, source_table=code_source.source_table, hierarchy=code_source.column_hierarchy, expected=code_source.expected, @@ -135,13 +131,11 @@ def _check_data_in_fields( def denormalize_complex_objects( - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, code_sources: list[BaseConfig], ): queries = [] - code_sources = _check_data_in_fields(schema, cursor, parser, code_sources) + code_sources = _check_data_in_fields(database, code_sources) for code_source in code_sources: # TODO: This method of pairing classed config objects to # specific queries should be considered temporary. This should be @@ -177,7 +171,7 @@ def denormalize_complex_objects( col_types += ["varchar"] * len(code_source.extra_fields) queries.append( base_templates.get_ctas_empty_query( - schema_name=schema, + schema_name=database.schema_name, table_name=code_source.target_table, table_cols=table_cols, table_cols_types=col_types, @@ -191,7 +185,7 @@ def denormalize_complex_objects( else: queries.append( base_templates.get_ctas_empty_query( - schema_name=schema, + schema_name=database.schema_name, table_name=code_source.target_table, table_cols=["id", "code", "code_system", "display"], ) @@ -201,24 +195,31 @@ def denormalize_complex_objects( def validate_schema( - cursor: databases.DatabaseCursor, - schema: str, + database: databases.DatabaseBackend, expected_table_cols: dict, - parser: databases.DatabaseParser, ) -> dict: validated_schema = {} for table, cols in expected_table_cols.items(): - query = base_templates.get_column_datatype_query(schema, table, cols.keys()) - table_schema = cursor.execute(query).fetchall() - validated_schema[table] = parser.validate_table_schema(cols, table_schema) + query = base_templates.get_column_datatype_query( + database.schema_name, table, cols.keys() + ) + + try: + table_schema = database.cursor().execute(query).fetchall() + except database.operational_errors(): + # A database backend might reasonably raise an exception in cases like + # the table not existing (Athena does this). + table_schema = [] + + validated_schema[table] = database.parser().validate_table_schema( + cols, table_schema + ) return validated_schema def is_field_populated( *, - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, source_table: str, hierarchy: list[tuple], expected: list | dict | None = None, @@ -228,8 +229,7 @@ def is_field_populated( Non-core studies that rely on the core tables shouldn't need this method. This is just to examine the weird and wonderful world of the raw FHIR tables. - :keyword schema: The schema/database name - :keyword cursor: a PEP-249 compliant database cursor + :keyword database: The database backend :keyword source_table: The table to query against :keyword hierarchy: a list of tuples defining the FHIR path to the element. Each tuple should be of the form ('element_name', dict | list), where @@ -239,9 +239,7 @@ def is_field_populated( :returns: a boolean indicating if valid data is present. """ if not is_field_present( - schema=schema, - cursor=cursor, - parser=parser, + database=database, source_table=source_table, source_col=hierarchy[0][0], expected=expected, @@ -271,7 +269,7 @@ def is_field_populated( query = base_templates.get_is_table_not_empty_query( source_table=source_table, field=".".join(source_field), unnests=unnests ) - res = cursor.execute(query).fetchall() + res = database.cursor().execute(query).fetchall() if len(res) == 0: return False return True @@ -279,18 +277,14 @@ def is_field_populated( def is_field_present( *, - schema: str, - cursor: databases.DatabaseCursor, - parser: databases.DatabaseParser, + database: databases.DatabaseBackend, source_table: str, source_col: str, expected: list | dict | None = None, ) -> bool: """Validation check for a column existing, and having the expected schema - :keyword schema: The schema/database name - :keyword cursor: a PEP-249 compliant database cursor - :keyword parser: a database schema parser + :keyword database: The database backend :keyword source_table: The table to query against :keyword source_col: The column to check the schema against :keyword expected: a list of elements that should be present in source_col. @@ -301,7 +295,7 @@ def is_field_present( expected = CODEABLE_CONCEPT table_cols = {source_table: {source_col: expected}} - schema = validate_schema(cursor, schema, table_cols, parser) + schema = validate_schema(database, table_cols) def _get_all_values(d: dict) -> list: all_values = [] diff --git a/tests/test_template_sql_utils.py b/tests/test_template_sql_utils.py index ee14f72c..cd3881b8 100644 --- a/tests/test_template_sql_utils.py +++ b/tests/test_template_sql_utils.py @@ -54,11 +54,9 @@ def test_is_field_populated(mock_db, table, hierarchy, expected, returns, raises): with raises: res = sql_utils.is_field_populated( - schema="main", + database=mock_db, source_table=table, hierarchy=hierarchy, expected=expected, - cursor=mock_db.cursor(), - parser=mock_db.parser(), ) assert res == returns