Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PSM CLI, table persistence #160

Merged
merged 13 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cumulus_library/.sqlfluff
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ schema_name = test_schema
source_table = source_table
source_id = source_id
table_cols = ["a","b"]
table_cols_types = ["varchar", "varchar"]
table_name = test_table
table_suffix = 2024_01_01_11_11_11
target_col_prefix = prefix
target_table = target_table
unnests = [{"source col": "g", "table_alias": "i", "row_alias":"j"}, {"source col": "k", "table_alias": "l", "row_alias":"m"},]
Expand Down
17 changes: 12 additions & 5 deletions cumulus_library/base_table_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" abstract base for python-based study executors """
import re
import sys

from abc import ABC, abstractmethod
from typing import final
Expand All @@ -21,15 +22,14 @@ def __init__(self):
self.queries = []

@abstractmethod
def prepare_queries(self, cursor: object, schema: str):
def prepare_queries(self, cursor: object, schema: str, *args, **kwargs):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I mentioned this in passing, but I wanted to call attention to 'you can now include arbitrary args when extending a tablebuilder' as maybe relevant for metrics work.

"""Main entrypoint for python table builders.

When completed, prepare_queries should populate self.queries with sql
statements to execute. This array will then be read by run queries.

:param cursor: A PEP-249 compatible cursor
:param schema: A schema name
:param verbose: toggle for verbose output mode
"""
raise NotImplementedError

Expand All @@ -39,7 +39,9 @@ def execute_queries(
cursor: DatabaseCursor,
schema: str,
verbose: bool,
*args,
drop_table: bool = False,
**kwargs,
):
"""Executes queries set up by a prepare_queries call

Expand All @@ -48,7 +50,7 @@ def execute_queries(
:param verbose: toggle for verbose output mode
:param drop_table: drops any tables found in prepared_queries results
"""
self.prepare_queries(cursor, schema)
self.prepare_queries(cursor, schema, *args, **kwargs)
if drop_table:
table_names = []
for query in self.queries:
Expand All @@ -73,15 +75,20 @@ def execute_queries(
)
for query in self.queries:
query_console_output(verbose, query, progress, task)
cursor.execute(query)
self.post_execution(cursor, schema, verbose, drop_table)
try:
cursor.execute(query)
except Exception as e: # pylint: disable=broad-exception-caught
sys.exit(e)
self.post_execution(cursor, schema, verbose, drop_table, *args, **kwargs)

def post_execution(
self,
cursor: DatabaseCursor,
schema: str,
verbose: bool,
*args,
drop_table: bool = False,
**kwargs,
):
"""Hook for any additional actions to run after execute_queries"""
pass
Expand Down
141 changes: 90 additions & 51 deletions cumulus_library/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import sysconfig

from datetime import datetime
from pathlib import Path, PosixPath
from typing import Dict, List, Optional

Expand All @@ -15,61 +16,53 @@
from cumulus_library import __version__
from cumulus_library.cli_parser import get_parser
from cumulus_library.databases import (
AthenaDatabaseBackend,
DatabaseBackend,
create_db_backend,
)
from cumulus_library.enums import ProtectedTables
from cumulus_library.protected_table_builder import TRANSACTIONS_COLS
from cumulus_library.study_parser import StudyManifestParser
from cumulus_library.template_sql.templates import get_insert_into_query
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
from cumulus_library.upload import upload_files


# ** Don't delete! **
# This class isn't used in the rest of the code,
# but it is used manually as a quick & dirty alternative to the CLI.
class CumulusEnv: # pylint: disable=too-few-public-methods
"""
Wrapper for Cumulus Environment vars.
Simplifies connections to StudyBuilder without requiring CLI parsing.
"""

def __init__(self):
self.region = os.environ.get("CUMULUS_LIBRARY_REGION", "us-east-1")
self.workgroup = os.environ.get("CUMULUS_LIBRARY_WORKGROUP", "cumulus")
self.profile = os.environ.get("CUMULUS_LIBRARY_PROFILE")
self.schema_name = os.environ.get("CUMULUS_LIBRARY_DATABASE")

def get_study_builder(self):
"""Convenience method for getting athena args from environment"""
db = AthenaDatabaseBackend(
self.region, self.workgroup, self.profile, self.schema_name
)
return StudyBuilder(db)


dogversioning marked this conversation as resolved.
Show resolved Hide resolved
class StudyBuilder:
"""Class for managing Athena cursors and executing Cumulus queries"""

verbose = False
schema_name = None

def __init__(self, db: DatabaseBackend):
def __init__(self, db: DatabaseBackend, data_path: str):
self.db = db
self.data_path = data_path
self.cursor = db.cursor()
self.schema_name = db.schema_name

def reset_data_path(self, study: PosixPath) -> None:
"""
Removes existing exports from a study's local data dir
"""
project_path = Path(__file__).resolve().parents[1]
path = Path(f"{str(project_path)}/data_export/{study}/")
if path.exists():
for file in path.glob("*"):
file.unlink()
def update_transactions(self, prefix: str, status: str):
self.cursor.execute(
get_insert_into_query(
f"{prefix}__{ProtectedTables.TRANSACTIONS.value}",
TRANSACTIONS_COLS,
[
[
prefix,
__version__,
status,
datetime.now().replace(microsecond=0).isoformat(),
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
]
],
)
)

### Creating studies

def clean_study(self, targets: List[str], study_dict, prefix=False) -> None:
def clean_study(
self,
targets: List[str],
study_dict: Dict,
stats_clean: bool,
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
prefix: bool = False,
) -> None:
"""Removes study table/views from Athena.

While this is usually not required, since it is done as part of a build,
Expand All @@ -86,25 +79,68 @@ def clean_study(self, targets: List[str], study_dict, prefix=False) -> None:
if prefix:
parser = StudyManifestParser()
parser.clean_study(
self.cursor, self.schema_name, self.verbose, prefix=target
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_clean=stats_clean,
prefix=target,
)
else:
parser = StudyManifestParser(study_dict[target])
parser.clean_study(self.cursor, self.schema_name, self.verbose)
parser.clean_study(
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_clean=stats_clean,
)

def clean_and_build_study(
self, target: PosixPath, continue_from: str = None
self,
target: PosixPath,
stats_build: bool,
continue_from: str = None,
) -> None:
"""Recreates study views/tables

:param target: A PosixPath to the study directory
"""
studyparser = StudyManifestParser(target)
if not continue_from:
studyparser.clean_study(self.cursor, self.schema_name, self.verbose)
studyparser.run_table_builder(self.cursor, self.schema_name, self.verbose)
studyparser.build_study(self.cursor, self.verbose, continue_from)
studyparser.run_counts_builder(self.cursor, self.schema_name, self.verbose)
studyparser = StudyManifestParser(target, self.data_path)
try:
if not continue_from:
studyparser.run_protected_table_builder(
self.cursor, self.schema_name, verbose=self.verbose
)
self.update_transactions(studyparser.get_study_prefix(), "started")
cleaned_tables = studyparser.clean_study(
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_clean=False,
)
# If the study hasn't been created before, force stats table generation
if len(cleaned_tables) == 0:
stats_build = True
studyparser.run_table_builder(
self.cursor, self.schema_name, verbose=self.verbose
)
else:
self.update_transactions(studyparser.get_study_prefix(), "resumed")
studyparser.build_study(self.cursor, self.verbose, continue_from)
studyparser.run_counts_builders(
self.cursor, self.schema_name, verbose=self.verbose
)
studyparser.run_statistics_builders(
self.cursor,
self.schema_name,
verbose=self.verbose,
stats_build=stats_build,
)
self.update_transactions(studyparser.get_study_prefix(), "finished")
except SystemExit as e:
raise e
except Exception as e:
self.update_transactions(studyparser.get_study_prefix(), "error")
raise e

def run_single_table_builder(
self, target: PosixPath, table_builder_name: str
Expand All @@ -118,7 +154,7 @@ def run_single_table_builder(
self.cursor, self.schema_name, table_builder_name, self.verbose
)

def clean_and_build_all(self, study_dict: Dict) -> None:
def clean_and_build_all(self, study_dict: Dict, stats_build: bool) -> None:
"""Builds views for all studies.

NOTE: By design, this method will always exclude the `template` study dir,
Expand All @@ -129,10 +165,10 @@ def clean_and_build_all(self, study_dict: Dict) -> None:
study_dict = dict(study_dict)
study_dict.pop("template")
for precursor_study in ["vocab", "core"]:
self.clean_and_build_study(study_dict[precursor_study])
self.clean_and_build_study(study_dict[precursor_study], stats_build)
study_dict.pop(precursor_study)
for key in study_dict:
self.clean_and_build_study(study_dict[key])
self.clean_and_build_study(study_dict[key], stats_build)

### Data exporters
def export_study(self, target: PosixPath, data_path: PosixPath) -> None:
Expand All @@ -142,7 +178,7 @@ def export_study(self, target: PosixPath, data_path: PosixPath) -> None:
"""
if data_path is None:
sys.exit("Missing destination - please provide a path argument.")
studyparser = StudyManifestParser(target)
studyparser = StudyManifestParser(target, data_path)
studyparser.export_study(self.db, data_path)

def export_all(self, study_dict: Dict, data_path: PosixPath):
Expand Down Expand Up @@ -229,7 +265,7 @@ def run_cli(args: Dict):
# all other actions require connecting to AWS
else:
db_backend = create_db_backend(args)
builder = StudyBuilder(db_backend)
builder = StudyBuilder(db_backend, data_path=args.get("data_path", None))
dogversioning marked this conversation as resolved.
Show resolved Hide resolved
if args["verbose"]:
builder.verbose = True
print("Testing connection to database...")
Expand All @@ -250,11 +286,12 @@ def run_cli(args: Dict):
builder.clean_study(
args["target"],
study_dict,
args["stats_clean"],
args["prefix"],
)
elif args["action"] == "build":
if "all" in args["target"]:
builder.clean_and_build_all(study_dict)
builder.clean_and_build_all(study_dict, args["stats_build"])
else:
for target in args["target"]:
if args["builder"]:
Expand All @@ -263,7 +300,9 @@ def run_cli(args: Dict):
)
else:
builder.clean_and_build_study(
study_dict[target], continue_from=args["continue_from"]
study_dict[target],
args["stats_build"],
continue_from=args["continue_from"],
)

elif args["action"] == "export":
Expand Down
27 changes: 26 additions & 1 deletion cumulus_library/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def get_parser() -> argparse.ArgumentParser:
dest="action",
)

# Study creation

create = actions.add_parser(
"create", help="Create a study instance from a template"
)
Expand All @@ -135,6 +137,8 @@ def get_parser() -> argparse.ArgumentParser:
),
)

# Database cleaning

clean = actions.add_parser(
"clean", help="Removes tables & views beginning with '[target]__' from Athena"
)
Expand All @@ -143,12 +147,20 @@ def get_parser() -> argparse.ArgumentParser:
add_study_dir_argument(clean)
add_verbose_argument(clean)
add_db_config(clean)
clean.add_argument(
"--statistics",
action="store_true",
help="Remove artifacts of previous statistics runs",
dest="stats_clean",
)
clean.add_argument(
"--prefix",
action="store_true",
help=argparse.SUPPRESS,
)

# Database building

build = actions.add_parser(
"build",
help="Removes and recreates Athena tables & views for specified studies",
Expand All @@ -158,7 +170,16 @@ def get_parser() -> argparse.ArgumentParser:
add_study_dir_argument(build)
add_verbose_argument(build)
add_db_config(build)

add_data_path_argument(build)
build.add_argument(
"--statistics",
Comment on lines +174 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be value in offering a way to build statistics outside of a study build? (or clean them?) Like a statistics mode, akin to build and clean modes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we have one of those for the table builder class. I'm not going to do it here, but see #161.

action="store_true",
help=(
"Force regenerating statistics data from latest dataset. "
"Stats are created by default when study is initially run"
),
dest="stats_build",
)
build.add_argument(
"--load-ndjson-dir", help="Load ndjson files from this folder", metavar="DIR"
)
Expand All @@ -168,6 +189,8 @@ def get_parser() -> argparse.ArgumentParser:
help=argparse.SUPPRESS,
)

# Database export

export = actions.add_parser(
"export", help="Generates files on disk from Athena views"
)
Expand All @@ -177,6 +200,8 @@ def get_parser() -> argparse.ArgumentParser:
add_verbose_argument(export)
add_db_config(export)

# Aggregator upload

upload = actions.add_parser(
"upload", help="Bulk uploads data to Cumulus aggregator"
)
Expand Down
Loading
Loading