core: add completion tracking support
This will ignore any encounter rows that haven't yet loaded all the
resources we care about.

This completion tracking is opt-in and won't affect legacy data.
We include any encounters which don't have a completion group
registration.

In addition:
- Add --load-ndjson-dir to the generate-sql command.
- The core study now builds core__incomplete_encounter, holding just
  the IDs of encounters that were ignored because they were deemed incomplete.
  This is not referenced by other tables - it's just a debugging tool.
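
For example, once the core study has been rebuilt, the debugging table could be inspected with a query along these lines. This is only a sketch: the table and column names come from this commit, and the join assumes the etl__completion_encounters table is loaded in the same database.

```sql
-- Hypothetical debugging query: count ignored encounters per export group.
SELECT
    ece.group_name,
    count(ie.id) AS incomplete_count
FROM core__incomplete_encounter AS ie
INNER JOIN etl__completion_encounters AS ece ON ece.encounter_id = ie.id
GROUP BY ece.group_name;
```

Since the debugging table holds only encounter IDs, joining back to etl__completion_encounters is one way to see which export group the incomplete rows came from.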
mikix committed Apr 8, 2024
1 parent 5ce8d77 commit 02a95b2
Showing 19 changed files with 640 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -69,7 +69,7 @@ jobs:
# See https://github.com/aws-actions/configure-aws-credentials for configuring
# the aws side of this - the below action is just a light wrapper
- name: Configure AWS Credentials
uses: mcblair/configure-aws-profile-action@v0.1.1
uses: mcblair/configure-aws-profile-action@v1.0.0
with:
role-arn: arn:aws:iam::${{secrets.AWS_ACCOUNT}}:role/cumulus-library-ci
region: us-east-1
26 changes: 25 additions & 1 deletion CONTRIBUTING.md
@@ -9,4 +9,28 @@ pre-commit install
```

This will install dependencies & build tools,
as well as set up a `black` auto-formatter commit hook.
as well as set up an auto-formatter commit hook.

## Adding new resources

Things to keep in mind:
- If the new resource links to Encounter,
add it to the completion checking done in the Encounter code.
(Though consider the effect this will have on existing encounters.)

## Rebuilding the reference SQL

We keep some reference SQL in git,
to help us track unexpected changes and verify our SQL indenting.
These are stored in `cumulus_library/studies/*/reference_sql/`.

You can regenerate these yourself when you make changes to SQL:

```sh
cumulus-library generate-sql \
--db-type duckdb \
--database :memory: \
--load-ndjson-dir tests/test_data/duckdb_data \
--target core \
--target discovery
```
8 changes: 7 additions & 1 deletion cumulus_library/.sqlfluff
@@ -143,7 +143,13 @@ schema =
'reference': True, 'display': False, 'type': True
},
'id': True
},
},
'etl__completion': {
'group_name': True,
},
'etl__completion_encounters': {
'group_name': True,
},
'medicationrequest': {
'id': True,
'status': True,
14 changes: 11 additions & 3 deletions cumulus_library/cli_parser.py
@@ -103,6 +103,15 @@ def add_db_config(parser: argparse.ArgumentParser) -> None:
add_aws_config(parser)


def add_db_input_config(parser: argparse.ArgumentParser) -> None:
"""Like add_db_config, but for input args like duckdb's --load-ndjson-dir"""
parser.add_argument(
"--load-ndjson-dir",
help="Load ndjson files from this folder",
metavar="DIR",
)


def get_parser() -> argparse.ArgumentParser:
"""Provides parser for handling CLI arguments"""
parser = argparse.ArgumentParser(
@@ -160,6 +169,7 @@ def get_parser() -> argparse.ArgumentParser:
add_study_dir_argument(build)
add_verbose_argument(build)
add_db_config(build)
add_db_input_config(build)
add_data_path_argument(build)
build.add_argument(
"--statistics",
@@ -170,9 +180,6 @@
),
dest="stats_build",
)
build.add_argument(
"--load-ndjson-dir", help="Load ndjson files from this folder", metavar="DIR"
)
build.add_argument(
"--continue",
dest="continue_from",
@@ -229,6 +236,7 @@ def get_parser() -> argparse.ArgumentParser:
add_target_argument(sql)
add_study_dir_argument(sql)
add_db_config(sql)
add_db_input_config(sql)
add_table_builder_argument(sql)

# Generate markdown tables for documentation
46 changes: 31 additions & 15 deletions cumulus_library/databases.py
@@ -335,6 +335,25 @@ def validate_table_schema(
return self._parse_found_schema(expected, schema)


def _read_rows_from_table_dir(path: str) -> list[dict]:
# Grab filenames to load (ignoring .meta files and handling missing folders)
folder = Path(path)
filenames = []
if folder.exists():
filenames = sorted(
str(x) for x in folder.iterdir() if x.name.endswith(".ndjson")
)

# Read all ndjson directly into memory
rows = []
for filename in filenames:
with open(filename, encoding="utf8") as f:
for line in f:
rows.append(json.loads(line))

return rows


def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
"""Loads a directory tree of raw ndjson into schema-ful tables.
@@ -363,26 +382,23 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
]
for resource in resources:
table_name = resource.lower()

# Grab filenames to load (ignoring .meta files and handling missing folders)
folder = Path(f"{path}/{table_name}")
filenames = []
if folder.exists():
filenames = sorted(
str(x) for x in folder.iterdir() if x.name.endswith(".ndjson")
)

# Read all ndjson directly into memory
rows = []
for filename in filenames:
with open(filename, encoding="utf8") as f:
for line in f:
rows.append(json.loads(line))
rows = _read_rows_from_table_dir(f"{path}/{table_name}")

# Make a pyarrow table with full schema from the data
schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, rows)
all_tables[table_name] = pyarrow.Table.from_pylist(rows, schema)

# And now some special support for a few ETL tables.
metadata_tables = [
"etl__completion",
"etl__completion_encounters",
]
for metadata_table in metadata_tables:
rows = _read_rows_from_table_dir(f"{path}/{metadata_table}")
if rows:
# Auto-detecting the schema works for these simple tables
all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)

return all_tables


15 changes: 11 additions & 4 deletions cumulus_library/studies/core/builder_encounter.py
@@ -24,7 +24,13 @@
"type",
],
"id": [],
}
},
"etl__completion": {
"group_name": [],
},
"etl__completion_encounters": {
"group_name": [],
},
}


@@ -126,6 +132,7 @@ def prepare_queries(
validated_schema = core_templates.validate_schema(
cursor, schema, expected_table_cols, parser
)
self.queries.append(
core_templates.get_core_template("encounter", validated_schema)
)
self.queries += [
core_templates.get_core_template("encounter", validated_schema),
core_templates.get_core_template("incomplete_encounter", validated_schema),
]
68 changes: 68 additions & 0 deletions cumulus_library/studies/core/core_templates/completion_utils.jinja
@@ -0,0 +1,68 @@

{#- returns a SELECT containing a list of complete encounters
There will be two columns:
- id (varchar)
- is_complete (bool)
If completion is not enabled, it will return an empty table.
If a row is not completion-tracked,
it will not be in the returned table.
Thus, you can legacy-allow any encounter that isn't represented
in this table.
-#}
{%- macro complete_encounters(schema) -%}
{%- set check_completion = (
schema['etl__completion']['group_name']
and schema['etl__completion_encounters']['group_name']
) -%}
{%- if check_completion -%}
(
WITH
-- Start by grabbing group names and export times for each Encounter.
temp_completion_times AS (
SELECT
ece.encounter_id,
-- note that we don't chop the export time down to a DATE,
-- as is typical in the core study
min(from_iso8601_timestamp(ece.export_time)) AS earliest_export
FROM etl__completion_encounters AS ece
GROUP BY ece.encounter_id
),

-- Then examine all tables that are at least as recently loaded as the
-- Encounter. (This is meant to detect Conditions that maybe aren't
-- loaded into Athena yet for the Encounter.)
-- Make sure that we have all the tables we care about.
temp_completed_tables AS (
SELECT
ece.encounter_id,
(
BOOL_OR(ec.table_name = 'condition')
AND BOOL_OR(ec.table_name = 'documentreference')
AND BOOL_OR(ec.table_name = 'medicationrequest')
AND BOOL_OR(ec.table_name = 'observation')
) AS is_complete
FROM etl__completion_encounters AS ece
INNER JOIN temp_completion_times AS tct ON tct.encounter_id = ece.encounter_id
INNER JOIN etl__completion AS ec ON ec.group_name = ece.group_name
WHERE tct.earliest_export <= from_iso8601_timestamp(ec.export_time)
GROUP BY ece.encounter_id
)

-- Left join back with main completion_encounters table,
-- to catch rows that are completion-tracked but not in
-- temp_completed_tables.
SELECT
ece.encounter_id AS id,
(is_complete IS NOT NULL AND is_complete) AS is_complete
FROM etl__completion_encounters AS ece
LEFT JOIN temp_completed_tables AS tct ON tct.encounter_id = ece.encounter_id
)
{%- else -%}
{#- make an empty table, so that missing entries are treated as legacy rows
that aren't completion-tracked -#}
(SELECT '' AS id, FALSE AS is_complete WHERE 1=0)
{%- endif -%}
{%- endmacro -%}
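
To make the completeness check above concrete, here is a self-contained sketch with invented sample data. It inlines the two ETL tables as VALUES and skips the per-encounter min() of export times, since each sample encounter has only one registration row; the group names, encounter IDs, and timestamps are all made up for illustration.

```sql
-- Invented sample data: group-a has exported all four tracked resource
-- tables, while group-b has only exported its condition table so far.
WITH etl__completion_encounters (encounter_id, group_name, export_time) AS (
    VALUES
        ('e1', 'group-a', '2024-01-01T00:00:00Z'),
        ('e2', 'group-b', '2024-01-01T00:00:00Z')
),

etl__completion (group_name, table_name, export_time) AS (
    VALUES
        ('group-a', 'condition', '2024-01-02T00:00:00Z'),
        ('group-a', 'documentreference', '2024-01-02T00:00:00Z'),
        ('group-a', 'medicationrequest', '2024-01-02T00:00:00Z'),
        ('group-a', 'observation', '2024-01-02T00:00:00Z'),
        ('group-b', 'condition', '2024-01-02T00:00:00Z')
)

-- Same shape of check as the macro: an encounter is complete only if every
-- tracked table was exported at or after the encounter's own export time.
SELECT
    ece.encounter_id,
    (
        BOOL_OR(ec.table_name = 'condition')
        AND BOOL_OR(ec.table_name = 'documentreference')
        AND BOOL_OR(ec.table_name = 'medicationrequest')
        AND BOOL_OR(ec.table_name = 'observation')
    ) AS is_complete
FROM etl__completion_encounters AS ece
INNER JOIN etl__completion AS ec ON ec.group_name = ece.group_name
WHERE from_iso8601_timestamp(ece.export_time) <= from_iso8601_timestamp(ec.export_time)
GROUP BY ece.encounter_id;
```

Encounter e1 comes out complete; e2 does not, because the documentreference, medicationrequest, and observation tables for group-b are not yet marked as exported.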
13 changes: 12 additions & 1 deletion cumulus_library/studies/core/core_templates/encounter.sql.jinja
@@ -1,12 +1,17 @@
{% import 'core_utils.jinja' as utils %}
{% import 'completion_utils.jinja' as completion_utils %}
{#- Unlike some of the other core templates, in order to get easier access
to the nested dates in the period, we'll do the preliminary querying in
two steps.
TODO: profile speed vs a single step, consider extending date col methods
to traverse nested elements if performance is impacted-#}
CREATE TABLE core__encounter AS
WITH temp_encounter_nullable AS (
WITH

temp_encounter_completion AS {{ completion_utils.complete_encounters(schema) }},

temp_encounter_nullable AS (
SELECT DISTINCT
{{- utils.basic_cols(
'encounter',
@@ -51,6 +56,12 @@ WITH temp_encounter_nullable AS (
)
}}
FROM encounter AS e
LEFT JOIN temp_encounter_completion AS tec ON tec.id = e.id
WHERE (
-- NULL completion just means it's a row that isn't completion-tracked
-- (likely a legacy row), so allow it in.
tec.is_complete IS NULL OR tec.is_complete
)
),

temp_encounter AS (
@@ -0,0 +1,8 @@
{% import 'completion_utils.jinja' as completion_utils %}
CREATE TABLE core__incomplete_encounter AS
WITH
temp_encounter_completion AS {{ completion_utils.complete_encounters(schema) }}

SELECT DISTINCT tec.id
FROM temp_encounter_completion AS tec
WHERE NOT tec.is_complete
@@ -144,7 +144,6 @@ WITH temp_documentreference AS (
dr.docStatus,
dr.context,
dr.subject.reference AS subject_ref,
cast(NULL as varchar) AS encounter_ref,
dr.context.period.start AS author_date,
date_trunc('day', date(from_iso8601_timestamp(dr."context"."period"."start")))
AS author_day,
@@ -163,6 +162,17 @@
LEFT JOIN core__documentreference_dn_type AS cdrt ON dr.id = cdrt.id
LEFT JOIN core__documentreference_dn_category AS cdrc ON dr.id = cdrc.id
LEFT JOIN core__documentreference_dn_format AS cdrf ON dr.id = cdrf.id
),

temp_encounters AS (
SELECT
tdr.id,


context_encounter.encounter.reference AS encounter_ref
FROM temp_documentreference AS tdr,
unnest(context.encounter) AS context_encounter (encounter) --noqa


)

@@ -181,8 +191,8 @@ SELECT DISTINCT
tdr.author_year,
tdr.format_code,
tdr.subject_ref,
coalesce(tdr.encounter_ref, context_encounter.encounter.reference) as encounter_ref,
te.encounter_ref,
concat('DocumentReference/', tdr.id) AS documentreference_ref
FROM temp_documentreference AS tdr,
unnest(context.encounter) AS context_encounter (encounter) --noqa
FROM temp_documentreference AS tdr
LEFT JOIN temp_encounters AS te ON tdr.id = te.id
WHERE date(tdr.author_day) BETWEEN date('2016-06-01') AND current_date;
