first take
aysim319 committed Nov 18, 2024
1 parent 541b0aa commit 1c79068
Showing 9 changed files with 2,408 additions and 119 deletions.
101 changes: 7 additions & 94 deletions nhsn/delphi_nhsn/constants.py
@@ -1,109 +1,22 @@
"""Registry for signal names."""
from delphi_utils import Smoother

GEOS = [
"nation",
"hhs",
"state"
]
# FROM HHS
CONFIRMED = "confirmed_admissions_covid_1d"
SUM_CONF_SUSP = "sum_confirmed_suspected_admissions_covid_1d"
CONFIRMED_PROP = "confirmed_admissions_covid_1d_prop"
SUM_CONF_SUSP_PROP = "sum_confirmed_suspected_admissions_covid_1d_prop"
CONFIRMED_FLU = "confirmed_admissions_influenza_1d"
CONFIRMED_FLU_PROP = CONFIRMED_FLU+"_prop"

# FROM CDC/METADATA
CONFIRMED_COVID = "Weekly Total COVID-19 Admissions"
HOSPITAL_CONFIRMED_COVID = "Percent Hospitals Reporting Total COVID-19 Admissions"

ADULT_CONFIRMED_COVID = "Weekly Total Adult COVID-19 Admissions"
PEDIATRIC_CONFIRMED_COVID = "Weekly Total Pediatric COVID-19 Admissions"

CONFIRMED_COVID_ADULT_PERCENT = "Percent Adult COVID-19 Admissions"
HOSPITAL_CONFIRMED_COVID_ADULT_PERCENT = "Percent Hospitals Reporting Adult COVID-19 Admissions"

CONFIRMED_COVID_PEDIATRIC_PERCENT = "Percent Pediatric COVID-19 Admissions"
HOSPITAL_COVID_CONFIRMED_PEDIATRIC_PERCENT = "Percent Hospitals Reporting Pediatric COVID-19 Admissions"

CONFIRMED_FLU = "Weekly Total Influenza Admissions"
HOSPITAL_CONFIRMED_FLU = "Percent Hospitals Reporting Influenza Admissions"

CONFIRMED_FLU_ADULT_PERCENT = "Percent Adult Influenza Admissions"
HOSPITAL_CONFIRMED_FLU_ADULT_PERCENT = "Percent Hospitals Reporting TotalPatients Hospitalized with Influenza"

CONFIRMED_FLU_PEDIATRIC_PERCENT = "Percent Pediatric Influenza Admissions"
HOSPTIAL_CONFIRMED_FLU_PEDIATRIC_PERCENT = "Percent Hospitals Reporting Pediatric Influenza Admissions"

AGE_GROUPS = [
"0to4",
"5to17",
"18to49",
"50to64",
"65to74",
"75plus",
"unk"
"state",
"nation"
]

# column name from socrata
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
ADULT_ADMISSION_COVID_API = "numconfc19newadmadult"
TOTAL_ADULT_ADMISSION_COVID_API = "totalconfc19newadmadult"
PEDIATRIC_ADMISSION_COVID_API = "numconfc19newadmped"
TOTAL_PEDIATRIC_ADMISSION_COVID_API = "totalconfc19newadmped"

PERCENT_ADULT_ADMISSION_COVID_API = "pctconfrsvnewadmadult"
PERCENT_PEDIATRIC_ADMISSION_COVID_API = "pctconfrsvnewadmped"

TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
ADULT_ADMISSION_FLU_API = "numconffluhosppatsadult"
TOTAL_ADULT_ADMISSION_FLU_API = "numconfflunewadmadult"
PEDIATRIC_ADMISSION_FLU_API = "numconfflunewadmped"
TOTAL_PEDIATRIC_ADMISSION_FLU_API = "totalconfflunewadmped"

TOTAL_ADMISSION_RSV_API = "totalconfrsvnewadm"
ADULT_ADMISSION_RSV_API = "numconfrsvnewadmadult"
TOTAL_ADULT_ADMISSION_RSV_API = "totalconfrsvnewadmadult"
PEDIATRIC_ADMISSION_RSV_API = "numconfrsvnewadmped"
TOTAL_PEDIATRIC_ADMISSION_RSV_API = "totalconfrsvnewadmped"

PARTIAL_SIGNALS = [
TOTAL_ADMISSION_COVID_API,
ADULT_ADMISSION_COVID_API,
TOTAL_ADULT_ADMISSION_COVID_API,
PEDIATRIC_ADMISSION_COVID_API,
TOTAL_PEDIATRIC_ADMISSION_COVID_API,
PERCENT_ADULT_ADMISSION_COVID_API,
PERCENT_PEDIATRIC_ADMISSION_COVID_API,
TOTAL_ADMISSION_FLU_API,
ADULT_ADMISSION_FLU_API,
TOTAL_ADULT_ADMISSION_FLU_API,
PEDIATRIC_ADMISSION_FLU_API,
TOTAL_PEDIATRIC_ADMISSION_FLU_API,
TOTAL_ADMISSION_RSV_API,
ADULT_ADMISSION_RSV_API,
TOTAL_ADULT_ADMISSION_RSV_API,
PEDIATRIC_ADMISSION_RSV_API,
TOTAL_PEDIATRIC_ADMISSION_RSV_API,
]

SIGNALS_MAP = {
"confirmed_admissions_covid": [TOTAL_ADMISSION_COVID_API],
"confirmed_admissions_flu": [TOTAL_ADMISSION_FLU_API],
"confirmed_admissions_rsv": [TOTAL_ADMISSION_RSV_API],
"confirmed_admissions_covid": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu": TOTAL_ADMISSION_FLU_API,
}

TYPE_DICT = {
"timestamp": "datetime64[ns]",
"jurisdiction": str,
}

TYPE_DICT.update({key: float for key in PARTIAL_SIGNALS})


"geo_id": str,
}

SMOOTHERS = [
(Smoother("identity", impute_method=None), ""),
(Smoother("moving_average", window_length=7), "_7dav"),
]
TYPE_DICT.update({signal: float for signal in SIGNALS_MAP.keys()})
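
Note: the substantive change in this file is the shape of SIGNALS_MAP. Each value goes from a list of Socrata columns (summed by the now-deleted process_signal_data in pull.py) to a single column name, so building a signal becomes a plain column copy. A minimal before/after sketch, using a toy DataFrame in place of the real Socrata payload (the numbers are made up):

import pandas as pd

# Toy frame standing in for the Socrata payload (illustrative values only).
df = pd.DataFrame({"totalconfc19newadm": [5, 7], "totalconfflunewadm": [2, 3]})

# Old shape: each signal mapped to a list of columns, summed row-wise.
old_signals_map = {"confirmed_admissions_covid": ["totalconfc19newadm"]}
for signal, signal_parts in old_signals_map.items():
    df[signal] = sum(df[col] for col in signal_parts)

# New shape: each signal maps to exactly one column, so it is a copy.
new_signals_map = {"confirmed_admissions_covid": "totalconfc19newadm"}
for signal, col_name in new_signals_map.items():
    df[signal] = df[col_name]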
33 changes: 17 additions & 16 deletions nhsn/delphi_nhsn/pull.py
@@ -1,20 +1,13 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
 import logging
-import textwrap
 from typing import Optional
 
 import pandas as pd
-from delphi_utils import create_backup_csv
+from delphi_utils import create_backup_csv, GeoMapper
 from sodapy import Socrata
 
-from .constants import SIGNALS_MAP, TYPE_DICT, PARTIAL_SIGNALS
-
-
-def process_signal_data(df):
-    for signal, signal_parts in SIGNALS_MAP.items():
-        df[signal] = sum([df[col] for col in signal_parts])
-    return df
+from .constants import TYPE_DICT, SIGNALS_MAP
 
 
 def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
@@ -52,13 +45,21 @@ def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger
             break  # exit the loop if no more results
         results.extend(page)
         offset += limit
 
     df = pd.DataFrame.from_records(results)
+    keep_columns = list(TYPE_DICT.keys())
 
-    create_backup_csv(df, backup_dir, custom_run, logger=logger)
+    if not df.empty:
+        create_backup_csv(df, backup_dir, custom_run, logger=logger)
+
-    df = df.rename(columns={"weekendingdate": "timestamp"})
-    df = df[TYPE_DICT.keys()]
-    df = df.astype(TYPE_DICT)
-    processed_df = process_signal_data(df)
-    processed_df = processed_df.drop(columns=PARTIAL_SIGNALS)
-    return processed_df
+        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
+
+        for signal, col_name in SIGNALS_MAP.items():
+            df[signal] = df[col_name]
+
+        df = df[keep_columns]
+        df = df.astype(TYPE_DICT)
+    else:
+        df = pd.DataFrame(columns=keep_columns)
+
+    return df
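
Note: the loop above uses the standard Socrata pagination idiom: request limit rows at offset and stop at the first empty page. Guarding the backup and column handling behind if not df.empty also means an empty pull now returns an empty frame with the expected columns instead of failing on the rename/astype. A self-contained sketch of the pagination pattern, assuming the ua7e-t2fy dataset id from the test fixtures and an illustrative 50,000-row page size (neither is confirmed as what the module itself uses):

import pandas as pd
from sodapy import Socrata

def fetch_all_pages(socrata_token: str, limit: int = 50_000) -> pd.DataFrame:
    """Page through the NHSN dataset until Socrata returns an empty page."""
    client = Socrata("data.cdc.gov", socrata_token)
    results, offset = [], 0
    while True:
        page = client.get("ua7e-t2fy", limit=limit, offset=offset)
        if not page:
            break  # no more results
        results.extend(page)
        offset += limit
    return pd.DataFrame.from_records(results)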
51 changes: 48 additions & 3 deletions nhsn/delphi_nhsn/run.py
@@ -15,15 +15,16 @@
 - Any other indicator-specific settings
 """
 import time
-from datetime import timedelta, datetime
+from datetime import timedelta, datetime, date
 from itertools import product
 
+import numpy as np
 import pandas as pd
 from delphi_utils import get_structured_logger
 from delphi_utils.export import create_export_csv
 from delphi_utils.geomap import GeoMapper
 
-from .constants import GEOS, SMOOTHERS, SIGNALS_MAP
+from .constants import GEOS, SIGNALS_MAP
 from .pull import pull_nhsn_data
 
 
@@ -46,7 +47,51 @@ def run_module(params):
     backup_dir = params["common"]["backup_dir"]
     custom_run = params["common"].get("custom_run", False)
     socrata_token = params["indicator"]["socrata_token"]
-    geo_mapper = GeoMapper()
+    export_start_date = params["indicator"]["export_start_date"]
     run_stats = []
 
+    if export_start_date == "latest": # Find the previous Saturday
+        export_start_date = date.today() - timedelta(
+            days=date.today().weekday() + 2)
+        export_start_date = export_start_date.strftime('%Y-%m-%d')
+
+    df_pull = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
+
+
+    nation_df = df_pull[df_pull["geo_id"] == "USA"]
+    state_df = df_pull[df_pull["geo_id"] != "USA"]
+
+    if not df_pull.empty:
+        for geo in GEOS:
+            if geo == "nation":
+                df = nation_df
+            else:
+                df = state_df
+            for signal in SIGNALS_MAP.keys():
+                df["val"] = df[signal]
+                df["se"] = np.nan
+                df["sample_size"] = np.nan
+                dates = create_export_csv(
+                    df,
+                    geo_res=geo,
+                    export_dir=export_dir,
+                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                    sensor=signal,
+                    weekly_dates=True
+                )
+                if len(dates) > 0:
+                    run_stats.append((max(dates), len(dates)))
+
+    elapsed_time_in_seconds = round(time.time() - start_time, 2)
+    min_max_date = run_stats and min(s[0] for s in run_stats)
+    csv_export_count = sum(s[-1] for s in run_stats)
+    max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
+    formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_min_max_date,
+    )
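
Note: with export_start_date set to "latest", the run anchors exports to the most recent week ending on a Saturday. Python's date.weekday() returns 0 for Monday through 6 for Sunday, so subtracting weekday() + 2 days always lands on a Saturday (a Sunday run reaches back eight days rather than one). A quick check of the arithmetic with hypothetical run dates:

from datetime import date, timedelta

def previous_saturday(today: date) -> date:
    """Mirrors the "latest" branch above; weekday() is Mon=0 .. Sun=6."""
    return today - timedelta(days=today.weekday() + 2)

assert previous_saturday(date(2024, 11, 18)) == date(2024, 11, 16)  # Monday -> last Saturday
assert previous_saturday(date(2024, 11, 23)) == date(2024, 11, 16)  # Saturday -> a week back
assert previous_saturday(date(2024, 11, 24)) == date(2024, 11, 16)  # Sunday -> eight days back

The summary block then relies on short-circuiting: when run_stats is empty, min_max_date is the empty list (falsy), so max_lag_in_days and formatted_min_max_date also short-circuit rather than calling min() on an empty sequence.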

Empty file added nhsn/tests/backups/.gitignore
Empty file.
63 changes: 63 additions & 0 deletions nhsn/tests/conftest.py
@@ -0,0 +1,63 @@
+import copy
+import json
+from unittest.mock import patch
+
+import pytest
+from pathlib import Path
+
+from delphi_nhsn.run import run_module
+
+TEST_DIR = Path(__file__).parent
+
+# test data generated with following url with socrata:
+# https://data.cdc.gov/resource/ua7e-t2fy.json?$where=weekendingdate%20between%20%272023-08-19T00:00:00.000%27%20and%20%272023-10-19T00:00:00.000%27%20and%20jurisdiction%20in(%27CO%27,%27USA%27)
+# queries the nhsn data with timestamp (2021-08-19, 2021-10-19) with CO and USA data
+TEST_DATA = []
+with open("test_data/page.json", "r") as f:
+    TEST_DATA = json.load(f)
+
+@pytest.fixture(scope="session")
+def params():
+    params = {
+        "common": {
+            "export_dir": f"{TEST_DIR}/receiving",
+            "log_filename": f"{TEST_DIR}/test.log",
+            "backup_dir": f"{TEST_DIR}/backups",
+            "custom_run": False
+        },
+        "indicator": {
+            "wip_signal": True,
+            "export_start_date": "2020-08-01",
+            "static_file_dir": "./static",
+            "socrata_token": "test_token"
+        },
+        "validation": {
+            "common": {
+                "span_length": 14,
+                "min_expected_lag": {"all": "3"},
+                "max_expected_lag": {"all": "4"},
+            }
+        }
+    }
+    return copy.deepcopy(params)
+
+@pytest.fixture
+def params_w_patch(params):
+    params_copy = copy.deepcopy(params)
+    params_copy["patch"] = {
+        "start_issue": "2024-06-27",
+        "end_issue": "2024-06-29",
+        "patch_dir": "./patch_dir"
+    }
+    return params_copy
+@pytest.fixture
+def mock_get(request):
+    with patch('sodapy.Socrata.get') as mock_get:
+        mock_get.side_effect = [TEST_DATA,[]]
+        yield mock_get
+
+@pytest.fixture(scope="function")
+def run_as_module(params, mock_get):
+
+    run_module(params)
+
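Note: mock_get is what terminates the pagination loop in pull.py during tests: side_effect makes the first Socrata.get call return TEST_DATA and the second return [], which hits the loop's break. A stripped-down illustration of that interplay (a toy loop, not the module's actual code; the limit value is illustrative):

from unittest.mock import MagicMock

mock_get = MagicMock(side_effect=[[{"jurisdiction": "CO"}], []])

results, offset, limit = [], 0, 50_000
while True:
    page = mock_get("ua7e-t2fy", limit=limit, offset=offset)
    if not page:
        break  # the second mocked call returns [], ending the loop
    results.extend(page)
    offset += limit

assert len(results) == 1  # only the first mocked page contributed rows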
6 changes: 0 additions & 6 deletions nhsn/tests/params.json.template

This file was deleted.

