Skip to content

Commit

Permalink
fix: if a table does not exist, use an empty schema (#236)
Browse files Browse the repository at this point in the history
* fix: if a table does not exist, use an empty schema

(Instead of erroring out.)

This helps in cases where some tables get optionally created (like the
ETL completion tables).

* feat: add DatabaseBackend.operational_errors() handling

This adds a new backend method that classes can override to help
catch certain operational exceptions (i.e. not syntax or similar).

The first use is catching "table does not exist" errors that Athena
throws when trying to get the schema for a missing table.
  • Loading branch information
mikix authored May 14, 2024
1 parent 1932655 commit 6c870d5
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 142 deletions.
17 changes: 17 additions & 0 deletions cumulus_library/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
def parser(self) -> DatabaseParser:
"""Returns parser object for interrogating DB schemas"""

def operational_errors(self) -> tuple[type[Exception], ...]:
    """Return the exception classes that count as operational errors.

    An operational error is something that went wrong while performing a
    database query: something like "table doesn't exist", but not a network
    or syntax error.

    The result is designed to be used directly in an `except` clause, so it
    holds exception classes rather than exception instances.

    :returns: a tuple of exception classes; this implementation returns an
        empty tuple, which matches nothing in an `except` clause
    """
    return ()

def upload_file(
self,
*,
Expand Down Expand Up @@ -219,6 +230,9 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
def parser(self) -> DatabaseParser:
return AthenaParser()

def operational_errors(self) -> tuple[type[Exception], ...]:
    """Return the exception classes treated as operational errors.

    :returns: a one-element tuple holding `pyathena.OperationalError`,
        usable directly in an `except` clause
    """
    return (pyathena.OperationalError,)

def upload_file(
self,
*,
Expand Down Expand Up @@ -454,6 +468,9 @@ def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
def parser(self) -> DatabaseParser:
return DuckDbParser()

def operational_errors(self) -> tuple[type[Exception], ...]:
    """Return the exception classes treated as operational errors.

    :returns: a one-element tuple holding `duckdb.OperationalError`,
        usable directly in an `except` clause
    """
    return (duckdb.OperationalError,)

def close(self) -> None:
self.connection.close()

Expand Down
10 changes: 3 additions & 7 deletions cumulus_library/studies/core/builder_condition.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from cumulus_library import base_table_builder, databases
from cumulus_library import base_table_builder, base_utils
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import base_templates, sql_utils

Expand Down Expand Up @@ -75,16 +75,12 @@ def denormalize_codes(self):

def prepare_queries(
self,
cursor: object,
schema: str,
*args,
parser: databases.DatabaseParser = None,
config: base_utils.StudyConfig,
**kwargs,
):
self.denormalize_codes()
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
validated_schema = sql_utils.validate_schema(config.db, expected_table_cols)
self.queries.append(
core_templates.get_core_template("condition", validated_schema)
)
14 changes: 4 additions & 10 deletions cumulus_library/studies/core/builder_documentreference.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from cumulus_library import base_table_builder, databases
from cumulus_library import base_table_builder, base_utils
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import sql_utils

Expand All @@ -19,16 +19,12 @@ class CoreDocumentreferenceBuilder(base_table_builder.BaseTableBuilder):

def prepare_queries(
self,
cursor: object,
schema: str,
*args,
parser: databases.DatabaseParser = None,
config: base_utils.StudyConfig,
**kwargs,
):
self.queries = sql_utils.denormalize_complex_objects(
schema,
cursor,
parser,
config.db,
[
sql_utils.CodeableConceptConfig(
source_table="documentreference",
Expand Down Expand Up @@ -62,9 +58,7 @@ def prepare_queries(
),
],
)
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
validated_schema = sql_utils.validate_schema(config.db, expected_table_cols)
self.queries.append(
core_templates.get_core_template("documentreference", validated_schema)
)
22 changes: 6 additions & 16 deletions cumulus_library/studies/core/builder_encounter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass

from cumulus_library import base_table_builder, databases
from cumulus_library import base_table_builder, base_utils
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import sql_utils

Expand Down Expand Up @@ -37,7 +37,7 @@ def __post_init__(self):
class CoreEncounterBuilder(base_table_builder.BaseTableBuilder):
display_text = "Creating Encounter tables..."

def denormalize_codes(self, schema, cursor, parser):
def denormalize_codes(self, database):
code_configs = [
EncConfig(
column_hierarchy=[("type", list)],
Expand Down Expand Up @@ -103,26 +103,16 @@ def denormalize_codes(self, schema, cursor, parser):
expected={"dischargedisposition": sql_utils.CODEABLE_CONCEPT},
),
]
self.queries += sql_utils.denormalize_complex_objects(
schema, cursor, parser, code_configs
)
self.queries += sql_utils.denormalize_complex_objects(database, code_configs)

def prepare_queries(
self,
cursor: object,
schema: str,
*args,
parser: databases.DatabaseParser = None,
config: base_utils.StudyConfig,
**kwargs,
):
self.denormalize_codes(
schema,
cursor,
parser,
)
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.denormalize_codes(config.db)
validated_schema = sql_utils.validate_schema(config.db, expected_table_cols)
self.queries += [
core_templates.get_core_template("encounter", validated_schema),
core_templates.get_core_template("incomplete_encounter", validated_schema),
Expand Down
18 changes: 5 additions & 13 deletions cumulus_library/studies/core/builder_medication.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Module for generating core medication table"""

from cumulus_library import base_table_builder, databases
from cumulus_library import base_table_builder, base_utils
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import sql_utils

Expand All @@ -19,17 +19,13 @@ class MedicationBuilder(base_table_builder.BaseTableBuilder):

def prepare_queries(
self,
cursor: databases.DatabaseCursor,
schema: str,
parser: databases.DatabaseParser = None,
*args,
config: base_utils.StudyConfig,
**kwargs,
) -> None:
"""Constructs queries related to medication codeableConcepts
:param cursor: A database cursor object
:param schema: the schema/db name, matching the cursor
:param parser: A database parser
:param config: A study config object
"""
code_sources = [
sql_utils.CodeableConceptConfig(
Expand Down Expand Up @@ -57,12 +53,8 @@ def prepare_queries(
],
),
]
self.queries += sql_utils.denormalize_complex_objects(
schema, cursor, parser, code_sources
)
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources)
validated_schema = sql_utils.validate_schema(config.db, expected_table_cols)
self.queries += [
core_templates.get_core_template("medication", validated_schema),
]
17 changes: 5 additions & 12 deletions cumulus_library/studies/core/builder_medicationrequest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Note: This module assumes that you have already run builder_medication,
as it leverages the core__medication table for data population"""

from cumulus_library import base_table_builder, databases
from cumulus_library import base_table_builder, base_utils
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import sql_utils

Expand All @@ -26,16 +26,13 @@ class MedicationRequestBuilder(base_table_builder.BaseTableBuilder):

def prepare_queries(
self,
cursor: object,
schema: str,
*args,
parser: databases.DatabaseParser = None,
config: base_utils.StudyConfig,
**kwargs,
):
"""Constructs queries related to the core medicationrequest table
:param cursor: A database cursor object
:param schema: the schema/db name, matching the cursor
:param config: A study config object
"""
code_sources = [
sql_utils.CodeableConceptConfig(
Expand All @@ -45,12 +42,8 @@ def prepare_queries(
target_table="core__medicationrequest_dn_category",
),
]
self.queries += sql_utils.denormalize_complex_objects(
schema, cursor, parser, code_sources
)
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources)
validated_schema = sql_utils.validate_schema(config.db, expected_table_cols)
self.queries.append(
core_templates.get_core_template("medicationrequest", validated_schema)
)
17 changes: 5 additions & 12 deletions cumulus_library/studies/core/builder_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dataclasses import dataclass

from cumulus_library import base_table_builder, databases
from cumulus_library import base_table_builder, base_utils
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import sql_utils

Expand Down Expand Up @@ -41,16 +41,13 @@ class ObservationBuilder(base_table_builder.BaseTableBuilder):

def prepare_queries(
self,
cursor: object,
schema: str,
*args,
parser: databases.DatabaseParser = None,
config: base_utils.StudyConfig,
**kwargs,
):
"""Constructs queries related to the core observation tables
:param cursor: A database cursor object
:param schema: the schema/db name, matching the cursor
:param config: A study config object
"""
code_sources = [
ObsConfig(column_hierarchy=[("category", list)], filter_priority=False),
Expand Down Expand Up @@ -88,12 +85,8 @@ def prepare_queries(
),
]

self.queries += sql_utils.denormalize_complex_objects(
schema, cursor, parser, code_sources
)
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.queries += sql_utils.denormalize_complex_objects(config.db, code_sources)
validated_schema = sql_utils.validate_schema(config.db, expected_table_cols)
self.queries += [
core_templates.get_core_template("observation", validated_schema),
core_templates.get_core_template(
Expand Down
25 changes: 8 additions & 17 deletions cumulus_library/studies/core/builder_patient.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Module for extracting US core extensions from patient records"""

from cumulus_library import databases
from cumulus_library import base_utils, databases
from cumulus_library.base_table_builder import BaseTableBuilder
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import base_templates, sql_utils
Expand All @@ -20,16 +20,12 @@ class PatientBuilder(BaseTableBuilder):

@staticmethod
def make_extension_query(
schema: str,
cursor: databases.DatabaseCursor,
parser: databases.DatabaseParser,
database: databases.DatabaseBackend,
name: str,
url: str,
) -> str:
has_extensions = sql_utils.is_field_present(
schema=schema,
cursor=cursor,
parser=parser,
database=database,
source_table="patient",
source_col="extension",
expected={
Expand All @@ -53,23 +49,20 @@ def make_extension_query(
return base_templates.get_extension_denormalize_query(config)
else:
return base_templates.get_ctas_empty_query(
schema_name=schema,
schema_name=database.schema_name,
table_name=f"core__patient_ext_{name}",
table_cols=["id", "system", f"{name}_code", f"{name}_display"],
)

def prepare_queries(
self,
cursor: databases.DatabaseCursor,
schema: str,
*args,
parser: databases.DatabaseParser = None,
config: base_utils.StudyConfig,
**kwargs,
):
"""constructs queries related to patient extensions of interest
:param cursor: A database cursor object
:param schema: the schema/db name, matching the cursor
:param config: A study config object
"""
extension_types = [
{
Expand All @@ -84,13 +77,11 @@ def prepare_queries(
for extension in extension_types:
self.queries.append(
self.make_extension_query(
schema, cursor, parser, extension["name"], extension["fhirpath"]
config.db, extension["name"], extension["fhirpath"]
)
)

validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
validated_schema = sql_utils.validate_schema(config.db, expected_table_cols)
self.queries.append(
core_templates.get_core_template("patient", validated_schema)
)
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ CREATE TABLE core__observation_component_valuequantity AS (
'x' AS comparator,
'x' AS unit,
'x' AS code_system,
'x' AS code,
'x' AS code
WHERE 1=0 -- empty table
{%- endif %}
);
Loading

0 comments on commit 6c870d5

Please sign in to comment.