feat: add new DuckDB backend for reading ndjson directly
This adds some new arguments:
--db-type [athena,duckdb] (defaulting to athena)
--load-ndjson-dir DIR (tells DuckDB where to find source ndjson files)

A light abstraction layer has been added in databases.py to choose
the correct backend based on the args.

The SQL is mostly the same across backends. Some light tweaks for
standardization, plus some compatibility user-defined functions injected
into DuckDB, allow both backends to run the same SQL.
mikix committed Nov 24, 2023
1 parent 973c13a commit 90abb0e
Showing 39 changed files with 812 additions and 120 deletions.
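
For orientation, the selection logic described in the commit message presumably reduces to a small factory in databases.py. In the sketch below, `create_db_backend` and `AthenaDatabaseBackend` (with its argument order) come from the cli.py diff further down; the `DuckDatabaseBackend` name and the ndjson-loading helper are assumptions:

```python
# Hypothetical sketch of the databases.py factory. Only create_db_backend and
# AthenaDatabaseBackend are confirmed by the diff below; the DuckDB branch is
# an assumption based on the new CLI arguments.
def create_db_backend(args: dict):
    if args["db_type"] == "duckdb":
        # For DuckDB, schema_name is the path of the file to store tables in.
        db = DuckDatabaseBackend(args["schema_name"])  # hypothetical class
        if args.get("load_ndjson_dir"):
            db.load_ndjson_dir(args["load_ndjson_dir"])  # hypothetical helper
    else:
        db = AthenaDatabaseBackend(
            args["region"], args["workgroup"], args["profile"], args["schema_name"]
        )
    return db
```

The "compatibility user-defined functions" would then be registered on the DuckDB connection so that Athena-flavored SQL runs unchanged. For example, something along these lines (the specific function is illustrative, not taken from the commit):

```python
import datetime

import duckdb


def from_iso8601_timestamp(value: str) -> datetime.datetime:
    """Emulates the Athena/Trino function of the same name."""
    return datetime.datetime.fromisoformat(value)


conn = duckdb.connect("cumulus.duckdb")
# DuckDB infers the UDF's signature from the type annotations.
conn.create_function("from_iso8601_timestamp", from_iso8601_timestamp)
```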
12 changes: 12 additions & 0 deletions CONTRIBUTING.md
@@ -0,0 +1,12 @@
+# Contributing to Cumulus Library
+
+## Set up your dev environment
+
+To use the same dev environment as us, you'll want to run these commands:
+```sh
+pip install .[dev]
+pre-commit install
+```
+
+This will install dependencies & build tools,
+as well as set up a `black` auto-formatter commit hook.
7 changes: 6 additions & 1 deletion cumulus_library/base_table_builder.py
@@ -4,6 +4,7 @@
from abc import ABC, abstractmethod
from typing import final

+from cumulus_library.databases import DatabaseCursor
from cumulus_library.helper import get_progress_bar, query_console_output


@@ -34,7 +35,11 @@ def prepare_queries(self, cursor: object, schema: str):

    @final
    def execute_queries(
-        self, cursor: object, schema: str, verbose: bool, drop_table: bool = False
+        self,
+        cursor: DatabaseCursor,
+        schema: str,
+        verbose: bool,
+        drop_table: bool = False,
    ):
        """Executes queries set up by a prepare_queries call
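
The `DatabaseCursor` type hint used above is presumably a structural type covering the small DB-API subset both backends share. A minimal sketch of what such a protocol could look like (the exact method set is an assumption, not the definition from databases.py):

```python
from typing import Protocol


class DatabaseCursor(Protocol):
    """The DB-API cursor subset the table builders rely on (an assumption)."""

    def execute(self, sql: str) -> None: ...

    def fetchone(self) -> list | None: ...

    def fetchall(self) -> list | None: ...
```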
61 changes: 20 additions & 41 deletions cumulus_library/cli.py
@@ -9,14 +9,16 @@
from pathlib import Path, PosixPath
from typing import Dict, List, Optional

-import pyathena
-
-from pyathena.pandas.cursor import PandasCursor
from rich.console import Console
from rich.table import Table

from cumulus_library import __version__
from cumulus_library.cli_parser import get_parser
+from cumulus_library.databases import (
+    AthenaDatabaseBackend,
+    DatabaseBackend,
+    create_db_backend,
+)
from cumulus_library.study_parser import StudyManifestParser
from cumulus_library.upload import upload_files

@@ -38,7 +40,10 @@ def __init__(self):

    def get_study_builder(self):
        """Convenience method for getting athena args from environment"""
-        return StudyBuilder(self.region, self.workgroup, self.profile, self.schema_name)
+        db = AthenaDatabaseBackend(
+            self.region, self.workgroup, self.profile, self.schema_name
+        )
+        return StudyBuilder(db)


class StudyBuilder:
@@ -47,35 +52,10 @@ class StudyBuilder:
    verbose = False
    schema_name = None

-    def __init__(  # pylint: disable=too-many-arguments
-        self, region: str, workgroup: str, profile: str, schema: str
-    ):
-        connect_kwargs = {}
-        for aws_env_name in [
-            "AWS_ACCESS_KEY_ID",
-            "AWS_SECRET_ACCESS_KEY",
-            "AWS_SESSION_TOKEN",
-        ]:
-            if aws_env_val := os.environ.get(aws_env_name):
-                connect_kwargs[aws_env_name.lower()] = aws_env_val
-        # the profile may not be required, provided the above three AWS env vars
-        # are set. If both are present, the env vars take precedence
-        if profile is not None:
-            connect_kwargs["profile_name"] = profile
-        self.cursor = pyathena.connect(
-            region_name=region,
-            work_group=workgroup,
-            schema_name=schema,
-            **connect_kwargs,
-        ).cursor()
-        self.pandas_cursor = pyathena.connect(
-            region_name=region,
-            work_group=workgroup,
-            schema_name=schema,
-            cursor_class=PandasCursor,
-            **connect_kwargs,
-        ).cursor()
-        self.schema_name = schema
+    def __init__(self, db: DatabaseBackend):
+        self.db = db
+        self.cursor = db.cursor()
+        self.schema_name = db.schema_name

    def reset_data_path(self, study: PosixPath) -> None:
        """
@@ -163,7 +143,7 @@ def export_study(self, target: PosixPath, data_path: PosixPath) -> None:
        if data_path is None:
            sys.exit("Missing destination - please provide a path argument.")
        studyparser = StudyManifestParser(target)
-        studyparser.export_study(self.pandas_cursor, data_path)
+        studyparser.export_study(self.db, data_path)

    def export_all(self, study_dict: Dict, data_path: PosixPath):
        """Exports all defined count tables to disk"""
@@ -248,15 +228,11 @@ def run_cli(args: Dict):

    # all other actions require connecting to AWS
    else:
-        builder = StudyBuilder(
-            args["region"],
-            args["workgroup"],
-            args["profile"],
-            args["schema_name"],
-        )
+        db_backend = create_db_backend(args)
+        builder = StudyBuilder(db_backend)
        if args["verbose"]:
            builder.verbose = True
-        print("Testing connection to athena...")
+        print("Testing connection to database...")
        builder.cursor.execute("SHOW DATABASES")

        study_dict = get_study_dict(args["study_dir"])
@@ -297,6 +273,7 @@ def run_cli(args: Dict):
        for target in args["target"]:
            builder.export_study(study_dict[target], args["data_path"])

+    db_backend.close()
    # returning the builder for ease of unit testing
    return builder

@@ -319,12 +296,14 @@ def main(cli_args=None):
            break

    arg_env_pairs = (
+        ("db_type", "CUMULUS_LIBRARY_DB_TYPE"),
        ("profile", "CUMULUS_LIBRARY_PROFILE"),
        ("schema_name", "CUMULUS_LIBRARY_DATABASE"),
        ("workgroup", "CUMULUS_LIBRARY_WORKGROUP"),
        ("region", "CUMULUS_LIBRARY_REGION"),
        ("study_dir", "CUMULUS_LIBRARY_STUDY_DIR"),
        ("data_path", "CUMULUS_LIBRARY_DATA_PATH"),
+        ("load_ndjson_dir", "CUMULUS_LIBRARY_LOAD_NDJSON_DIR"),
        ("user", "CUMULUS_AGGREGATOR_USER"),
        ("id", "CUMULUS_AGGREGATOR_ID"),
        ("url", "CUMULUS_AGGREGATOR_URL"),
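
From the call sites above, `DatabaseBackend` needs at least a `cursor()` factory, a `schema_name` attribute, and a `close()` method. A rough sketch of the interface those calls imply (anything beyond those three members is an assumption):

```python
import abc


class DatabaseBackend(abc.ABC):
    """Base class for the database abstraction layer (sketch)."""

    def __init__(self, schema_name: str):
        self.schema_name = schema_name

    @abc.abstractmethod
    def cursor(self):
        """Returns a DB-API-style cursor for this backend."""

    def close(self) -> None:
        """Releases any open connections; subclasses override as needed."""
```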
40 changes: 30 additions & 10 deletions cumulus_library/cli_parser.py
@@ -44,7 +44,7 @@ def add_data_path_argument(parser: argparse.ArgumentParser) -> None:
        nargs="?",
        help=(
            "The path to use for Athena counts data. "
-            "Can be povided via CUMULUS_LIBRARY_DATA_PATH environment variable."
+            "Can be provided via CUMULUS_LIBRARY_DATA_PATH environment variable."
        ),
    )

@@ -63,12 +63,6 @@ def add_aws_config(parser: argparse.ArgumentParser) -> None:
    """Adds arguments related to aws credentials to a subparser"""
    aws = parser.add_argument_group("AWS config")
    aws.add_argument("--profile", help="AWS profile", default="default")
-    aws.add_argument(
-        "--database",
-        # internally, we use PyAthena's terminology for this but the UX term is "database"
-        dest="schema_name",
-        help="Cumulus Athena database name",
-    )
    aws.add_argument(
        "--workgroup",
        default="cumulus",
@@ -81,6 +75,29 @@ def add_aws_config(parser: argparse.ArgumentParser) -> None:
    )


+def add_db_config(parser: argparse.ArgumentParser) -> None:
+    """Adds arguments related to database backends to a subparser"""
+    group = parser.add_argument_group("Database config")
+    group.add_argument(
+        "--db-type",
+        help="Which database backend to use (default athena)",
+        choices=["athena", "duckdb"],
+        default="athena",
+    )
+    group.add_argument(
+        "--database",
+        # In Athena, we use this as the schema_name (which is also called a database in their UX).
+        # In DuckDB, we use this as the path to the file in which to store tables.
+        # Since we started as an Athena-centric codebase, we mostly keep referring to this
+        # as "schema_name". But to the user, both uses are still conceptually a "database".
+        dest="schema_name",
+        help="Database name (for Athena) or file (for DuckDB)",
+    )
+
+    # Backend-specific config:
+    add_aws_config(parser)
+
+
def get_parser() -> argparse.ArgumentParser:
    """Provides parser for handling CLI arguments"""
    parser = argparse.ArgumentParser(
@@ -125,7 +142,7 @@ def get_parser() -> argparse.ArgumentParser:
    add_target_argument(clean)
    add_study_dir_argument(clean)
    add_verbose_argument(clean)
-    add_aws_config(clean)
+    add_db_config(clean)
    clean.add_argument(
        "--prefix",
        action="store_true",
@@ -140,8 +157,11 @@ def get_parser() -> argparse.ArgumentParser:
    add_table_builder_argument(build)
    add_study_dir_argument(build)
    add_verbose_argument(build)
-    add_aws_config(build)
+    add_db_config(build)

+    build.add_argument(
+        "--load-ndjson-dir", help="Load ndjson files from this folder", metavar="DIR"
+    )
    build.add_argument(
        "--continue",
        dest="continue_from",
@@ -155,7 +175,7 @@ def get_parser() -> argparse.ArgumentParser:
    add_study_dir_argument(export)
    add_data_path_argument(export)
    add_verbose_argument(export)
-    add_aws_config(export)
+    add_db_config(export)

    upload = actions.add_parser(
        "upload", help="Bulk uploads data to Cumulus aggregator"
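
Taken together, the new options allow a fully local run against DuckDB. A hypothetical invocation (the `cumulus-library` entry point and `--target` flag are assumptions; the three database flags and the environment variables are from this diff):

```sh
# Build a study against a local DuckDB file instead of Athena,
# loading source data from a folder of ndjson files:
cumulus-library build \
  --db-type duckdb \
  --database ./cumulus.duckdb \
  --load-ndjson-dir ./ndjson \
  --target core

# Or equivalently, via the environment variables wired up in cli.py:
export CUMULUS_LIBRARY_DB_TYPE=duckdb
export CUMULUS_LIBRARY_DATABASE=./cumulus.duckdb
export CUMULUS_LIBRARY_LOAD_NDJSON_DIR=./ndjson
cumulus-library build --target core
```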