Skip to content

Commit

Permalink
PSM CLI, table persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
dogversioning committed Dec 18, 2023
1 parent 7d36170 commit 8ee5515
Show file tree
Hide file tree
Showing 20 changed files with 574 additions and 116 deletions.
19 changes: 14 additions & 5 deletions cumulus_library/base_table_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" abstract base for python-based study executors """
import re
import sys

from abc import ABC, abstractmethod
from typing import final
Expand All @@ -21,15 +22,16 @@ def __init__(self):
self.queries = []

@abstractmethod
def prepare_queries(self, cursor: object, schema: str):
def prepare_queries(self, cursor: object, schema: str, *args, **kwargs):
"""Main entrypoint for python table builders.
When completed, prepare_queries should populate self.queries with sql
statements to execute. This array will then be read by execute_queries.
:param cursor: A PEP-249 compatible cursor
:param schema: A schema name
:param verbose: toggle for verbose output mode
:param db_type: The db system being used (only relevant for db-specific
query construction)
"""
raise NotImplementedError

Expand All @@ -40,6 +42,8 @@ def execute_queries(
schema: str,
verbose: bool,
drop_table: bool = False,
*args,
**kwargs,
):
"""Executes queries set up by a prepare_queries call
Expand All @@ -48,7 +52,7 @@ def execute_queries(
:param verbose: toggle for verbose output mode
:param drop_table: drops any tables found in prepared_queries results
"""
self.prepare_queries(cursor, schema)
self.prepare_queries(cursor, schema, *args, **kwargs)
if drop_table:
table_names = []
for query in self.queries:
Expand All @@ -73,15 +77,20 @@ def execute_queries(
)
for query in self.queries:
query_console_output(verbose, query, progress, task)
cursor.execute(query)
self.post_execution(cursor, schema, verbose, drop_table)
try:
cursor.execute(query)
except Exception as e:
sys.exit(e)
self.post_execution(cursor, schema, verbose, drop_table, *args, **kwargs)

def post_execution(
    self,
    cursor: DatabaseCursor,
    schema: str,
    verbose: bool,
    drop_table: bool = False,
    *args,
    **kwargs,
):
    """Hook for any additional actions to run after execute_queries

    The base implementation does nothing. execute_queries calls this once its
    query loop has finished, forwarding the same cursor, schema, verbose and
    drop_table values it was given, plus any extra positional/keyword
    arguments.

    :param cursor: the database cursor used by execute_queries
    :param schema: A schema name
    :param verbose: toggle for verbose output mode
    :param drop_table: the drop_table flag execute_queries was called with
    """
    pass
Expand Down
152 changes: 102 additions & 50 deletions cumulus_library/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import sysconfig

from datetime import datetime
from pathlib import Path, PosixPath
from typing import Dict, List, Optional

Expand All @@ -19,57 +20,50 @@
DatabaseBackend,
create_db_backend,
)
from cumulus_library.enums import PROTECTED_TABLES
from cumulus_library.protected_table_builder import TRANSACTIONS_COLS
from cumulus_library.study_parser import StudyManifestParser
from cumulus_library.template_sql.templates import get_insert_into_query
from cumulus_library.upload import upload_files


# ** Don't delete! **
# This class isn't used in the rest of the code,
# but it is used manually as a quick & dirty alternative to the CLI.
class CumulusEnv:  # pylint: disable=too-few-public-methods
    """
    Wrapper for Cumulus Environment vars.
    Simplifies connections to StudyBuilder without requiring CLI parsing.

    All values are read from the process environment at construction time:

    - region: CUMULUS_LIBRARY_REGION (defaults to "us-east-1")
    - workgroup: CUMULUS_LIBRARY_WORKGROUP (defaults to "cumulus")
    - profile: CUMULUS_LIBRARY_PROFILE (None when unset)
    - schema_name: CUMULUS_LIBRARY_DATABASE (None when unset)
    """

    def __init__(self):
        env = os.environ
        self.region = env.get("CUMULUS_LIBRARY_REGION", "us-east-1")
        self.workgroup = env.get("CUMULUS_LIBRARY_WORKGROUP", "cumulus")
        self.profile = env.get("CUMULUS_LIBRARY_PROFILE")
        self.schema_name = env.get("CUMULUS_LIBRARY_DATABASE")

    def get_study_builder(self):
        """Convenience method for getting athena args from environment"""
        db = AthenaDatabaseBackend(
            self.region, self.workgroup, self.profile, self.schema_name
        )
        # NOTE(review): a StudyBuilder.__init__ variant that also takes a
        # data_path argument appears in this source - confirm which signature
        # applies before relying on this single-argument call.
        return StudyBuilder(db)


class StudyBuilder:
"""Class for managing Athena cursors and executing Cumulus queries"""

verbose = False
schema_name = None

def __init__(self, db: DatabaseBackend):
def __init__(self, db: DatabaseBackend, data_path: str):
self.db = db
self.data_path = data_path
self.cursor = db.cursor()
self.schema_name = db.schema_name

def reset_data_path(self, study: PosixPath) -> None:
    """
    Removes existing exports from a study's local data dir

    The directory is ``<project>/data_export/<study>/``, where ``<project>``
    is the parent of the directory containing this file. Nothing happens if
    that directory does not exist.

    :param study: the study's path segment inside ``data_export``
    """
    project_path = Path(__file__).resolve().parents[1]
    export_dir = Path(f"{project_path}/data_export/{study}/")
    if not export_dir.exists():
        return
    for entry in export_dir.glob("*"):
        # unlink() raises on a real directory, so only files and symlinks
        # (including broken ones) are removed; sub-directories are left alone.
        if entry.is_file() or entry.is_symlink():
            entry.unlink()
def update_transactions(self, prefix: str, status: str):
    """Records a status entry for a study via its transactions table

    Executes, on this builder's cursor, the query produced by
    get_insert_into_query for the table
    ``<prefix>__<PROTECTED_TABLES.TRANSACTIONS.value>``. The single row passed
    in holds the prefix, the library ``__version__``, the status, and the
    current local time as an ISO-format string truncated to whole seconds.

    :param prefix: the study prefix; also used to build the table name
    :param status: the status to record (callers in this file pass
        "started", "resumed", "finished" or "error")
    """
    table_name = f"{prefix}__{PROTECTED_TABLES.TRANSACTIONS.value}"
    timestamp = datetime.now().replace(microsecond=0).isoformat()
    insert_query = get_insert_into_query(
        table_name,
        TRANSACTIONS_COLS,
        [[prefix, __version__, status, timestamp]],
    )
    self.cursor.execute(insert_query)

### Creating studies

def clean_study(self, targets: List[str], study_dict, prefix=False) -> None:
def clean_study(
self,
targets: List[str],
study_dict: Dict,
stats_clean: bool,
prefix: bool = False,
) -> None:
"""Removes study table/views from Athena.
While this is usually not required, since it is done as part of a build,
Expand All @@ -86,25 +80,69 @@ def clean_study(self, targets: List[str], study_dict, prefix=False) -> None:
if prefix:
parser = StudyManifestParser()
parser.clean_study(
self.cursor, self.schema_name, self.verbose, prefix=target
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_clean=stats_clean,
prefix=target,
)
else:
parser = StudyManifestParser(study_dict[target])
parser.clean_study(self.cursor, self.schema_name, self.verbose)
parser.clean_study(
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_clean=stats_clean,
)

def clean_and_build_study(
self, target: PosixPath, continue_from: str = None
self,
target: PosixPath,
export_dir: PosixPath,
stats_build: bool,
continue_from: str = None,
) -> None:
"""Recreates study views/tables
:param target: A PosixPath to the study directory
"""
studyparser = StudyManifestParser(target)
if not continue_from:
studyparser.clean_study(self.cursor, self.schema_name, self.verbose)
studyparser.run_table_builder(self.cursor, self.schema_name, self.verbose)
studyparser.build_study(self.cursor, self.verbose, continue_from)
studyparser.run_counts_builder(self.cursor, self.schema_name, self.verbose)

studyparser = StudyManifestParser(target, self.data_path)
try:
if not continue_from:
studyparser.run_protected_table_builder(
self.cursor, self.schema_name, verbose=self.verbose
)
self.update_transactions(studyparser.get_study_prefix(), "started")
cleaned_tables = studyparser.clean_study(
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_clean=False,
)
# If the study hasn't been created before, force stats table generation
if len(cleaned_tables) == 0:
stats_build = True
studyparser.run_table_builder(
self.cursor, self.schema_name, verbose=self.verbose
)
else:
self.update_transactions(studyparser.get_study_prefix(), "resumed")
studyparser.build_study(self.cursor, self.verbose, continue_from)
studyparser.run_counts_builders(
self.cursor, self.schema_name, verbose=self.verbose
)
studyparser.run_statistics_builders(
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_build=stats_build,
data_path=self.data_path,
)
self.update_transactions(studyparser.get_study_prefix(), "finished")
except Exception as e:
self.update_transactions(studyparser.get_study_prefix(), "error")
raise e

def run_single_table_builder(
self, target: PosixPath, table_builder_name: str
Expand All @@ -118,7 +156,7 @@ def run_single_table_builder(
self.cursor, self.schema_name, table_builder_name, self.verbose
)

def clean_and_build_all(self, study_dict: Dict) -> None:
def clean_and_build_all(self, study_dict: Dict, export_dir: PosixPath) -> None:
"""Builds views for all studies.
NOTE: By design, this method will always exclude the `template` study dir,
Expand All @@ -129,7 +167,7 @@ def clean_and_build_all(self, study_dict: Dict) -> None:
study_dict = dict(study_dict)
study_dict.pop("template")
for precursor_study in ["vocab", "core"]:
self.clean_and_build_study(study_dict[precursor_study])
self.clean_and_build_study(study_dict[precursor_study], export_dir)
study_dict.pop(precursor_study)
for key in study_dict:
self.clean_and_build_study(study_dict[key])
Expand All @@ -142,7 +180,7 @@ def export_study(self, target: PosixPath, data_path: PosixPath) -> None:
"""
if data_path is None:
sys.exit("Missing destination - please provide a path argument.")
studyparser = StudyManifestParser(target)
studyparser = StudyManifestParser(target, data_path)
studyparser.export_study(self.db, data_path)

def export_all(self, study_dict: Dict, data_path: PosixPath):
Expand Down Expand Up @@ -229,7 +267,7 @@ def run_cli(args: Dict):
# all other actions require connecting to AWS
else:
db_backend = create_db_backend(args)
builder = StudyBuilder(db_backend)
builder = StudyBuilder(db_backend, args["data_path"])
if args["verbose"]:
builder.verbose = True
print("Testing connection to database...")
Expand All @@ -250,20 +288,26 @@ def run_cli(args: Dict):
builder.clean_study(
args["target"],
study_dict,
args["stats_clean"],
args["prefix"],
)
elif args["action"] == "build":
if "all" in args["target"]:
builder.clean_and_build_all(study_dict)
builder.clean_and_build_all(
study_dict, args["export_dir"], args["stats_build"]
)
else:
for target in args["target"]:
if args["builder"]:
builder.run_single_table_builder(
study_dict[target], args["builder"]
study_dict[target], args["builder"], args["stats_build"]
)
else:
builder.clean_and_build_study(
study_dict[target], continue_from=args["continue_from"]
study_dict[target],
args["data_path"],
args["stats_build"],
continue_from=args["continue_from"],
)

elif args["action"] == "export":
Expand All @@ -273,6 +317,13 @@ def run_cli(args: Dict):
for target in args["target"]:
builder.export_study(study_dict[target], args["data_path"])

# print(set(builder.cursor.execute("""SELECT table_name
# FROM information_schema.tables
# where table_name ilike '%_lib_%'
# or table_name ilike '%_psm_%'""").fetchall()))
# print(builder.cursor.execute("select * from psm_test__lib_statistics").fetchall())
# print(builder.cursor.execute("select * from psm_test__lib_transactions").fetchall())
# print(builder.cursor.execute("select * from psm_test__psm_encounter_covariate").fetchall())
db_backend.close()
# returning the builder for ease of unit testing
return builder
Expand Down Expand Up @@ -337,6 +388,7 @@ def main(cli_args=None):

if args.get("data_path"):
args["data_path"] = get_abs_posix_path(args["data_path"])

return run_cli(args)


Expand Down
Loading

0 comments on commit 8ee5515

Please sign in to comment.