From 0aacf202e550eb8740ff3f94953f5e67cfe038f4 Mon Sep 17 00:00:00 2001 From: matt garber Date: Wed, 1 May 2024 11:41:25 -0400 Subject: [PATCH] Add table creation from files (#230) * Add table creation from files * remove mock bsvs * pr feedback * meant to adjust the other test, whoops * test cleanup * corrected upload path --- .github/workflows/ci.yaml | 4 +- .sqlfluffignore | 4 + cumulus_library/.sqlfluff | 2 + cumulus_library/databases.py | 52 ++++++- cumulus_library/errors.py | 4 + cumulus_library/studies/vocab/.gitignore | 1 + .../vocab/{ => icd}/ICD10CM_2023AA.bsv | 0 .../vocab/{ => icd}/ICD10PCS_2023AA.bsv | 0 .../studies/vocab/{ => icd}/ICD9CM_2023AA.bsv | 0 cumulus_library/studies/vocab/manifest.toml | 6 + .../vocab/reference_sql/vocab_icd_builder.sql | 18 +++ .../studies/vocab/vocab_icd_builder.py | 75 ++++------ .../template_sql/base_templates.py | 35 +++++ .../template_sql/ctas_from_parquet.sql.jinja | 24 ++++ tests/conftest.py | 1 + tests/core/test_core_meds.py | 6 + tests/regression/__init__.py | 0 tests/regression/regression-iam.json | 76 ++++++---- tests/regression/run_regression.py | 136 ++++++++++-------- tests/test_athena.py | 36 ++++- ...st_templates.py => test_base_templates.py} | 44 ++++++ tests/test_cli.py | 18 --- .../boto3.client.athena.get_work_group.json | 43 ++++++ .../aws/boto3.client.s3.put_object.json | 23 +++ tests/test_data/mock_bsvs/ICD10CM_2023AA.bsv | 10 -- tests/test_data/mock_bsvs/ICD10PCS_2023AA.bsv | 10 -- tests/test_data/mock_bsvs/ICD9CM_2023AA.bsv | 10 -- tests/test_vocab.py | 49 +++++++ 28 files changed, 504 insertions(+), 183 deletions(-) create mode 100644 cumulus_library/studies/vocab/.gitignore rename cumulus_library/studies/vocab/{ => icd}/ICD10CM_2023AA.bsv (100%) rename cumulus_library/studies/vocab/{ => icd}/ICD10PCS_2023AA.bsv (100%) rename cumulus_library/studies/vocab/{ => icd}/ICD9CM_2023AA.bsv (100%) create mode 100644 cumulus_library/studies/vocab/reference_sql/vocab_icd_builder.sql create mode 100644 cumulus_library/template_sql/ctas_from_parquet.sql.jinja create mode 100644 tests/regression/__init__.py rename tests/{test_templates.py => test_base_templates.py} (91%) create mode 100644 tests/test_data/aws/boto3.client.athena.get_work_group.json create mode 100644 tests/test_data/aws/boto3.client.s3.put_object.json delete mode 100644 tests/test_data/mock_bsvs/ICD10CM_2023AA.bsv delete mode 100644 tests/test_data/mock_bsvs/ICD10PCS_2023AA.bsv delete mode 100644 tests/test_data/mock_bsvs/ICD9CM_2023AA.bsv create mode 100644 tests/test_vocab.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 55b278e9..7bb24882 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -80,8 +80,8 @@ jobs: WG: cumulus DB: cumulus_library_regression_db run: | - cumulus-library build -t core --profile $PROFILE --workgroup $WG --database $DB - cumulus-library export -t core ./tests/regression/data_export/ --profile $PROFILE --workgroup $WG --database $DB + cumulus-library build -t vocab -t core --profile $PROFILE --workgroup $WG --database $DB + cumulus-library export -t vocab -t core ./tests/regression/data_export/ --profile $PROFILE --workgroup $WG --database $DB - name: Compare vs known data run: python ./tests/regression/run_regression.py diff --git a/.sqlfluffignore b/.sqlfluffignore index d4954eac..7e2415aa 100644 --- a/.sqlfluffignore +++ b/.sqlfluffignore @@ -12,3 +12,7 @@ encounter.sql.jinja # Ignoring for now - could be addressed with an in-folder .sqlfluff config # or by a refactor of variable names 
count.sql.jinja + +# The following files try to adapt syntax to databases and so are not +# well checkable via static sqlfluff variables +ctas_from_parquet.sql.jinja diff --git a/cumulus_library/.sqlfluff b/cumulus_library/.sqlfluff index 447cef8b..1754f85e 100644 --- a/cumulus_library/.sqlfluff +++ b/cumulus_library/.sqlfluff @@ -89,12 +89,14 @@ join_cols_by_table = } } join_id = subject_ref +local_location = /var/study/data/ neg_source_table = neg_source_table output_table_name = 'created_table' parent_field = 'parent' prefix = Test primary_ref = encounter_ref pos_source_table = pos_source_table +remote_location = s3://bucket/study/data/ schema_name = test_schema schema = { diff --git a/cumulus_library/databases.py b/cumulus_library/databases.py index 36f2fae5..bba76460 100644 --- a/cumulus_library/databases.py +++ b/cumulus_library/databases.py @@ -12,10 +12,12 @@ import datetime import json import os +import pathlib import sys from pathlib import Path from typing import Any, Protocol +import boto3 import cumulus_fhir_support import duckdb import pandas @@ -24,7 +26,7 @@ from pyathena.common import BaseCursor as AthenaCursor from pyathena.pandas.cursor import PandasCursor as AthenaPandasCursor -from cumulus_library import db_config +from cumulus_library import db_config, errors class DatabaseCursor(Protocol): @@ -497,3 +499,51 @@ def create_db_backend(args: dict[str, str]) -> DatabaseBackend: raise ValueError(f"Unexpected --db-type value '{db_type}'") return backend + + +def upload_file( + *, + cursor: DatabaseCursor, + file: pathlib.Path, + study: str, + topic: str, + remote_filename: str | None = None, +) -> str | None: + if db_config.db_type == "athena": + # We'll investigate the cursor to get the relevant params for S3 upload + # TODO: this could be retrieved from a config object passed to builders + wg_conf = cursor.connection._client.get_work_group( + WorkGroup=cursor.connection.work_group + )["WorkGroup"]["Configuration"]["ResultConfiguration"] + s3_path = wg_conf["OutputLocation"] + bucket = "/".join(s3_path.split("/")[2:3]) + key_prefix = "/".join(s3_path.split("/")[3:]) + encryption_type = wg_conf.get("EncryptionConfiguration", {}).get( + "EncryptionOption", {} + ) + if encryption_type != "SSE_KMS": + raise errors.AWSError( + f"Bucket {bucket} has unexpected encryption type {encryption_type}." 
+ "AWS KMS encryption is expected for Cumulus buckets" + ) + kms_arn = wg_conf.get("EncryptionConfiguration", {}).get("KmsKey", None) + profile = cursor.connection.profile_name + s3_key = ( + f"{key_prefix}cumulus_user_uploads/{cursor._schema_name}/" + f"{study}/{topic}" + ) + if not remote_filename: + remote_filename = file + session = boto3.Session(profile_name=profile) + s3_client = session.client("s3") + with open(file, "rb") as b_file: + s3_client.put_object( + Bucket=bucket, + Key=f"{s3_key}/{remote_filename}", + Body=b_file, + ServerSideEncryption="aws:kms", + SSEKMSKeyId=kms_arn, + ) + return f"s3://{bucket}/{s3_key}" + # For DBs not requiring a remote upload + return None diff --git a/cumulus_library/errors.py b/cumulus_library/errors.py index 0b60ea2a..8a3fa5a7 100644 --- a/cumulus_library/errors.py +++ b/cumulus_library/errors.py @@ -23,5 +23,9 @@ class StudyManifestQueryError(Exception): """Errors related to data queries from StudyManifestParser""" +class AWSError(Exception): + """Errors from interacting with AWS""" + + class ApiError(Exception): """Errors from external API calls""" diff --git a/cumulus_library/studies/vocab/.gitignore b/cumulus_library/studies/vocab/.gitignore new file mode 100644 index 00000000..45c2ed73 --- /dev/null +++ b/cumulus_library/studies/vocab/.gitignore @@ -0,0 +1 @@ +icd/*.parquet diff --git a/cumulus_library/studies/vocab/ICD10CM_2023AA.bsv b/cumulus_library/studies/vocab/icd/ICD10CM_2023AA.bsv similarity index 100% rename from cumulus_library/studies/vocab/ICD10CM_2023AA.bsv rename to cumulus_library/studies/vocab/icd/ICD10CM_2023AA.bsv diff --git a/cumulus_library/studies/vocab/ICD10PCS_2023AA.bsv b/cumulus_library/studies/vocab/icd/ICD10PCS_2023AA.bsv similarity index 100% rename from cumulus_library/studies/vocab/ICD10PCS_2023AA.bsv rename to cumulus_library/studies/vocab/icd/ICD10PCS_2023AA.bsv diff --git a/cumulus_library/studies/vocab/ICD9CM_2023AA.bsv b/cumulus_library/studies/vocab/icd/ICD9CM_2023AA.bsv similarity index 100% rename from cumulus_library/studies/vocab/ICD9CM_2023AA.bsv rename to cumulus_library/studies/vocab/icd/ICD9CM_2023AA.bsv diff --git a/cumulus_library/studies/vocab/manifest.toml b/cumulus_library/studies/vocab/manifest.toml index 49143e2e..e44c23fe 100644 --- a/cumulus_library/studies/vocab/manifest.toml +++ b/cumulus_library/studies/vocab/manifest.toml @@ -9,3 +9,9 @@ file_names = [ file_names = [ "icd_legend.sql", ] + +# This export is only intended for use as part of CI regression testing +[export_config] +export_list = [ + "vocab__icd" +] diff --git a/cumulus_library/studies/vocab/reference_sql/vocab_icd_builder.sql b/cumulus_library/studies/vocab/reference_sql/vocab_icd_builder.sql new file mode 100644 index 00000000..4fc0e480 --- /dev/null +++ b/cumulus_library/studies/vocab/reference_sql/vocab_icd_builder.sql @@ -0,0 +1,18 @@ +-- noqa: disable=all +-- This sql was autogenerated as a reference example using the library +-- CLI. Its format is tied to the specific database it was run against, +-- and it may not be correct for all databases. Use the CLI's build +-- option to derive the best SQL for your dataset. 
+ +-- ########################################################### + +CREATE EXTERNAL TABLE IF NOT EXISTS `cumulus_mhg_dev_db`.`vocab__icd` ( + CUI STRING, + TTY STRING, + CODE STRING, + SAB STRING, + STR STRING +) +STORED AS PARQUET +LOCATION 's3://cumulus-athena-933137588087-us-east-1/results/cumulus_user_uploads/cumulus_mhg_dev_db/vocab/icd' +tblproperties ("parquet.compression"="SNAPPY"); diff --git a/cumulus_library/studies/vocab/vocab_icd_builder.py b/cumulus_library/studies/vocab/vocab_icd_builder.py index 4a323893..aa91fbf7 100644 --- a/cumulus_library/studies/vocab/vocab_icd_builder.py +++ b/cumulus_library/studies/vocab/vocab_icd_builder.py @@ -1,9 +1,10 @@ """ Module for directly loading ICD bsvs into athena tables """ -import csv import pathlib -from cumulus_library import base_table_builder +import pandas + +from cumulus_library import base_table_builder, databases from cumulus_library.template_sql import base_templates @@ -30,49 +31,31 @@ def prepare_queries(self, cursor: object, schema: str, *args, **kwargs): """ table_name = "vocab__icd" - icd_files = ["ICD10CM_2023AA", "ICD10PCS_2023AA", "ICD9CM_2023AA"] path = pathlib.Path(__file__).parent - + icd_files = path.glob("icd/*.bsv") headers = ["CUI", "TTY", "CODE", "SAB", "STR"] - rows_processed = 0 - dataset = [] - created = False - for filename in icd_files: - with open(f"{path}/{filename}.bsv") as file: - # For the first row in the dataset, we want to coerce types from - # varchar(len(item)) athena default to to an unrestricted varchar, so - # we'll create a table with one row - this make the recast faster, and - # lets us set the partition_size a little higher by limiting the - # character bloat to keep queries under athena's limit of 262144. - reader = csv.reader(file, delimiter="|") - if not created: - row = self.clean_row(next(reader), filename) - self.queries.append( - base_templates.get_ctas_query( - schema_name=schema, - table_name=table_name, - dataset=[row], - table_cols=headers, - ) - ) - created = True - for row in reader: - row = self.clean_row(row, filename) - dataset.append(row) - rows_processed += 1 - if rows_processed == self.partition_size: - self.queries.append( - base_templates.get_insert_into_query( - table_name=table_name, - table_cols=headers, - dataset=dataset, - ) - ) - dataset = [] - rows_processed = 0 - if rows_processed > 0: - self.queries.append( - base_templates.get_insert_into_query( - table_name=table_name, table_cols=headers, dataset=dataset - ) - ) + header_types = ["STRING", "STRING", "STRING", "STRING", "STRING"] + for file in icd_files: + parquet_path = path / f"icd/{file.stem}.parquet" + if not parquet_path.is_file(): + df = pandas.read_csv(file, delimiter="|", names=headers) + df.to_parquet(parquet_path) + remote_path = databases.upload_file( + cursor=cursor, + file=parquet_path, + study="vocab", + topic="icd", + remote_filename=f"{file.stem}.parquet", + ) + # Since we are building one table from these three files, it's fine to just + # use the last value of remote location + self.queries.append( + base_templates.get_ctas_from_parquet_query( + schema_name=schema, + table_name=table_name, + local_location=path / "icd", + remote_location=remote_path, + table_cols=headers, + remote_table_cols_types=header_types, + ) + ) diff --git a/cumulus_library/template_sql/base_templates.py b/cumulus_library/template_sql/base_templates.py index 2a36b4ac..4f937532 100644 --- a/cumulus_library/template_sql/base_templates.py +++ b/cumulus_library/template_sql/base_templates.py @@ -156,6 +156,41 @@ def 
get_create_view_query(
     )
 
 
+def get_ctas_from_parquet_query(
+    schema_name: str,
+    table_name: str,
+    local_location: str,
+    remote_location: str,
+    table_cols: list[str],
+    remote_table_cols_types: list[str],
+) -> str:
+    """Generates a create table as query using a directory of parquet files as a source
+
+    This function will generate an appropriate query for the underlying DB.
+    Not all params are used by each type of database.
+
+    :param schema_name: (athena) The schema to create the table in
+    :param table_name: (all) The name of the table to create
+    :param local_location: (duckdb) A directory containing parquet files to group
+        into a table
+    :param remote_location: (athena) An S3 URL to a directory containing parquet
+        files to group into a table
+    :param table_cols: (all) names of fields in your parquet to use as SQL columns
+    :param remote_table_cols_types: (athena) The types to assign to the columns
+        created in athena. Note that these should not be SQL types, but instead
+        should be parquet types.
+    """
+    return get_base_template(
+        "ctas_from_parquet",
+        schema_name=schema_name,
+        table_name=table_name,
+        local_location=local_location,
+        remote_location=remote_location,
+        table_cols=table_cols,
+        remote_table_cols_types=remote_table_cols_types,
+    )
+
+
 def get_ctas_query(
     schema_name: str, table_name: str, dataset: list[list[str]], table_cols: list[str]
 ) -> str:
diff --git a/cumulus_library/template_sql/ctas_from_parquet.sql.jinja b/cumulus_library/template_sql/ctas_from_parquet.sql.jinja
new file mode 100644
index 00000000..50713606
--- /dev/null
+++ b/cumulus_library/template_sql/ctas_from_parquet.sql.jinja
@@ -0,0 +1,24 @@
+{%- import 'syntax.sql.jinja' as syntax -%}
+{%- if db_type == 'athena' -%}
+CREATE EXTERNAL TABLE IF NOT EXISTS `{{ schema_name }}`.`{{ table_name }}` (
+{%- elif db_type == 'duckdb' -%}
+CREATE TABLE IF NOT EXISTS {{ table_name }} AS SELECT
+{%- endif %}
+{%- for col in table_cols %}
+    {{ col }}{% if db_type == 'athena' %} {{ remote_table_cols_types[loop.index0] }}{%- endif -%}
+    {{- syntax.comma_delineate(loop) }}
+{%- endfor %}
+{%- if db_type == 'athena' %}
+)
+{#- TODO: we may want to consider an optional partition parameter for
+large tables, though we would need to also run a MSCK REPAIR TABLE query
+after this table is created to make the data available.
+ +See https://docs.aws.amazon.com/athena/latest/ug/parquet-serde.html +for more info #} +STORED AS PARQUET +LOCATION '{{ remote_location }}' +tblproperties ("parquet.compression"="SNAPPY"); +{%- elif db_type == 'duckdb' %} +FROM read_parquet('{{ local_location }}/*.parquet') +{%- endif %} diff --git a/tests/conftest.py b/tests/conftest.py index c71fddf4..41a0f90b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,6 +34,7 @@ "observation": [["id"], ["encounter", "reference"]], "patient": [["id"]], } +VOCAB_ICD_ROW_COUNT = 403230 # Utility functions diff --git a/tests/core/test_core_meds.py b/tests/core/test_core_meds.py index a50bbcdd..95bf6fdc 100644 --- a/tests/core/test_core_meds.py +++ b/tests/core/test_core_meds.py @@ -1,6 +1,8 @@ """Tests for core__medicationrequest""" import json +import os +from unittest import mock import pytest @@ -79,6 +81,10 @@ def test_core_medreq_only_inline(tmp_path): assert [x[0] for x in ids] == ["Inline"] +@mock.patch.dict( + os.environ, + clear=True, +) def test_core_med_all_types(tmp_path): """Verify that we handle all types of medications""" testbed = testbed_utils.LocalTestbed(tmp_path) diff --git a/tests/regression/__init__.py b/tests/regression/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/regression/regression-iam.json b/tests/regression/regression-iam.json index fdf7b71b..992b7819 100644 --- a/tests/regression/regression-iam.json +++ b/tests/regression/regression-iam.json @@ -2,34 +2,31 @@ "Version": "2012-10-17", "Statement": [ { - "Effect": "Allow", "Action": [ "glue:BatchGetCrawlers", - "glue:GetCrawler", - "glue:GetCrawlerMetrics", - "glue:GetCrawlers", - "glue:ListCrawls", - "glue:ListCrawlers", + "glue:GetCrawler*", + "glue:ListCrawl*", "glue:StartCrawler", "glue:StopCrawler" ], "Resource": [ - "*" - ] + "arn:aws:glue:us-east-1:[replace w/ account number]:crawler/cumulus-library-regression-crawler" + ], + "Effect": "Allow", + "Sid": "AllowCrawlerAccess" }, { - "Effect": "Allow", "Action": [ - "glue:GetDatabase", - "glue:CreateDatabase" + "glue:GetDatabase" ], "Resource": [ "arn:aws:glue:*:*:catalog", "arn:aws:glue:*:*:database" - ] + ], + "Effect": "Allow", + "Sid": "AllowGetDatabases" }, { - "Effect": "Allow", "Action": [ "glue:CreatePartition", "glue:CreateTable", @@ -46,18 +43,24 @@ ], "Resource": [ "arn:aws:glue:*:*:catalog", - "arn:aws:glue:*:*:database/*", + "arn:aws:glue:*:*:database/cumulus_library_regression_db", "arn:aws:glue:*:*:table/*" - ] + ], + "Effect": "Allow", + "Sid": "AllowTableCRUD" }, { - "Effect": "Allow", "Action": [ - "athena:*" + "athena:StartQueryExecution", + "athena:GetQueryExecution", + "athena:GetQueryResults", + "athena:GetWorkGroup" ], "Resource": [ - "arn:aws:athena:*:*:workgroup/*" - ] + "arn:aws:athena:*:*:workgroup/cumulus" + ], + "Effect": "Allow", + "Sid": "AllowWorkgroupAccess" }, { "Action": [ @@ -65,7 +68,6 @@ "s3:GetObject", "s3:PutObject", "s3:PutObjectAcl", - "secretsmanager:GetSecretValue", "kms:DescribeKey", "kms:Decrypt", "kms:Encrypt", @@ -73,22 +75,40 @@ "kms:GenerateDataKeyPair" ], "Resource": [ - "arn:aws:secretsmanager:us-east-1:[replace with account #]:secret:*", - "arn:aws:kms:us-east-1:[replace with account #]:key/*", - "arn:aws:s3:::cumulus-etl-synthetic-regression*", - "arn:aws:s3:::cumulus-etl-synthetic-regression*/" + "arn:aws:kms:us-east-1:[replace w/ account number]:key/*", + "arn:aws:s3:::cumulus-etl-output-[replace w/ account number]-us-east-1/library-regression/*", + "arn:aws:s3:::cumulus-etl-output-[replace w/ account 
number]-us-east-1", + "arn:aws:s3:::cumulus-athena-[replace w/ account number]-us-east-1/*", + "arn:aws:s3:::cumulus-athena-[replace w/ account number]-us-east-1" ], - "Effect": "Allow" + "Effect": "Allow", + "Sid": "AllowBucketReadUpdate" }, { + "Condition": { + "StringLike": { + "s3:prefix": [ + "library-regression/*" + ] + } + }, + "Action": [ + "s3:ListBucket" + ], + "Resource": "arn:aws:s3:::cumulus-etl-output-[replace w/ account number]-us-east-1", "Effect": "Allow", + "Sid": "LimitETLBucketListAccessToRegression" + }, + { "Action": [ "s3:GetBucketLocation" ], "Resource": [ - "arn:aws:s3:::cumulus-etl-synthetic-regression*", - "arn:aws:s3:::cumulus-etl-synthetic-regression*/" - ] + "arn:aws:s3:::cumulus-etl-output-[replace w/ account number]-us-east-1", + "arn:aws:s3:::cumulus-athena-[replace w/ account number]-us-east-1" + ], + "Effect": "Allow", + "Sid": "AllowBucketResolution" } ] } \ No newline at end of file diff --git a/tests/regression/run_regression.py b/tests/regression/run_regression.py index 9e4ab177..3b754226 100644 --- a/tests/regression/run_regression.py +++ b/tests/regression/run_regression.py @@ -5,7 +5,7 @@ - You need to be on the BCH VPN - You need to have a federated token for access from the bch-aws-login script -- You need to do a fresh library build and export to ./data_export core +- You need to do a fresh library build and export to ./data_export You can elect to strategically do all of these things, but that's outside the scope of the test suite. @@ -20,62 +20,84 @@ from pandas import read_parquet -ref_path = f"{Path(__file__).resolve().parent}/reference" -export_path = f"{Path(__file__).resolve().parent}/data_export/core" +VOCAB_ICD_ROW_COUNT = 403231 -references = set(os.listdir(ref_path)) -exports = set(os.listdir(export_path)) -if references != exports: - ref_missing = references - exports - export_missing = exports - references - sys.exit( - "❌ Found differences in files present: ❌\n" - f"Files present in reference not in export: {ref_missing!s}\n" - f"Files present in export not in reference: {export_missing!s}" - ) -diffs = [] -for filename in references: - if filename.endswith(".parquet"): - ref_df = read_parquet(f"{ref_path}/{filename}") - exp_df = read_parquet(f"{export_path}/{filename}") - if list(ref_df.columns) != list(exp_df.columns): - diffs.append( - [ - filename, - ( - "Columns differ between reference and export:\n" - f"Reference: {list(ref_df.columns)}\n" - f"Export: {list(exp_df.columns)}" - ), - ] - ) - continue - if ref_df.size != exp_df.size: - diffs.append( - [ - filename, - ( - "Size (num rows) differ between reference and export:\n" - f"Reference: {ref_df.size}\n" - f"Export: {exp_df.size}" - ), - ] - ) - continue - ref_df = ref_df.sort_values(list(ref_df.columns), ignore_index=True) - exp_df = exp_df.sort_values(list(exp_df.columns), ignore_index=True) - compared = ref_df.compare(exp_df) - if not compared.empty: - diffs.append( - [ - filename, - f"Rows differ between reference and export:\n {compared}", - ] +def regress_core(): + ref_path = f"{Path(__file__).resolve().parent}/reference" + export_path = f"{Path(__file__).resolve().parent}/data_export/core" + + references = set(os.listdir(ref_path)) + exports = set(os.listdir(export_path)) + + if references != exports: + ref_missing = references - exports + export_missing = exports - references + sys.exit( + "❌ Found differences in files present: ❌\n" + f"Files present in reference not in export: {ref_missing!s}\n" + f"Files present in export not in reference: {export_missing!s}" + ) 
+    diffs = []
+    for filename in references:
+        if filename.endswith(".parquet"):
+            ref_df = read_parquet(f"{ref_path}/{filename}")
+            exp_df = read_parquet(f"{export_path}/{filename}")
+            if list(ref_df.columns) != list(exp_df.columns):
+                diffs.append(
+                    [
+                        filename,
+                        (
+                            "Columns differ between reference and export:\n"
+                            f"Reference: {list(ref_df.columns)}\n"
+                            f"Export: {list(exp_df.columns)}"
+                        ),
+                    ]
+                )
+                continue
+            if ref_df.size != exp_df.size:
+                diffs.append(
+                    [
+                        filename,
+                        (
+                            "Size (num rows) differs between reference and export:\n"
+                            f"Reference: {ref_df.size}\n"
+                            f"Export: {exp_df.size}"
+                        ),
+                    ]
+                )
+                continue
+            ref_df = ref_df.sort_values(list(ref_df.columns), ignore_index=True)
+            exp_df = exp_df.sort_values(list(exp_df.columns), ignore_index=True)
+            compared = ref_df.compare(exp_df)
+            if not compared.empty:
+                diffs.append(
+                    [
+                        filename,
+                        f"Rows differ between reference and export:\n {compared}",
+                    ]
+                )
+    if len(diffs) > 0:
+        for row in diffs:
+            print(f"--- {row[0]} ---")
+            print(row[1])
+        sys.exit(f"❌ Found {len(diffs)} difference(s) in core study. ❌")
+    print("✅ Core study reference and export matched ✅")
+
+
+def regress_vocab():
+    export_path = f"{Path(__file__).resolve().parent}/data_export/vocab"
+    with open(f"{export_path}/vocab__icd.csv") as f:
+        export_size = len(f.readlines())
+    # this is the value of conftest.VOCAB_ICD_ROW_COUNT, plus one for the csv header row
+    if export_size != VOCAB_ICD_ROW_COUNT:
+        sys.exit(
+            f"❌ Vocab tables built from parquets are not the expected length."
+            f" Found rows: {export_size} ❌"
+        )
+    print("✅ Vocab tables built from parquets are the expected length ✅")
+
+
+if __name__ == "__main__":
+    regress_vocab()
+    regress_core()
diff --git a/tests/test_athena.py b/tests/test_athena.py
index 825de153..0f3f9402 100644
--- a/tests/test_athena.py
+++ b/tests/test_athena.py
@@ -1,6 +1,10 @@
 """Tests for Athena database support"""
+import json
+import os
+import pathlib
+from unittest import mock
 
-from cumulus_library import databases
+from cumulus_library import databases, db_config
 
 
 def test_schema_parsing():
@@ -37,3 +41,33 @@ def test_schema_parsing():
     }
     parser = databases.AthenaParser()
     assert expected == parser.parse_found_schema(schema)
+
+
+@mock.patch.dict(
+    os.environ,
+    clear=True,
+)
+@mock.patch("botocore.session.Session")
+def test_upload_parquet(s3_client_mock):
+    path = pathlib.Path(__file__).resolve().parent
+    db_config.db_type = "athena"
+    cursor = mock.MagicMock()
+    cursor._schema_name = "db_schema"
+    cursor.connection.work_group = "workgroup"
+    cursor.connection.profile_name = "profile"
+    with open(path / "test_data/aws/boto3.client.athena.get_work_group.json") as f:
+        cursor.connection._client.get_work_group.return_value = json.load(f)
+    s3_client = mock.MagicMock()
+    with open(path / "test_data/aws/boto3.client.s3.put_object.json") as f:
+        s3_client.put_object.return_value = json.load(f)
+    s3_client_mock.patch("botocore.client.S3", return_value=s3_client)
+    resp = databases.upload_file(
+        cursor=cursor,
+        file=path / "test_data/count_synthea_patient.parquet",
+        study="test_study",
+        topic="count_patient",
+        remote_filename="count_synthea_patient.parquet",
+    )
+    assert resp == (
+        "s3://cumulus-athena-123456789012-us-east-1/results/cumulus_user_uploads/db_schema/test_study/count_patient"
+    )
diff --git a/tests/test_templates.py b/tests/test_base_templates.py
similarity index 91%
rename from
tests/test_templates.py rename to tests/test_base_templates.py index c824cd01..d71b1f24 100644 --- a/tests/test_templates.py +++ b/tests/test_base_templates.py @@ -3,6 +3,7 @@ import pytest from pandas import DataFrame +from cumulus_library import db_config from cumulus_library.template_sql import base_templates, sql_utils @@ -275,6 +276,49 @@ def test_ctas_empty_query_creation(expected, schema, table, cols, types): assert query == expected +@pytest.mark.parametrize( + "expected,db_type,schema,table,cols,remote_types", + [ + ( + """CREATE EXTERNAL TABLE IF NOT EXISTS `test_athena`.`remote_table` ( + a String, + b Int +) +STORED AS PARQUET +LOCATION 's3://bucket/data/' +tblproperties ("parquet.compression"="SNAPPY");""", + "athena", + "test_athena", + "remote_table", + ["a", "b"], + ["String", "Int"], + ), + ( + """CREATE TABLE IF NOT EXISTS local_table AS SELECT + a, + b +FROM read_parquet('./tests/test_data/*.parquet')""", + "duckdb", + "test_duckdb", + "local_table", + ["a", "b"], + ["String", "Int"], + ), + ], +) +def test_ctas_from_parquet(expected, db_type, schema, table, cols, remote_types): + db_config.db_type = db_type + query = base_templates.get_ctas_from_parquet_query( + schema_name=schema, + table_name=table, + local_location="./tests/test_data", + remote_location="s3://bucket/data/", + table_cols=cols, + remote_table_cols_types=remote_types, + ) + assert query == expected + + def test_ctas_query_creation(): expected = """CREATE TABLE "test_schema"."test_table" AS ( SELECT * FROM ( diff --git a/tests/test_cli.py b/tests/test_cli.py index 871a9f58..9a8331f7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -23,23 +23,6 @@ from tests.conftest import duckdb_args -class MockVocabBsv: - """mock class for patching test BSVs for the vocab study""" - - builtin_open = open - - def open(self, *args, **kwargs): - if str(args[0]).endswith(".bsv"): - args = ( - Path( - "./tests/test_data/mock_bsvs/", - str(args[0]).rsplit("/", maxsplit=1)[-1], - ), - "r", - ) - return self.builtin_open(*args, **kwargs) - - @contextmanager def mock_stdin(value: str): stdin = sys.stdin @@ -299,7 +282,6 @@ def test_clean(mock_path, tmp_path, args, expected): # pylint: disable=unused-a assert expected not in table -@mock.patch("builtins.open", MockVocabBsv().open) @mock.patch.dict( os.environ, clear=True, diff --git a/tests/test_data/aws/boto3.client.athena.get_work_group.json b/tests/test_data/aws/boto3.client.athena.get_work_group.json new file mode 100644 index 00000000..ec340d8d --- /dev/null +++ b/tests/test_data/aws/boto3.client.athena.get_work_group.json @@ -0,0 +1,43 @@ +{ + "WorkGroup": { + "Name": "cumulus", + "State": "ENABLED", + "Configuration": { + "ResultConfiguration": { + "OutputLocation": "s3://cumulus-athena-123456789012-us-east-1/results/", + "EncryptionConfiguration": { + "EncryptionOption": "SSE_KMS", + "KmsKey": "arn:aws:kms:us-east-1:123456789012:key/12345678-90ab-cdef-1234-567890abcdef" + } + }, + "EnforceWorkGroupConfiguration": true, + "PublishCloudWatchMetricsEnabled": true, + "RequesterPaysEnabled": false, + "EngineVersion": { + "SelectedEngineVersion": "Athena engine version 3", + "EffectiveEngineVersion": "Athena engine version 3" + }, + "EnableMinimumEncryptionConfiguration": false + }, + "CreationTime": "datetime.datetime(2024", + "3": null, + "1": null, + "14": null, + "19": null, + "34": null, + "734000": null, + "tzinfo=tzlocal())": null + }, + "ResponseMetadata": { + "RequestId": "d4cd4be4-a56f-4c85-8a94-d8401cbe214c", + "HTTPStatusCode": 200, + "HTTPHeaders": { + 
"date": "Tue, 30 Apr 2024 14:18:30 GMT", + "content-type": "application/x-amz-json-1.1", + "content-length": "750", + "connection": "keep-alive", + "x-amzn-requestid": "d4cd4be4-a56f-4c85-8a94-d8401cbe214c" + }, + "RetryAttempts": 0 + } +} diff --git a/tests/test_data/aws/boto3.client.s3.put_object.json b/tests/test_data/aws/boto3.client.s3.put_object.json new file mode 100644 index 00000000..92008575 --- /dev/null +++ b/tests/test_data/aws/boto3.client.s3.put_object.json @@ -0,0 +1,23 @@ +{ + "ResponseMetadata": { + "RequestId": "ABCDEFG", + "HostId": "1234567890", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "x-amz-id-2": "ABCDEFGHIJK", + "x-amz-request-id": "ABCDEFG", + "date": "Tue, 30 Apr 2024 14:31:32 GMT", + "x-amz-version-id": "QRSTUVWXYZ", + "x-amz-server-side-encryption": "aws:kms", + "x-amz-server-side-encryption-aws-kms-key-id": "arn:aws:kms:us-east-1:123456789012:key/12345678-90ab-cdef-1234-567890abcdef", + "etag": "\"1234567890abcdefg\"", + "server": "AmazonS3", + "content-length": "0" + }, + "RetryAttempts": 0 + }, + "ETag": "\"1234567890abcdefg\"", + "ServerSideEncryption": "aws:kms", + "VersionId": "QRSTUVWXYZ", + "SSEKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/12345678-90ab-cdef-1234-567890abcdef" +} diff --git a/tests/test_data/mock_bsvs/ICD10CM_2023AA.bsv b/tests/test_data/mock_bsvs/ICD10CM_2023AA.bsv deleted file mode 100644 index 9b567642..00000000 --- a/tests/test_data/mock_bsvs/ICD10CM_2023AA.bsv +++ /dev/null @@ -1,10 +0,0 @@ -C0000727|ICD10CM|PT|R10.0|Acute abdomen -C0000737|ICD10CM|PT|R10.9|Unspecified abdominal pain -C0000744|ICD10CM|ET|E78.6|Abetalipoproteinemia -C0000768|ICD10CM|ET|Q89.9|Congenital anomaly NOS -C0000768|ICD10CM|PT|Q89.9|Congenital malformation, unspecified -C0000768|ICD10CM|ET|Q89.9|Congenital deformity NOS -C0000770|ICD10CM|PT|K00.2|Abnormalities of size and form of teeth -C0000772|ICD10CM|ET|Q89.7|Multiple congenital anomalies NOS -C0000772|ICD10CM|ET|Q89.7|Multiple congenital deformities NOS -C0000786|ICD10CM|HT|O03|Spontaneous abortion diff --git a/tests/test_data/mock_bsvs/ICD10PCS_2023AA.bsv b/tests/test_data/mock_bsvs/ICD10PCS_2023AA.bsv deleted file mode 100644 index 81e5fbda..00000000 --- a/tests/test_data/mock_bsvs/ICD10PCS_2023AA.bsv +++ /dev/null @@ -1,10 +0,0 @@ -C0005491|ICD10PCS|PT|GZC9ZZZ|Biofeedback -C0005491|ICD10PCS|PX|GZC9ZZZ|Mental Health @ None @ Biofeedback @ Other Biofeedback @ None @ None @ None -C0010332|ICD10PCS|PT|GZ2ZZZZ|Crisis Intervention -C0010332|ICD10PCS|PX|GZ2ZZZZ|Mental Health @ None @ Crisis Intervention @ None @ None @ None @ None -C0011931|ICD10PCS|PT|BH4CZZZ|Ultrasonography of Head and Neck -C0011931|ICD10PCS|PX|BH4CZZZ|Imaging @ Skin, Subcutaneous Tissue and Breast @ Ultrasonography @ Head and Neck @ None @ None @ None -C0015400|ICD10PCS|HT|08P|Eye, Removal -C0015400|ICD10PCS|HX|08P|Medical and Surgical @ Eye @ Removal -C0015618|ICD10PCS|PT|GZ72ZZZ|Family Psychotherapy -C0015618|ICD10PCS|PX|GZ72ZZZ|Mental Health @ None @ Family Psychotherapy @ Other Family Psychotherapy @ None @ None @ None diff --git a/tests/test_data/mock_bsvs/ICD9CM_2023AA.bsv b/tests/test_data/mock_bsvs/ICD9CM_2023AA.bsv deleted file mode 100644 index e3722d4e..00000000 --- a/tests/test_data/mock_bsvs/ICD9CM_2023AA.bsv +++ /dev/null @@ -1,10 +0,0 @@ -C0000737|ICD9CM|HT|789.0|Abdominal pain -C0000737|ICD9CM|PT|789.00|Abdominal pain, unspecified site -C0000768|ICD9CM|HT|740-759.99|CONGENITAL ANOMALIES -C0000768|ICD9CM|PT|759.9|Congenital anomaly, unspecified -C0000770|ICD9CM|PT|520.2|Abnormalities of size and form of teeth 
-C0000774|ICD9CM|PT|251.5|Abnormality of secretion of gastrin -C0000786|ICD9CM|HT|634|Spontaneous abortion -C0000804|ICD9CM|HT|636|Illegally induced abortion -C0000814|ICD9CM|PT|632|Missed abortion -C0000821|ICD9CM|HT|640.0|Threatened abortion diff --git a/tests/test_vocab.py b/tests/test_vocab.py new file mode 100644 index 00000000..9898a760 --- /dev/null +++ b/tests/test_vocab.py @@ -0,0 +1,49 @@ +import os +from unittest import mock + +from cumulus_library import cli, databases +from tests import conftest + + +@mock.patch.dict( + os.environ, + clear=True, +) +def test_vocab(tmp_path): + cli.main( + cli_args=conftest.duckdb_args( + [ + "build", + "-t", + "core", + "-s", + "./tests/test_data", + "--database", + "test", + ], + tmp_path, + ) + ) + cli.main( + cli_args=conftest.duckdb_args( + [ + "build", + "-t", + "vocab", + "-s", + "./tests/test_data", + "--database", + f"{tmp_path}/duck.db", + ], + tmp_path, + ) + ) + db = databases.DuckDatabaseBackend(f"{tmp_path}/duck.db") + cursor = db.cursor() + table_rows, cols = conftest.get_sorted_table_data(cursor, "vocab__icd") + expected_cols = {"CUI", "TTY", "CODE", "SAB", "STR"} + found_cols = {col_schema[0] for col_schema in cols} + assert expected_cols == found_cols + assert len(table_rows) == conftest.VOCAB_ICD_ROW_COUNT + assert table_rows[0] == ("C0000727", "ICD10CM", "PT", "R10.0", "Acute abdomen") + assert table_rows[-1] == ("C5700317", "ICD10CM", "HT", "M91.3", "Pseudocoxalgia")
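
For reference, below is a minimal usage sketch (not part of the patch) showing how
another study builder could wire itself up to the helpers added here. The study name,
topic, file names, and column list are hypothetical; the call pattern mirrors
vocab_icd_builder.py above. On Athena, databases.upload_file() pushes the parquet file
to the workgroup's S3 bucket and returns its s3:// location; on DuckDB it returns None
and the generated CTAS reads from local_location instead.

import pathlib

import pandas

from cumulus_library import base_table_builder, databases
from cumulus_library.template_sql import base_templates


class ExampleParquetBuilder(base_table_builder.BaseTableBuilder):
    display_text = "Creating example table from parquet..."

    def prepare_queries(self, cursor: object, schema: str, *args, **kwargs):
        path = pathlib.Path(__file__).parent
        # Convert a hypothetical pipe-delimited source file to parquet, once;
        # the cached parquet is reused on subsequent builds
        parquet_path = path / "data/example_codes.parquet"
        if not parquet_path.is_file():
            df = pandas.read_csv(
                path / "data/example_codes.bsv", delimiter="|", names=["CODE", "STR"]
            )
            df.to_parquet(parquet_path)
        # Returns an s3:// URL on Athena, or None for databases that read locally
        remote_path = databases.upload_file(
            cursor=cursor,
            file=parquet_path,
            study="example_study",
            topic="codes",
            remote_filename="example_codes.parquet",
        )
        self.queries.append(
            base_templates.get_ctas_from_parquet_query(
                schema_name=schema,
                table_name="example_study__codes",
                local_location=path / "data",
                remote_location=remote_path,
                table_cols=["CODE", "STR"],
                remote_table_cols_types=["STRING", "STRING"],
            )
        )

Converting sources to parquet once and caching the result next to the source file (as
the vocab builder does, with the generated files gitignored) keeps repeat builds fast
and lets the DuckDB path skip the upload entirely.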