Skip to content

Commit

Permalink
feat: switch to cumulus-fhir-support's ndjson reading code
Browse files Browse the repository at this point in the history
We now load any ndjson files that cumulus-fhir-support says to.
  • Loading branch information
mikix committed Jul 2, 2024
1 parent 4ff0cce commit f91a729
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 35 deletions.
14 changes: 14 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ pre-commit install
This will install dependencies & build tools,
as well as set up an auto-formatter commit hook.

## Running tests

Tests can be run with `pytest`, as you might expect,
but you'll first want to set up a fake `test` AWS profile.

1. Edit `~/.aws/credentials`
2. Add a block like:
```
[test]
aws_access_key_id = test
aws_secret_access_key = test
```
3. Then run `pytest`

## Adding new resources

Things to keep in mind:
Expand Down
38 changes: 10 additions & 28 deletions cumulus_library/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,37 +646,15 @@ def parse_found_schema(self, schema: dict[str, str]) -> dict:
return parsed


def _read_rows_from_files(filenames: list[str]) -> list[dict]:
"""Reads all provided ndjson files directly into memory"""
rows = []
for filename in sorted(filenames):
with open(filename, encoding="utf8") as f:
for line in f:
rows.append(json.loads(line))
return rows


def _read_rows_from_table_dir(path: Path) -> list[dict]:
"""Grab ndjson files in the Cumulus ETL output format: path/tablename/*.ndjson"""
if not path.exists():
return []

filenames = [str(x) for x in path.iterdir() if x.name.endswith(".ndjson")]
return _read_rows_from_files(filenames)


def _read_rows_for_resource(path: Path, resource: str) -> list[dict]:
rows = []

# Grab any ndjson files in Cumulus ETL input format: path/*.Resource.*.ndjson
if path.exists():
# This pattern is copied from the ETL, allowing a suffix or a numbered prefix.
pattern = re.compile(rf"([0-9]+\.)?{resource}(\.[^/]+)?\.ndjson")
filenames = [str(x) for x in path.iterdir() if pattern.match(x.name)]
rows += _read_rows_from_files(filenames)
# Support any ndjson files from the target folder directly
rows += list(cumulus_fhir_support.read_multiline_json_from_dir(path, resource))

# Also grab any ndjson files in Cumulus ETL output format
rows += _read_rows_from_table_dir(path / resource.lower())
# Also support being given an ETL output folder, and look in the table subdir
subdir = path / resource.lower()
rows += list(cumulus_fhir_support.read_multiline_json_from_dir(subdir, resource))

return rows

Expand Down Expand Up @@ -721,7 +699,11 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
"etl__completion_encounters",
]
for metadata_table in metadata_tables:
rows = _read_rows_from_table_dir(Path(f"{path}/{metadata_table}"))
rows = list(
cumulus_fhir_support.read_multiline_json_from_dir(
f"{path}/{metadata_table}"
)
)
if rows:
# Auto-detecting the schema works for these simple tables
all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "cumulus-library"
requires-python = ">= 3.10"
dependencies = [
"ctakesclient >= 1.3",
"cumulus-fhir-support >= 1.1",
"cumulus-fhir-support >= 1.2",
"duckdb >= 0.9",
"fhirclient >= 4.1",
"Jinja2 > 3",
Expand Down
10 changes: 4 additions & 6 deletions tests/test_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,16 @@ def test_duckdb_from_iso8601_timestamp(timestamp, expected):

def test_duckdb_load_ndjson_dir(tmp_path):
filenames = {
"A.Patient.ndjson": False,
"1.Patient.ndjson": True,
"Patient.ndjson": True,
"Patient.hello.bye.ndjson": True,
"Patient.nope": False,
"blarg.ndjson": True,
"blarg.nope": False,
"patient/blarg.ndjson": True,
"patient/blarg.meta": False,
}
os.mkdir(f"{tmp_path}/patient")
for index, (filename, valid) in enumerate(filenames.items()):
with open(f"{tmp_path}/{filename}", "w", encoding="utf8") as f:
row_id = f"Good{index}" if valid else f"Bad{index}"
f.write(f'{{"id":"{row_id}"}}')
f.write(f'{{"id":"{row_id}", "resourceType": "Patient"}}')

db = databases.create_db_backend(
{
Expand Down Expand Up @@ -115,6 +112,7 @@ def test_duckdb_table_schema():
with open(f"{tmpdir}/observation/test.ndjson", "w", encoding="utf8") as ndjson:
json.dump(
{
"resourceType": "Observation",
"id": "test",
"component": [
{
Expand Down

0 comments on commit f91a729

Please sign in to comment.