From 8ee5515eda952c3cd0df645230091c6d097ab0be Mon Sep 17 00:00:00 2001
From: Matt Garber
Date: Mon, 18 Dec 2023 13:33:14 -0500
Subject: [PATCH] PSM CLI, table persistence

---
 cumulus_library/base_table_builder.py          |  19 +-
 cumulus_library/cli.py                         | 152 ++++++++++------
 cumulus_library/cli_parser.py                  |  28 ++-
 cumulus_library/databases.py                   |   8 +-
 cumulus_library/enums.py                       |  17 ++
 cumulus_library/protected_table_builder.py     |  58 ++++++
 cumulus_library/statistics/psm.py              | 148 +++++++++++++---
 cumulus_library/study_parser.py                | 167 +++++++++++++++---
 .../template_sql/ctas_empty.sql.jinja          |   7 +-
 .../psm_create_covariate_table.sql.jinja       |   2 +-
 .../template_sql/statistics/psm_templates.py   |   2 +
 cumulus_library/template_sql/templates.py      |  10 +-
 docs/statistics/propensity-score-matching.md   |   4 +
 tests/test_data/duckdb_data/duck.db            | Bin 12288 -> 0 bytes
 tests/test_data/psm/manifest.toml              |   7 +
 tests/test_data/psm/psm_cohort.sql             |   3 +
 tests/test_data/psm/psm_config.toml            |   7 +-
 .../test_data/psm/psm_config_no_optional.toml  |   6 +-
 tests/test_psm.py                              |   2 +-
 tests/test_templates.py                        |  43 +++++
 20 files changed, 574 insertions(+), 116 deletions(-)
 create mode 100644 cumulus_library/enums.py
 create mode 100644 cumulus_library/protected_table_builder.py
 delete mode 100644 tests/test_data/duckdb_data/duck.db
 create mode 100644 tests/test_data/psm/manifest.toml
 create mode 100644 tests/test_data/psm/psm_cohort.sql

diff --git a/cumulus_library/base_table_builder.py b/cumulus_library/base_table_builder.py
index 512ae4d2..dfcd41fe 100644
--- a/cumulus_library/base_table_builder.py
+++ b/cumulus_library/base_table_builder.py
@@ -1,5 +1,6 @@
 """ abstract base for python-based study executors """
 import re
+import sys
 from abc import ABC, abstractmethod
 from typing import final
@@ -21,7 +22,7 @@ def __init__(self):
         self.queries = []
 
     @abstractmethod
-    def prepare_queries(self, cursor: object, schema: str):
+    def prepare_queries(self, cursor: object, schema: str, *args, **kwargs):
        """Main entrypoint for python table builders.
 
         When completed, prepare_queries should populate self.queries with sql
@@ -29,7 +30,8 @@ def prepare_queries(self, cursor: object, schema: str):
 
         :param cursor: A PEP-249 compatible cursor
         :param schema: A schema name
-        :param verbose: toggle for verbose output mode
+        :param args/kwargs: builder-specific arguments (e.g. a study name, or
+            a table suffix needed for query construction)
         """
         raise NotImplementedError
 
@@ -40,6 +42,8 @@ def execute_queries(
         schema: str,
         verbose: bool,
         drop_table: bool = False,
+        *args,
+        **kwargs,
     ):
         """Executes queries set up by a prepare_queries call
 
@@ -48,7 +52,7 @@
         :param verbose: toggle for verbose output mode
         :param drop_table: drops any tables found in prepared_queries results
         """
-        self.prepare_queries(cursor, schema)
+        self.prepare_queries(cursor, schema, *args, **kwargs)
         if drop_table:
             table_names = []
             for query in self.queries:
@@ -73,8 +77,11 @@
             )
             for query in self.queries:
                 query_console_output(verbose, query, progress, task)
-                cursor.execute(query)
-        self.post_execution(cursor, schema, verbose, drop_table)
+                try:
+                    cursor.execute(query)
+                except Exception as e:
+                    sys.exit(e)
+        self.post_execution(cursor, schema, verbose, drop_table, *args, **kwargs)
 
     def post_execution(
         self,
@@ -82,6 +89,8 @@
         schema: str,
         verbose: bool,
         drop_table: bool = False,
+        *args,
+        **kwargs,
    ):
        """Hook for any additional actions to run after execute_queries"""
        pass
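
Since execute_queries now forwards *args/**kwargs into prepare_queries and post_execution, subclasses can accept run-specific arguments; the builders later in this patch use exactly this for study_name and table_suffix. A minimal sketch of a downstream builder relying on the pass-through - the class name and query are hypothetical, not part of this patch:

    from cumulus_library.base_table_builder import BaseTableBuilder

    class ExampleBuilder(BaseTableBuilder):
        display_text = "Building example tables..."

        def prepare_queries(self, cursor, schema, *args, table_suffix=None, **kwargs):
            # Keyword arguments passed to execute_queries() arrive here unchanged.
            suffix = f"_{table_suffix}" if table_suffix else ""
            self.queries.append(
                f"CREATE TABLE example__demo{suffix} AS SELECT 1 AS placeholder"
            )
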
- """ - - def __init__(self): - self.region = os.environ.get("CUMULUS_LIBRARY_REGION", "us-east-1") - self.workgroup = os.environ.get("CUMULUS_LIBRARY_WORKGROUP", "cumulus") - self.profile = os.environ.get("CUMULUS_LIBRARY_PROFILE") - self.schema_name = os.environ.get("CUMULUS_LIBRARY_DATABASE") - - def get_study_builder(self): - """Convenience method for getting athena args from environment""" - db = AthenaDatabaseBackend( - self.region, self.workgroup, self.profile, self.schema_name - ) - return StudyBuilder(db) - - class StudyBuilder: """Class for managing Athena cursors and executing Cumulus queries""" verbose = False schema_name = None - def __init__(self, db: DatabaseBackend): + def __init__(self, db: DatabaseBackend, data_path: str): self.db = db + self.data_path = data_path self.cursor = db.cursor() self.schema_name = db.schema_name - def reset_data_path(self, study: PosixPath) -> None: - """ - Removes existing exports from a study's local data dir - """ - project_path = Path(__file__).resolve().parents[1] - path = Path(f"{str(project_path)}/data_export/{study}/") - if path.exists(): - for file in path.glob("*"): - file.unlink() + def update_transactions(self, prefix: str, status: str): + self.cursor.execute( + get_insert_into_query( + f"{prefix}__{PROTECTED_TABLES.TRANSACTIONS.value}", + TRANSACTIONS_COLS, + [ + [ + prefix, + __version__, + status, + datetime.now().replace(microsecond=0).isoformat(), + ] + ], + ) + ) ### Creating studies - def clean_study(self, targets: List[str], study_dict, prefix=False) -> None: + def clean_study( + self, + targets: List[str], + study_dict: Dict, + stats_clean: bool, + prefix: bool = False, + ) -> None: """Removes study table/views from Athena. While this is usually not required, since it it done as part of a build, @@ -86,25 +80,69 @@ def clean_study(self, targets: List[str], study_dict, prefix=False) -> None: if prefix: parser = StudyManifestParser() parser.clean_study( - self.cursor, self.schema_name, self.verbose, prefix=target + self.cursor, + self.schema_name, + verbose=self.verbose, + stats_clean=stats_clean, + prefix=target, ) else: parser = StudyManifestParser(study_dict[target]) - parser.clean_study(self.cursor, self.schema_name, self.verbose) + parser.clean_study( + self.cursor, + self.schema_name, + verbose=self.verbose, + stats_clean=stats_clean, + ) def clean_and_build_study( - self, target: PosixPath, continue_from: str = None + self, + target: PosixPath, + export_dir: PosixPath, + stats_build: bool, + continue_from: str = None, ) -> None: """Recreates study views/tables :param target: A PosixPath to the study directory """ - studyparser = StudyManifestParser(target) - if not continue_from: - studyparser.clean_study(self.cursor, self.schema_name, self.verbose) - studyparser.run_table_builder(self.cursor, self.schema_name, self.verbose) - studyparser.build_study(self.cursor, self.verbose, continue_from) - studyparser.run_counts_builder(self.cursor, self.schema_name, self.verbose) + + studyparser = StudyManifestParser(target, self.data_path) + try: + if not continue_from: + studyparser.run_protected_table_builder( + self.cursor, self.schema_name, verbose=self.verbose + ) + self.update_transactions(studyparser.get_study_prefix(), "started") + cleaned_tables = studyparser.clean_study( + self.cursor, + self.schema_name, + verbose=self.verbose, + stats_clean=False, + ) + # If the study hasn't been created before, force stats table generation + if len(cleaned_tables) == 0: + stats_build = True + studyparser.run_table_builder( + 
 
     ### Creating studies
 
-    def clean_study(self, targets: List[str], study_dict, prefix=False) -> None:
+    def clean_study(
+        self,
+        targets: List[str],
+        study_dict: Dict,
+        stats_clean: bool,
+        prefix: bool = False,
+    ) -> None:
         """Removes study table/views from Athena.
 
         While this is usually not required, since it is done as part of a build,
@@ -86,25 +80,69 @@ def clean_study(self, targets: List[str], study_dict, prefix=False) -> None:
             if prefix:
                 parser = StudyManifestParser()
                 parser.clean_study(
-                    self.cursor, self.schema_name, self.verbose, prefix=target
+                    self.cursor,
+                    self.schema_name,
+                    verbose=self.verbose,
+                    stats_clean=stats_clean,
+                    prefix=target,
                 )
             else:
                 parser = StudyManifestParser(study_dict[target])
-                parser.clean_study(self.cursor, self.schema_name, self.verbose)
+                parser.clean_study(
+                    self.cursor,
+                    self.schema_name,
+                    verbose=self.verbose,
+                    stats_clean=stats_clean,
+                )
 
     def clean_and_build_study(
-        self, target: PosixPath, continue_from: str = None
+        self,
+        target: PosixPath,
+        export_dir: PosixPath,
+        stats_build: bool,
+        continue_from: str = None,
     ) -> None:
         """Recreates study views/tables
 
         :param target: A PosixPath to the study directory
+        :param stats_build: forces regeneration of statistics tables
+        :param continue_from: a study table to resume building from
         """
-        studyparser = StudyManifestParser(target)
-        if not continue_from:
-            studyparser.clean_study(self.cursor, self.schema_name, self.verbose)
-            studyparser.run_table_builder(self.cursor, self.schema_name, self.verbose)
-        studyparser.build_study(self.cursor, self.verbose, continue_from)
-        studyparser.run_counts_builder(self.cursor, self.schema_name, self.verbose)
+
+        studyparser = StudyManifestParser(target, self.data_path)
+        try:
+            if not continue_from:
+                studyparser.run_protected_table_builder(
+                    self.cursor, self.schema_name, verbose=self.verbose
+                )
+                self.update_transactions(studyparser.get_study_prefix(), "started")
+                cleaned_tables = studyparser.clean_study(
+                    self.cursor,
+                    self.schema_name,
+                    verbose=self.verbose,
+                    stats_clean=False,
+                )
+                # If the study hasn't been created before, force stats table generation
+                if len(cleaned_tables) == 0:
+                    stats_build = True
+                studyparser.run_table_builder(
+                    self.cursor, self.schema_name, verbose=self.verbose
+                )
+            else:
+                self.update_transactions(studyparser.get_study_prefix(), "resumed")
+            studyparser.build_study(self.cursor, self.verbose, continue_from)
+            studyparser.run_counts_builders(
+                self.cursor, self.schema_name, verbose=self.verbose
+            )
+            studyparser.run_statistics_builders(
+                self.cursor,
+                self.schema_name,
+                verbose=self.verbose,
+                stats_build=stats_build,
+                data_path=self.data_path,
+            )
+            self.update_transactions(studyparser.get_study_prefix(), "finished")
+        except Exception as e:
+            self.update_transactions(studyparser.get_study_prefix(), "error")
+            raise e
 
     def run_single_table_builder(
         self, target: PosixPath, table_builder_name: str
@@ -118,7 +156,7 @@
             self.cursor, self.schema_name, table_builder_name, self.verbose
         )
 
-    def clean_and_build_all(self, study_dict: Dict) -> None:
+    def clean_and_build_all(
+        self, study_dict: Dict, export_dir: PosixPath, stats_build: bool
+    ) -> None:
         """Builds views for all studies.
 
         NOTE: By design, this method will always exclude the `template` study dir,
@@ -129,7 +167,7 @@
         study_dict = dict(study_dict)
         study_dict.pop("template")
         for precursor_study in ["vocab", "core"]:
-            self.clean_and_build_study(study_dict[precursor_study])
+            self.clean_and_build_study(
+                study_dict[precursor_study], export_dir, stats_build
+            )
             study_dict.pop(precursor_study)
         for key in study_dict:
-            self.clean_and_build_study(study_dict[key])
+            self.clean_and_build_study(study_dict[key], export_dir, stats_build)
@@ -142,7 +180,7 @@ def export_study(self, target: PosixPath, data_path: PosixPath) -> None:
         """
         if data_path is None:
             sys.exit("Missing destination - please provide a path argument.")
-        studyparser = StudyManifestParser(target)
+        studyparser = StudyManifestParser(target, data_path)
         studyparser.export_study(self.db, data_path)
 
     def export_all(self, study_dict: Dict, data_path: PosixPath):
@@ -229,7 +267,7 @@ def run_cli(args: Dict):
     # all other actions require connecting to AWS
     else:
         db_backend = create_db_backend(args)
-        builder = StudyBuilder(db_backend)
+        builder = StudyBuilder(db_backend, args["data_path"])
         if args["verbose"]:
             builder.verbose = True
         print("Testing connection to database...")
@@ -250,20 +288,26 @@
             builder.clean_study(
                 args["target"],
                 study_dict,
+                args["stats_clean"],
                 args["prefix"],
             )
         elif args["action"] == "build":
             if "all" in args["target"]:
-                builder.clean_and_build_all(study_dict)
+                builder.clean_and_build_all(
+                    study_dict, args["data_path"], args["stats_build"]
+                )
             else:
                 for target in args["target"]:
                     if args["builder"]:
                         builder.run_single_table_builder(
                             study_dict[target], args["builder"]
                         )
                     else:
                         builder.clean_and_build_study(
-                            study_dict[target], continue_from=args["continue_from"]
+                            study_dict[target],
+                            args["data_path"],
+                            args["stats_build"],
+                            continue_from=args["continue_from"],
                         )
 
         elif args["action"] == "export":
@@ -273,6 +317,13 @@
             for target in args["target"]:
                 builder.export_study(study_dict[target], args["data_path"])
 
     db_backend.close()
     # returning the builder for ease of unit testing
     return builder
@@ -337,6 +388,7 @@ def main(cli_args=None):
 
     if args.get("data_path"):
         args["data_path"] = get_abs_posix_path(args["data_path"])
+
     return run_cli(args)
 
diff --git a/cumulus_library/cli_parser.py b/cumulus_library/cli_parser.py
index 5f951988..fd5c6805 100644
--- a/cumulus_library/cli_parser.py
+++ b/cumulus_library/cli_parser.py
@@ -122,6 +124,8 @@ def get_parser() -> argparse.ArgumentParser:
         dest="action",
     )
 
+    # Study creation
+
     create = actions.add_parser(
         "create", help="Create a study instance from a template"
     )
@@ -135,6 +139,8 @@
         ),
     )
 
+    # Database cleaning
+
     clean = actions.add_parser(
         "clean", help="Removes tables & views beginning with '[target]__' from Athena"
     )
@@ -143,12 +149,20 @@
     add_study_dir_argument(clean)
     add_verbose_argument(clean)
     add_db_config(clean)
+    clean.add_argument(
+        "--statistics",
+        action="store_true",
+        help="Remove artifacts of previous statistics runs",
+        dest="stats_clean",
+    )
     clean.add_argument(
         "--prefix",
         action="store_true",
         help=argparse.SUPPRESS,
     )
 
+    # Database building
+
     build = actions.add_parser(
         "build",
         help="Removes and recreates Athena tables & views for specified studies",
@@ -158,7 +172,15 @@
     add_study_dir_argument(build)
     add_verbose_argument(build)
     add_db_config(build)
-
+    build.add_argument(
+        "--statistics",
+        action="store_true",
+        help=(
" + "Stats are created by default when study is initially run" + ), + dest="stats_build", + ) build.add_argument( "--load-ndjson-dir", help="Load ndjson files from this folder", metavar="DIR" ) @@ -168,6 +190,8 @@ def get_parser() -> argparse.ArgumentParser: help=argparse.SUPPRESS, ) + # Database export + export = actions.add_parser( "export", help="Generates files on disk from Athena views" ) @@ -177,6 +201,8 @@ def get_parser() -> argparse.ArgumentParser: add_verbose_argument(export) add_db_config(export) + # Aggregator upload + upload = actions.add_parser( "upload", help="Bulk uploads data to Cumulus aggregator" ) diff --git a/cumulus_library/databases.py b/cumulus_library/databases.py index c3c728d2..afc1b089 100644 --- a/cumulus_library/databases.py +++ b/cumulus_library/databases.py @@ -264,10 +264,10 @@ def create_db_backend(args: dict[str, str]) -> DatabaseBackend: args["profile"], database, ) - if load_ndjson_dir: - sys.exit( - "Loading an ndjson dir is not supported with --db-type=athena (try duckdb)" - ) + # if load_ndjson_dir: + # sys.exit( + # "Loading an ndjson dir is not supported with --db-type=athena (try duckdb)" + # ) else: raise ValueError(f"Unexpected --db-type value '{db_type}'") diff --git a/cumulus_library/enums.py b/cumulus_library/enums.py new file mode 100644 index 00000000..d65af4e4 --- /dev/null +++ b/cumulus_library/enums.py @@ -0,0 +1,17 @@ +""" Holds enums used across more than one module """ +from enum import Enum + + +class PROTECTED_TABLE_KEYWORDS(Enum): + """Tables with a pattern like '_{keyword}_' are not manually dropped.""" + + ETL = "etl" + LIB = "lib" + NLP = "nlp" + + +class PROTECTED_TABLES(Enum): + """Tables created by cumulus for persistence outside of study rebuilds""" + + STATISTICS = "lib_statistics" + TRANSACTIONS = "lib_transactions" diff --git a/cumulus_library/protected_table_builder.py b/cumulus_library/protected_table_builder.py new file mode 100644 index 00000000..6953d04a --- /dev/null +++ b/cumulus_library/protected_table_builder.py @@ -0,0 +1,58 @@ +""" Builder for creating tables for tracking state/logging changes""" +import datetime + +from cumulus_library.base_table_builder import BaseTableBuilder +from cumulus_library.enums import PROTECTED_TABLES +from cumulus_library.template_sql.templates import ( + get_ctas_empty_query, + get_create_view_query, +) + +TRANSACTIONS_COLS = ["study_name", "library_version", "status", "event_time"] +STATISTICS_COLS = [ + "study_name", + "library_version", + "table_type", + "table_name", + "view_name", + "created_on", +] + + +class ProtectedTableBuilder(BaseTableBuilder): + display_text = "Creating/updating system tables..." 
diff --git a/cumulus_library/statistics/psm.py b/cumulus_library/statistics/psm.py
index da18f13e..6d1b38cc 100644
--- a/cumulus_library/statistics/psm.py
+++ b/cumulus_library/statistics/psm.py
@@ -1,18 +1,22 @@
 # Module for generating Propensity Score matching cohorts
-import numpy as np
-import pandas
+import json
+import os
 import sys
+
+from pathlib import PosixPath
+from dataclasses import dataclass
+
+import pandas
 import toml
 from psmpy import PsmPy
+# these imports are mimicking PsmPy imports for re-implemented functions
+from psmpy.functions import cohenD
+import matplotlib.pyplot as plt
+import seaborn as sns
 
-import json
-from pathlib import PosixPath
-from dataclasses import dataclass
-
-from cumulus_library.cli import StudyBuilder
 from cumulus_library.databases import DatabaseCursor
 from cumulus_library.base_table_builder import BaseTableBuilder
 from cumulus_library.template_sql.templates import (
@@ -53,11 +57,12 @@ class PsmBuilder(BaseTableBuilder):
     display_text = "Building PSM tables..."
 
-    def __init__(self, toml_config_path: str):
+    def __init__(self, toml_config_path: str, export_path: PosixPath):
         """Loads PSM job details from a PSM configuration file"""
         super().__init__()
         # We're stashing the toml path for error reporting later
         self.toml_path = toml_config_path
+        self.export_path = export_path
         try:
             with open(self.toml_path, encoding="UTF-8") as file:
                 toml_config = toml.load(file)
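
No complete PSM config appears in this excerpt, so the following is a hypothetical sketch of a psm_config.toml, assembled from the config attributes the builder reads below (pos_source_table, neg_source_table, target_table, and so on); the authoritative field set lives in the PsmConfig dataclass and tests/test_data/psm/psm_config.toml, neither of which is visible here:

    config_type = "psm"
    classification_json = "dsm5_classifications.json"  # hypothetical file name
    pos_source_table = "psm_test__psm_cohort"
    neg_source_table = "core__condition"
    target_table = "psm_test__psm_encounter_covariate"
    primary_ref = "encounter_ref"
    dependent_variable = "example_diagnosis"
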
@@ -132,7 +137,9 @@ def _get_sampled_ids(
         df[dependent_variable] = is_positive
         return df
 
-    def _create_covariate_table(self, cursor: DatabaseCursor, schema: str):
+    def _create_covariate_table(
+        self, cursor: DatabaseCursor, schema: str, table_suffix: str
+    ):
         """Creates a covariate table from the loaded toml config"""
         # checks for primary & link ref being the same
         source_refs = list({self.config.primary_ref, self.config.count_ref} - {None})
@@ -163,23 +170,24 @@
-        # Replace table (if it exists)
-        # TODO - replace with timestamp prepended table in future PR
-        drop = get_drop_view_table(
-            f"{self.config.pos_source_table}_sampled_ids", "TABLE"
-        )
-        cursor.execute(drop)
         ctas_query = get_ctas_query_from_df(
             schema,
-            f"{self.config.pos_source_table}_sampled_ids",
+            f"{self.config.pos_source_table}_sampled_ids_{table_suffix}",
             cohort,
         )
         self.queries.append(ctas_query)
-        # TODO - replace with timestamp prepended table
-        drop = get_drop_view_table(self.config.target_table, "TABLE")
-        cursor.execute(drop)
         dataset_query = get_create_covariate_table(
-            target_table=self.config.target_table,
+            target_table=f"{self.config.target_table}_{table_suffix}",
             pos_source_table=self.config.pos_source_table,
             neg_source_table=self.config.neg_source_table,
+            table_suffix=table_suffix,
             primary_ref=self.config.primary_ref,
             dependent_variable=self.config.dependent_variable,
             join_cols_by_table=self.config.join_cols_by_table,
@@ -188,9 +196,90 @@
         )
         self.queries.append(dataset_query)
 
+    def psm_plot_match(
+        self,
+        psm,
+        matched_entity="propensity_logit",
+        Title="Side by side matched controls",
+        Ylabel="Number of patients",
+        Xlabel="propensity logit",
+        names=["positive_cohort", "negative_cohort"],
+        colors=["#E69F00", "#56B4E9"],
+        save=True,
+        filename="propensity_match.png",
+    ):
+        """Plots KNN match data
+
+        This function re-implements psm.plot_match, with the only changes
+        allowing for specifying a filename/location for saving plots to,
+        and passing in the psm object instead of assuming a call from inside
+        the PsmPy class.
+ """ + dftreat = psm.df_matched[psm.df_matched[psm.treatment] == 1] + dfcontrol = psm.df_matched[psm.df_matched[psm.treatment] == 0] + x1 = dftreat[matched_entity] + x2 = dfcontrol[matched_entity] + colors = colors + names = names + sns.set_style("white") + plt.hist([x1, x2], color=colors, label=names) + plt.legend() + plt.xlabel(Xlabel) + plt.ylabel(Ylabel) + plt.title(Title) + if save == True: + plt.savefig(filename, dpi=250) + + def psm_effect_size_plot( + self, + psm, + title="Standardized Mean differences accross covariates before and after matching", + before_color="#FCB754", + after_color="#3EC8FB", + save=False, + filename="effect_size.png", + ): + """Plots effect size of variables for positive/negative matches + + This function re-implements psm.effect_size_plot, with the only changes + allowing for specifiying a filename/location for saving plots to, + and passing in the psm object instead of assuming a call from inside + the PsmPy class. + """ + df_preds_after = psm.df_matched[[psm.treatment] + psm.xvars] + df_preds_b4 = psm.data[[psm.treatment] + psm.xvars] + df_preds_after_float = df_preds_after.astype(float) + df_preds_b4_float = df_preds_b4.astype(float) + + data = [] + for cl in psm.xvars: + data.append([cl, "before", cohenD(df_preds_b4_float, psm.treatment, cl)]) + data.append([cl, "after", cohenD(df_preds_after_float, psm.treatment, cl)]) + psm.effect_size = pandas.DataFrame( + data, columns=["Variable", "matching", "Effect Size"] + ) + sns.set_style("white") + sns_plot = sns.barplot( + data=psm.effect_size, + y="Variable", + x="Effect Size", + hue="matching", + palette=[before_color, after_color], + orient="h", + ) + sns_plot.set(title=title) + if save == True: + sns_plot.figure.savefig(filename, dpi=250, bbox_inches="tight") + + def generate_psm_analysis( + self, cursor: DatabaseCursor, schema: str, table_suffix: str + ): """Runs PSM statistics on generated tables""" - df = cursor.execute(f"select * from {self.config.target_table}").as_pandas() + cursor.execute( + f"""CREATE OR REPLACE VIEW {self.config.target_table} + AS SELECT * FROM {self.config.target_table}_{table_suffix}""" + ) + df = cursor.execute(f"SELECT * FROM {self.config.target_table}").as_pandas() symptoms_dict = self._get_symptoms_dict(self.config.classification_json) for dependent_variable, codes in symptoms_dict.items(): df[dependent_variable] = df["code"].apply(lambda x: 1 if x in codes else 0) @@ -247,7 +336,19 @@ def generate_psm_analysis(self, cursor: DatabaseCursor, schema: str): caliper=None, drop_unmatched=True, ) - + os.makedirs(self.export_path, exist_ok=True) + self.psm_plot_match( + psm, + save=True, + filename=self.export_path + / f"{self.config.target_table}_{table_suffix}_propensity_match.png", + ) + self.psm_effect_size_plot( + psm, + save=True, + filename=self.export_path + / f"{self.config.target_table}_{table_suffix}_effect_size.png", + ) except ZeroDivisionError: sys.exit( "Encountered a divide by zero error during statistical graph generation. Try increasing your sample size." @@ -257,8 +358,8 @@ def generate_psm_analysis(self, cursor: DatabaseCursor, schema: str): "Encountered a value error during KNN matching. Try increasing your sample size." 
 
-    def prepare_queries(self, cursor: object, schema: str):
-        self._create_covariate_table(cursor, schema)
+    def prepare_queries(self, cursor: object, schema: str, table_suffix: str):
+        self._create_covariate_table(cursor, schema, table_suffix)
 
     def post_execution(
         self,
@@ -266,6 +367,7 @@
         schema: str,
         verbose: bool,
         drop_table: bool = False,
+        table_suffix: str = None,
     ):
-        # super().execute_queries(cursor, schema, verbose, drop_table)
-        self.generate_psm_analysis(cursor, schema)
+        self.generate_psm_analysis(cursor, schema, table_suffix)
diff --git a/cumulus_library/study_parser.py b/cumulus_library/study_parser.py
index e2e0d112..27049dcc 100644
--- a/cumulus_library/study_parser.py
+++ b/cumulus_library/study_parser.py
@@ -3,6 +3,7 @@
 import importlib.util
 import sys
 
+from datetime import datetime
 from pathlib import Path, PosixPath
 from typing import List, Optional
 
@@ -10,8 +11,10 @@
 
 from rich.progress import Progress, TaskID, track
 
+from cumulus_library import __version__
 from cumulus_library.base_table_builder import BaseTableBuilder
 from cumulus_library.databases import DatabaseBackend, DatabaseCursor
+from cumulus_library.enums import PROTECTED_TABLE_KEYWORDS, PROTECTED_TABLES
 from cumulus_library.errors import StudyManifestParsingError
 from cumulus_library.helper import (
     query_console_output,
@@ -19,16 +22,17 @@
     parse_sql,
     get_progress_bar,
 )
+from cumulus_library.protected_table_builder import ProtectedTableBuilder
+from cumulus_library.statistics.psm import PsmBuilder
 from cumulus_library.template_sql.templates import (
     get_show_tables,
     get_show_views,
     get_drop_view_table,
+    get_insert_into_query,
 )
 
 StrList = List[str]
 
-RESERVED_TABLE_KEYWORDS = ["etl", "nlp", "lib"]
-
 
 class StudyManifestParser:
     """Handles loading of study data from manifest files.
@@ -38,21 +42,22 @@
     mechanisms for IDing studies/files of interest, and for executing queries,
     but specifically it should never be in charge of instantiating a cursor
-    itself - this will help to future proof against other database implementations in the
-    future, assuming those DBs have a PEP-249 cursor available (and this is why we
-    are hinting generic objects for cursors).
-
+    itself - this will help to future proof against other database
+    implementations.
     """
 
     _study_path = None
     _study_config = {}
 
-    def __init__(self, study_path: Optional[Path] = None):
+    def __init__(
+        self, study_path: Optional[Path] = None, data_path: Optional[Path] = None
+    ):
         """Instantiates a StudyManifestParser.
 
         :param study_path: A pathlib Path object, optional
+        :param data_path: A pathlib Path object for writing data exports, optional
         """
         if study_path is not None:
             self.load_study_manifest(study_path)
+        self.data_path = data_path
 
     def __repr__(self):
         return str(self._study_config)
@@ -119,6 +124,14 @@
         sql_config = self._study_config.get("counts_builder_config", {})
         return sql_config.get("file_names", [])
 
+    def get_statistics_file_list(self) -> Optional[StrList]:
+        """Reads the contents of the statistics_config array from the manifest
+
+        :returns: An array of statistics toml files from the manifest,
+            or an empty list if not found.
+        """
+        stats_config = self._study_config.get("statistics_config", {})
+        return stats_config.get("file_names", [])
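
For get_statistics_file_list() to find anything, the study manifest needs a statistics_config block. A sketch mirroring the shape of the new tests/test_data/psm/manifest.toml, whose hunk is truncated from this excerpt (the file name below is illustrative):

    study_prefix = "psm_test"

    [statistics_config]
    file_names = ["psm_config.toml"]
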
+ """ + stats_config = self._study_config.get("statistics_config", {}) + return stats_config.get("file_names", []) + def get_export_table_list(self) -> Optional[StrList]: """Reads the contents of the export_list array from the manifest @@ -134,14 +147,14 @@ def get_export_table_list(self) -> Optional[StrList]: ) return export_table_list - def reset_export_dir(self) -> None: + def reset_data_dir(self) -> None: """ Removes exports associated with this study from the ../data_export directory. """ project_path = Path(__file__).resolve().parents[1] - path = Path(f"{str(project_path)}/data_export/{self.get_study_prefix()}/") + path = self.data_path / self.get_study_prefix() if path.exists(): - for file in path.glob("*"): + for file in path.glob("*.*"): file.unlink() # SQL related functions @@ -149,6 +162,7 @@ def clean_study( self, cursor: DatabaseCursor, schema_name: str, + stats_clean: bool = False, verbose: bool = False, prefix: str = None, ) -> List: @@ -168,12 +182,32 @@ def clean_study( else: drop_prefix = prefix display_prefix = drop_prefix + + if stats_clean: + confirm = input( + "This will remove all historical stats tables beginning in the " + f"{display_prefix} study - are you sure? (y/N)" + ) + if confirm.lower() not in ("y", "yes"): + sys.exit("Table cleaning aborted") + view_sql = get_show_views(schema_name, drop_prefix) table_sql = get_show_tables(schema_name, drop_prefix) view_table_list = [] - for query_and_type in [[view_sql, "VIEW"], [table_sql, "TABLE"]]: - cursor.execute(query_and_type[0]) - for db_row_tuple in cursor.fetchall(): + for query_and_type in [[view_sql, "VIEW"], [table_sql, "TABLE"]]: # + tuple_list = cursor.execute(query_and_type[0]).fetchall() + if ( + f"{drop_prefix}{PROTECTED_TABLES.STATISTICS.value}", + ) in tuple_list and not stats_clean: + protected_list = cursor.execute( + f"""SELECT {(query_and_type[1]).lower()}_name + FROM {drop_prefix}{PROTECTED_TABLES.STATISTICS.value} + WHERE study_name = '{display_prefix}'""" + ).fetchall() + for protected_tuple in protected_list: + if protected_tuple in tuple_list: + tuple_list.remove(protected_tuple) + for db_row_tuple in tuple_list: # this check handles athena reporting views as also being tables, # so we don't waste time dropping things that don't exist if query_and_type[1] == "TABLE": @@ -191,8 +225,11 @@ def clean_study( # study builder, and remove them from the list. for view_table in view_table_list.copy(): if any( - ((f"_{word}_") in view_table[0] or view_table[0].endswith(word)) - for word in RESERVED_TABLE_KEYWORDS + ( + (f"_{word.value}_") in view_table[0] + or view_table[0].endswith(word.value) + ) + for word in PROTECTED_TABLE_KEYWORDS ): view_table_list.remove(view_table) # We want to only show a progress bar if we are :not: printing SQL lines @@ -216,6 +253,12 @@ def clean_study( progress, task, ) + if stats_clean: + drop_query = get_drop_view_table( + f"{drop_prefix}{PROTECTED_TABLES.STATISTICS.value}", "TABLE" + ) + cursor.execute(drop_query) + return view_table_list def _execute_drop_queries( @@ -289,12 +332,26 @@ def _load_and_execute_builder( table_builder = table_builder_class() table_builder.execute_queries(cursor, schema, verbose, drop_table) - # After runnning the executor code, we'll remove - # remove it so it doesn't interfere with the next python module to + # After running the executor code, we'll remove + # it so it doesn't interfere with the next python module to # execute, since the subclass would otherwise hang around. 
 
     def _execute_drop_queries(
@@ -289,12 +332,26 @@ def _load_and_execute_builder(
         table_builder = table_builder_class()
         table_builder.execute_queries(cursor, schema, verbose, drop_table)
 
-        # After runnning the executor code, we'll remove
-        # remove it so it doesn't interfere with the next python module to
+        # After running the executor code, we'll remove
+        # it so it doesn't interfere with the next python module to
         # execute, since the subclass would otherwise hang around.
         del sys.modules[table_builder_module.__name__]
         del table_builder_module
 
+    def run_protected_table_builder(
+        self, cursor: DatabaseCursor, schema: str, verbose: bool = False
+    ) -> None:
+        """Creates protected tables for persisting selected data across runs
+
+        :param cursor: A PEP-249 compatible cursor object
+        :param schema: The name of the schema to write tables to
+        :param verbose: toggle from progress bar to query output
+        """
+        ptb = ProtectedTableBuilder()
+        ptb.execute_queries(
+            cursor, schema, verbose, study_name=self._study_config.get("study_prefix")
+        )
+
     def run_table_builder(
         self, cursor: DatabaseCursor, schema: str, verbose: bool = False
     ) -> None:
@@ -307,11 +364,16 @@
         for file in self.get_table_builder_file_list():
             self._load_and_execute_builder(file, cursor, schema, verbose)
 
-    def run_counts_builder(
+    def run_counts_builders(
         self, cursor: DatabaseCursor, schema: str, verbose: bool = False
     ) -> None:
         """Loads counts modules from a manifest and executes code via BaseTableBuilder
 
+        While a count is a form of statistics, it is treated separately from other
+        statistics because it is, by design, always going to be static against a
+        given dataset, where other statistical methods may use sampling techniques
+        or adjustable input parameters that may need to be preserved for later review.
+
         :param cursor: A PEP-249 compatible cursor object
         :param schema: The name of the schema to write tables to
         :param verbose: toggle from progress bar to query output
@@ -319,6 +381,69 @@
         for file in self.get_counts_builder_file_list():
             self._load_and_execute_builder(file, cursor, schema, verbose)
 
+    def run_statistics_builders(
+        self,
+        cursor: DatabaseCursor,
+        schema: str,
+        verbose: bool = False,
+        stats_build: bool = False,
+        data_path: PosixPath = None,
+    ) -> None:
+        """Loads statistics modules from toml definitions and executes
+
+        :param cursor: A PEP-249 compatible cursor object
+        :param schema: The name of the schema to write tables to
+        :param verbose: toggle from progress bar to query output
+        :param stats_build: if False, skips building statistics tables
+        :param data_path: a destination for statistics artifacts on disk
+        """
+        if not stats_build:
+            return
+        for file in self.get_statistics_file_list():
+            # This open is a bit redundant with the open inside of the PSM builder,
+            # but we're letting it slide so that builders function similarly
+            # across the board
+            iso_timestamp = datetime.now().replace(microsecond=0).isoformat()
+            safe_timestamp = iso_timestamp.replace(":", "_").replace("-", "_")
+            toml_path = Path(f"{self._study_path}/{file}")
+            with open(toml_path, encoding="UTF-8") as config_file:
+                config = toml.load(config_file)
+            config_type = config["config_type"]
+            target_table = config["target_table"]
+            if config_type == "psm":
+                builder = PsmBuilder(
+                    toml_path, self.data_path / f"{self.get_study_prefix()}/psm"
+                )
+            else:
+                raise StudyManifestParsingError(
+                    f"{toml_path} references an invalid statistics type {config_type}."
+                )
+            builder.execute_queries(
+                cursor, schema, verbose, table_suffix=safe_timestamp
+            )
+            insert_query = get_insert_into_query(
+                f"{self.get_study_prefix()}__{PROTECTED_TABLES.STATISTICS.value}",
+                [
+                    "study_name",
+                    "library_version",
+                    "table_type",
+                    "table_name",
+                    "view_name",
+                    "created_on",
+                ],
+                [
+                    [
+                        self.get_study_prefix(),
+                        __version__,
+                        config_type,
+                        f"{target_table}_{safe_timestamp}",
+                        target_table,
+                        iso_timestamp,
+                    ]
+                ],
+            )
+            cursor.execute(insert_query)
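
The timestamp handling above is what keeps each statistics run distinct. A standalone illustration of the naming scheme (values are examples, not output from this patch):

    from datetime import datetime

    iso_timestamp = datetime.now().replace(microsecond=0).isoformat()
    safe_timestamp = iso_timestamp.replace(":", "_").replace("-", "_")
    # '2023-12-18T13:33:14' -> '2023_12_18T13_33_14'
    # table:  psm_test__psm_encounter_covariate_2023_12_18T13_33_14  (persisted)
    # view:   psm_test__psm_encounter_covariate  (always points at the newest run)
    # ledger: one row per run in psm_test__lib_statistics, keyed by study_name
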
 
     def run_single_table_builder(
         self, cursor: DatabaseCursor, schema: str, name: str, verbose: bool = False
     ):
@@ -395,8 +520,8 @@ def _execute_build_queries(
                 "should be in the first line of the query.",
             )
             if any(
-                f" {self.get_study_prefix()}__{word}_" in create_line
-                for word in RESERVED_TABLE_KEYWORDS
+                f" {self.get_study_prefix()}__{word.value}_" in create_line
+                for word in PROTECTED_TABLE_KEYWORDS
             ):
                 self._query_error(
                     query,
                     "This query contains a table name which contains a reserved word "
                     "immediately after the study prefix. Please rename this table so "
                     "that it does not begin with one of these special words "
                     "immediately after the double underscore.\n"
-                    f"Reserved words: {str(RESERVED_TABLE_KEYWORDS)}",
+                    f"Reserved words: {[word.value for word in PROTECTED_TABLE_KEYWORDS]}",
                 )
             if create_line.count("__") > 1:
                 self._query_error(
@@ -439,7 +564,7 @@ def export_study(self, db: DatabaseBackend, data_path: PosixPath) -> List:
         :param db: A database backend
         :returns: list of executed queries (for unit testing only)
         """
-        self.reset_export_dir()
+        self.reset_data_dir()
         queries = []
         for table in track(
             self.get_export_table_list(),
diff --git a/cumulus_library/template_sql/ctas_empty.sql.jinja b/cumulus_library/template_sql/ctas_empty.sql.jinja
index f65a1a78..c13f1df4 100644
--- a/cumulus_library/template_sql/ctas_empty.sql.jinja
+++ b/cumulus_library/template_sql/ctas_empty.sql.jinja
@@ -1,9 +1,10 @@
-CREATE TABLE "{{ schema_name }}"."{{ table_name }}" AS (
+CREATE TABLE IF NOT EXISTS "{{ schema_name }}"."{{ table_name }}"
+AS (
     SELECT * FROM (
         VALUES
         (
-            {%- for col in table_cols -%}
-            cast(NULL AS varchar)
+            {%- for type in table_cols_types -%}
+            cast(NULL AS {{ type }})
             {%- if not loop.last -%}
             ,
             {%- endif -%}
diff --git a/cumulus_library/template_sql/statistics/psm_create_covariate_table.sql.jinja b/cumulus_library/template_sql/statistics/psm_create_covariate_table.sql.jinja
index 8e5d2558..2ebde5bc 100644
--- a/cumulus_library/template_sql/statistics/psm_create_covariate_table.sql.jinja
+++ b/cumulus_library/template_sql/statistics/psm_create_covariate_table.sql.jinja
@@ -33,7 +33,7 @@ CREATE TABLE {{ target_table }} AS (
         {%- endif -%}
         {{ select_column_or_alias(join_cols_by_table) }}
         {{ neg_source_table }}.code
-    FROM "{{ pos_source_table }}_sampled_ids" AS sample_cohort,
+    FROM "{{ pos_source_table }}_sampled_ids_{{ table_suffix }}" AS sample_cohort,
         "{{ neg_source_table }}",
         {%- for key in join_cols_by_table %}
         "{{ key }}"
diff --git a/cumulus_library/template_sql/statistics/psm_templates.py b/cumulus_library/template_sql/statistics/psm_templates.py
index 71082e01..928c82e7 100644
--- a/cumulus_library/template_sql/statistics/psm_templates.py
+++ b/cumulus_library/template_sql/statistics/psm_templates.py
@@ -44,6 +44,7 @@ def get_create_covariate_table(
     target_table: str,
     pos_source_table: str,
     neg_source_table: str,
+    table_suffix: str,
     primary_ref: str,
     dependent_variable: str,
     join_cols_by_table: dict,
@@ -75,6 +76,7 @@ def
 get_create_covariate_table(
         target_table=target_table,
         pos_source_table=pos_source_table,
         neg_source_table=neg_source_table,
+        table_suffix=table_suffix,
         primary_ref=primary_ref,
         dependent_variable=dependent_variable,
         count_ref=count_ref,
diff --git a/cumulus_library/template_sql/templates.py b/cumulus_library/template_sql/templates.py
index cbacece1..330754e0 100644
--- a/cumulus_library/template_sql/templates.py
+++ b/cumulus_library/template_sql/templates.py
@@ -203,7 +203,10 @@ def get_ctas_query_from_df(schema_name: str, table_name: str, df: DataFrame) ->
 
 def get_ctas_empty_query(
-    schema_name: str, table_name: str, table_cols: List[str]
+    schema_name: str,
+    table_name: str,
+    table_cols: List[str],
+    table_cols_types: List[str] = None,
 ) -> str:
     """Generates a create table as query for initializing an empty table
 
@@ -215,13 +218,18 @@
     :param schema_name: The athena schema to create the table in
     :param table_name: The name of the athena table to create
     :param table_cols: Comma delimited column names, i.e. ['first','second']
+    :param table_cols_types: Types for each column (default: all varchar)
     """
     path = Path(__file__).parent
+    if table_cols_types is None:
+        table_cols_types = ["varchar"] * len(table_cols)
     with open(f"{path}/ctas_empty.sql.jinja") as ctas_empty:
         return Template(ctas_empty.read()).render(
             schema_name=schema_name,
             table_name=table_name,
             table_cols=table_cols,
+            table_cols_types=table_cols_types,
         )
diff --git a/docs/statistics/propensity-score-matching.md b/docs/statistics/propensity-score-matching.md
index a002e3a4..aebbd322 100644
--- a/docs/statistics/propensity-score-matching.md
+++ b/docs/statistics/propensity-score-matching.md
@@ -51,6 +51,10 @@
 # database. We recommend that you only attempt to use this after you have decided
 # on the first draft of your cohort selection criteria
 
+# config_type should always be "psm" - we use this to distinguish it from other
+# kinds of statistics runs
+config_type = "psm"
+
 # classification_json should reference a file in the same directory as this config,
 # which matches a category to a set of ICD codes. As an example, you could use
 # an existing guide like DSM5 classifications for this, but you could also use
diff --git a/tests/test_data/duckdb_data/duck.db b/tests/test_data/duckdb_data/duck.db
deleted file mode 100644
index ababe675cb4d91b04d53fbe3248702064e5d6695..0000000000000000000000000000000000000000
GIT binary patch
[binary patch contents elided]
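
Taken together, a statistics-enabled run of this branch might look like the following - illustrative only, assuming the standard `cumulus-library` entry point and the hypothetical `psm_test` study sketched above:

    cumulus-library build --target psm_test --statistics
    cumulus-library clean --target psm_test --statistics

The build creates the timestamped `..._sampled_ids_<ts>` and covariate tables, writes `*_propensity_match.png` and `*_effect_size.png` under `<data_path>/psm_test/psm/`, and records the run in `psm_test__lib_statistics` and `psm_test__lib_transactions`; only a clean with `--statistics` (after the y/N confirmation) removes that history.
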