Skip to content

Commit

Permalink
CLI sql generation (#170)
Browse files Browse the repository at this point in the history
* CLI sql generation

* PR feedback
  • Loading branch information
dogversioning authored Jan 31, 2024
1 parent 178295f commit 820e1a9
Show file tree
Hide file tree
Showing 17 changed files with 2,402 additions and 79 deletions.
15 changes: 11 additions & 4 deletions cumulus_library/base_table_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" abstract base for python-based study executors """

import pathlib
import re
import sys

Expand Down Expand Up @@ -102,20 +103,26 @@ def post_execution(
"""Hook for any additional actions to run after execute_queries"""
pass

def comment_queries(self, doc_str=None):
    """Convenience method for annotating outputs of template generators to disk

    Replaces self.queries with a new list that starts with a
    `-- noqa: disable=all` header line, followed by the optional doc string
    and then every existing query, with a separator comment placed between
    consecutive entries (never after the last one).

    :param doc_str: optional text inserted ahead of the first query
    """
    separator = "\n-- ###########################################################\n"
    sections = ([doc_str] if doc_str else []) + list(self.queries)
    commented_queries = ["-- noqa: disable=all"]
    # Separators are only emitted *between* sections, so nothing has to be
    # popped afterwards and the header survives when there are no sections.
    for index, section in enumerate(sections):
        if index:
            commented_queries.append(separator)
        commented_queries.append(section)
    self.queries = commented_queries

def write_queries(self, path: pathlib.Path = None):
    """writes all queries constructed by prepare_queries to disk

    :param path: destination file (a Path or a string). When omitted, the
        file is `output.sql` in the working directory at call time. Missing
        parent directories are created.
    """
    # The default is resolved here rather than in the signature: a default of
    # `pathlib.Path.cwd() / ...` is evaluated once, when the function is
    # defined, and would pin whatever directory was current at import time.
    if path is None:
        path = pathlib.Path.cwd() / "output.sql"
    else:
        path = pathlib.Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as file:
        # One query per line, each terminated by a newline.
        file.writelines(f"{query}\n" for query in self.queries)
147 changes: 86 additions & 61 deletions cumulus_library/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,36 @@

import json
import os
import pathlib
import sys
import sysconfig

from pathlib import Path, PosixPath
from typing import Dict, List, Optional

from rich.console import Console
from rich.table import Table
from typing import Dict, List, Optional

from cumulus_library import __version__, errors, helper
from cumulus_library.cli_parser import get_parser
from cumulus_library.databases import (
DatabaseBackend,
create_db_backend,
import rich

from cumulus_library import (
__version__,
cli_parser,
databases,
enums,
errors,
helper,
protected_table_builder,
study_parser,
upload,
)
from cumulus_library.enums import ProtectedTables
from cumulus_library.protected_table_builder import TRANSACTIONS_COLS
from cumulus_library.study_parser import StudyManifestParser
from cumulus_library.template_sql.templates import get_insert_into_query
from cumulus_library.upload import upload_files
from cumulus_library.template_sql import templates


class StudyBuilder:
"""Class for managing Athena cursors and executing Cumulus queries"""
class StudyRunner:
"""Class for managing cursors and executing Cumulus queries"""

verbose = False
schema_name = None

def __init__(self, db: DatabaseBackend, data_path: str):
def __init__(self, db: databases.DatabaseBackend, data_path: str):
self.db = db
self.data_path = data_path
self.cursor = db.cursor()
Expand All @@ -40,9 +41,9 @@ def __init__(self, db: DatabaseBackend, data_path: str):
def update_transactions(self, prefix: str, status: str):
"""Adds a record to a study's transactions table"""
self.cursor.execute(
get_insert_into_query(
f"{prefix}__{ProtectedTables.TRANSACTIONS.value}",
TRANSACTIONS_COLS,
templates.get_insert_into_query(
f"{prefix}__{enums.ProtectedTables.TRANSACTIONS.value}",
protected_table_builder.TRANSACTIONS_COLS,
[
[
prefix,
Expand Down Expand Up @@ -83,7 +84,7 @@ def clean_study(
)
for target in targets:
if prefix:
parser = StudyManifestParser()
parser = study_parser.StudyManifestParser()
parser.clean_study(
self.cursor,
self.schema_name,
Expand All @@ -92,7 +93,7 @@ def clean_study(
prefix=target,
)
else:
parser = StudyManifestParser(study_dict[target])
parser = study_parser.StudyManifestParser(study_dict[target])
parser.clean_study(
self.cursor,
self.schema_name,
Expand All @@ -102,18 +103,18 @@ def clean_study(

def clean_and_build_study(
self,
target: PosixPath,
target: pathlib.Path,
*,
stats_build: bool,
continue_from: str = None,
) -> None:
"""Recreates study views/tables
:param target: A PosixPath to the study directory
:param target: A path to the study directory
:param stats_build: if True, forces creation of new stats tables
:keyword continue_from: Restart a run from a specific sql file (for dev only)
"""
studyparser = StudyManifestParser(target, self.data_path)
studyparser = study_parser.StudyManifestParser(target, self.data_path)
try:
if not continue_from:
studyparser.run_protected_table_builder(
Expand Down Expand Up @@ -159,14 +160,14 @@ def clean_and_build_study(
raise e

def run_single_table_builder(
self, target: PosixPath, table_builder_name: str
self, target: pathlib.Path, table_builder_name: str
) -> None:
"""Runs a single table builder
:param target: A PosixPath to the study directory
:param target: A path to the study directory
:param table_builder_name: a builder file referenced in the study's manifest
"""
studyparser = StudyManifestParser(target)
studyparser = study_parser.StudyManifestParser(target)
studyparser.run_single_table_builder(
self.cursor,
self.schema_name,
Expand All @@ -181,7 +182,7 @@ def clean_and_build_all(self, study_dict: Dict, stats_build: bool) -> None:
NOTE: By design, this method will always exclude the `template` study dir,
since 99% of the time you don't need a live copy in the database.
:param study_dict: A dict of PosixPaths
:param study_dict: A dict of paths
:param stats_build: if True, regen stats tables
"""
study_dict = dict(study_dict)
Expand All @@ -195,34 +196,50 @@ def clean_and_build_all(self, study_dict: Dict, stats_build: bool) -> None:
self.clean_and_build_study(study_dict[key], stats_build=stats_build)

### Data exporters
def export_study(self, target: pathlib.Path, data_path: pathlib.Path) -> None:
    """Exports aggregates defined in a manifest

    :param target: A path to the study directory
    :param data_path: The export destination; passed both to the manifest
        parser and to its export_study call
    :raises SystemExit: if data_path is None
    """
    # Bail out before constructing the parser when no destination was given.
    if data_path is None:
        sys.exit("Missing destination - please provide a path argument.")
    studyparser = study_parser.StudyManifestParser(target, data_path)
    studyparser.export_study(self.db, data_path)

def export_all(self, study_dict: Dict, data_path: pathlib.Path):
    """Exports all defined count tables to disk

    :param study_dict: A dict whose values are paths to study directories
    :param data_path: The destination handed to export_study for every study
    """
    # Only the values are needed, so iterate them directly instead of
    # walking the keys and indexing back into the dict.
    for study_path in study_dict.values():
        self.export_study(study_path, data_path)

def generate_study_sql(
    self,
    target: pathlib.Path,
) -> None:
    """Materializes study sql from templates

    Builds a manifest parser for the study and calls its run_generate_sql
    with this runner's cursor, schema name, verbosity flag and the database
    backend's parser.

    :param target: A path to the study directory
    """
    studyparser = study_parser.StudyManifestParser(target)
    studyparser.run_generate_sql(
        self.cursor,
        self.schema_name,
        verbose=self.verbose,
        parser=self.db.parser(),
    )


def get_abs_posix_path(path: str) -> pathlib.Path:
    """Convenience method for handling abs vs rel paths

    :param path: an absolute path, or one relative to the working directory
        (a string or a Path)
    :returns: the path itself when it is absolute, otherwise the path
        anchored at the current working directory
    """
    # Joining onto the cwd covers both cases: pathlib discards the left-hand
    # side when the right-hand side is absolute. This also avoids indexing
    # `path[0]`, which fails on an empty string and on Path objects.
    return pathlib.Path.cwd() / path


def create_template(path: str) -> None:
"""Creates a manifest in target dir if one doesn't exist"""
abs_path = get_abs_posix_path(path)
manifest_path = Path(abs_path, "manifest.toml")
manifest_path = pathlib.Path(abs_path, "manifest.toml")
if manifest_path.exists():
sys.exit(f"A manifest.toml already exists at {abs_path}, skipping creation")
abs_path.mkdir(parents=True, exist_ok=True)
Expand All @@ -232,33 +249,33 @@ def create_template(path: str) -> None:
[".sqlfluff", ".sqlfluff"],
]
for source, dest in copy_lists:
source_path = Path(Path(__file__).resolve().parents[0], source)
dest_path = Path(abs_path, dest)
source_path = pathlib.Path(pathlib.Path(__file__).resolve().parents[0], source)
dest_path = pathlib.Path(abs_path, dest)
dest_path.write_bytes(source_path.read_bytes())


def get_study_dict(alt_dir_paths: List) -> Optional[Dict[str, PosixPath]]:
def get_study_dict(alt_dir_paths: List) -> Optional[Dict[str, pathlib.Path]]:
"""Gets valid study targets from ./studies/, and any pip installed studies
:returns: A list of pathlib.PosixPath objects
:returns: A dict of Path objects, keyed by study
"""
manifest_studies = {}
cli_path = Path(__file__).resolve().parents[0]
cli_path = pathlib.Path(__file__).resolve().parents[0]

# first, we'll get any installed public studies
with open(
Path(cli_path, "./module_allowlist.json"), "r", encoding="utf-8"
pathlib.Path(cli_path, "./module_allowlist.json"), "r", encoding="utf-8"
) as study_allowlist_json:
study_allowlist = json.load(study_allowlist_json)["allowlist"]
site_packages_dir = sysconfig.get_path("purelib")
for study, subdir in study_allowlist.items():
study_path = Path(site_packages_dir, subdir)
study_path = pathlib.Path(site_packages_dir, subdir)
if study_path.exists():
manifest_studies[study] = study_path

# then we'll get all studies inside the project directory, followed by
# any user supplied paths last. These take precedence.
paths = [Path(cli_path, "studies")]
paths = [pathlib.Path(cli_path, "studies")]
if alt_dir_paths is not None:
paths = paths + alt_dir_paths
for path in paths:
Expand All @@ -267,7 +284,7 @@ def get_study_dict(alt_dir_paths: List) -> Optional[Dict[str, PosixPath]]:
return manifest_studies


def get_studies_by_manifest_path(path: PosixPath) -> dict:
def get_studies_by_manifest_path(path: pathlib.Path) -> dict:
"""Recursively search for manifest.toml files from a given path"""
manifest_paths = {}
for child_path in path.iterdir():
Expand All @@ -284,17 +301,17 @@ def run_cli(args: Dict):
create_template(args["create_dir"])

elif args["action"] == "upload":
upload_files(args)
upload.upload_files(args)

# all other actions require connecting to the database
else:
db_backend = create_db_backend(args)
db_backend = databases.create_db_backend(args)
try:
builder = StudyBuilder(db_backend, data_path=args.get("data_path"))
if args["verbose"]:
builder.verbose = True
runner = StudyRunner(db_backend, data_path=args.get("data_path"))
if args.get("verbose"):
runner.verbose = True
print("Testing connection to database...")
builder.cursor.execute("SHOW DATABASES")
runner.cursor.execute("SHOW DATABASES")

study_dict = get_study_dict(args["study_dir"])
if "prefix" not in args.keys():
Expand All @@ -308,42 +325,50 @@ def run_cli(args: Dict):
"you include `-s path/to/study/dir` as an arugment."
)
if args["action"] == "clean":
builder.clean_study(
runner.clean_study(
args["target"],
study_dict,
stats_clean=args["stats_clean"],
prefix=args["prefix"],
)
elif args["action"] == "build":
if "all" in args["target"]:
builder.clean_and_build_all(study_dict, args["stats_build"])
runner.clean_and_build_all(study_dict, args["stats_build"])
else:
for target in args["target"]:
if args["builder"]:
builder.run_single_table_builder(
runner.run_single_table_builder(
study_dict[target], args["builder"]
)
else:
builder.clean_and_build_study(
runner.clean_and_build_study(
study_dict[target],
stats_build=args["stats_build"],
continue_from=args["continue_from"],
)

elif args["action"] == "export":
if "all" in args["target"]:
builder.export_all(study_dict, args["data_path"])
runner.export_all(study_dict, args["data_path"])
else:
for target in args["target"]:
runner.export_study(study_dict[target], args["data_path"])

elif args["action"] == "generate-sql":
if "all" in args["target"]:
for target in study_dict.keys():
runner.generate_study_sql(study_dict[target])
else:
for target in args["target"]:
builder.export_study(study_dict[target], args["data_path"])
runner.generate_study_sql(study_dict[target])
finally:
db_backend.close()


def main(cli_args=None):
"""Reads CLI input/environment variables and invokes library calls"""

parser = get_parser()
parser = cli_parser.get_parser()
args = vars(parser.parse_args(cli_args))
if args["version"]:
print(__version__)
Expand Down Expand Up @@ -380,15 +405,15 @@ def main(cli_args=None):
read_env_vars.append([pair[1], env_val])

if len(read_env_vars) > 0:
table = Table(title="Values read from environment variables")
table = rich.table.Table(title="Values read from environment variables")
table.add_column("Environment Variable", style="green")
table.add_column("Value", style="cyan")
for row in read_env_vars:
if row[0] == "CUMULUS_AGGREGATOR_ID":
table.add_row(row[0], "#########")
else:
table.add_row(row[0], row[1])
console = Console()
console = rich.console.Console()
console.print(table)

if args.get("study_dir"):
Expand Down
7 changes: 7 additions & 0 deletions cumulus_library/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,11 @@ def get_parser() -> argparse.ArgumentParser:
help="Run pre-fetch and prepare upload, but log output instead of sending.",
)

# Generate a study's template-driven sql
generate = actions.add_parser(
"generate-sql", help="Generates a study's template-driven sql for reference"
)
add_target_argument(generate)
add_study_dir_argument(generate)
add_db_config(generate)
return parser
Loading

0 comments on commit 820e1a9

Please sign in to comment.