Use national data for nchs-mortality signals #1912

Merged · 11 commits · Jan 11, 2024
2 changes: 2 additions & 0 deletions — nchs_mortality/.pylintrc

@@ -4,6 +4,8 @@
 disable=logging-format-interpolation,
     too-many-locals,
     too-many-arguments,
+    too-many-branches,
+    too-many-statements,
     # Allow pytest functions to be part of a class.
     no-self-use,
     # Allow pytest classes to have one test.
1 change: 0 additions & 1 deletion — nchs_mortality/delphi_nchs_mortality/constants.py

@@ -25,7 +25,6 @@
     "prop"
 ]
 INCIDENCE_BASE = 100000
-GEO_RES = "state"
 
 # this is necessary as a delimiter in the f-string expressions we use to
 # construct detailed error reports
11 changes: 7 additions & 4 deletions — nchs_mortality/delphi_nchs_mortality/pull.py

@@ -96,8 +96,6 @@ def pull_nchs_mortality_data(token: str, test_file: Optional[str]=None):
 {NEWLINE.join(df.columns)}
 """) from exc
 
-    # Drop rows for locations outside US
-    df = df[df["state"] != "United States"]
     df = df[keep_columns + ["timestamp", "state"]].set_index("timestamp")
 
     # NCHS considers NYC as an individual state, however, we want it included

@@ -124,6 +122,11 @@ def pull_nchs_mortality_data(token: str, test_file: Optional[str]=None):
     # Add population info
     keep_columns.extend(["timestamp", "geo_id", "population"])
     gmpr = GeoMapper()
-    df = gmpr.add_population_column(df, "state_name", geocode_col="state")
-    df = gmpr.add_geocode(df, "state_name", "state_id", from_col="state", new_col="geo_id")
+    # Map state to geo_id, but set dropna=False as we also have national data
+    df = gmpr.add_population_column(df, "state_name",
+                                    geocode_col="state", dropna=False)
+    df = gmpr.add_geocode(df, "state_name", "state_id",
+                          from_col="state", new_col="geo_id", dropna=False)
+    # Manually set geo_id for national data
+    df.loc[df["state"] == "United States", "geo_id"] = "us"
     return df[keep_columns]
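
To see what dropna=False buys here: with the GeoMapper defaults, a row whose "state" value has no match in the state_name crosswalk would be dropped, which would silently discard the national rows this PR starts keeping. A minimal sketch of that behavior, assuming delphi_utils is installed — the GeoMapper calls mirror the diff, the toy data frame is made up:

    # Toy illustration of the dropna=False behavior relied on above; the
    # GeoMapper calls are the same ones used in pull.py, the data is made up.
    import pandas as pd
    from delphi_utils import GeoMapper

    df = pd.DataFrame({
        "state": ["Pennsylvania", "United States"],
        "covid_19_deaths": [10, 500],
    })
    gmpr = GeoMapper()
    # dropna=False keeps the "United States" row even though it matches no
    # state_name, leaving its population and geo_id as NaN...
    df = gmpr.add_population_column(df, "state_name",
                                    geocode_col="state", dropna=False)
    df = gmpr.add_geocode(df, "state_name", "state_id",
                          from_col="state", new_col="geo_id", dropna=False)
    # ...so the national geo_id can then be set by hand, as in pull.py.
    df.loc[df["state"] == "United States", "geo_id"] = "us"
    print(df)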
70 changes: 40 additions & 30 deletions — nchs_mortality/delphi_nchs_mortality/run.py

@@ -13,7 +13,7 @@
 
 from .archive_diffs import arch_diffs
 from .constants import (METRICS, SENSOR_NAME_MAP,
-                        SENSORS, INCIDENCE_BASE, GEO_RES)
+                        SENSORS, INCIDENCE_BASE)
 from .pull import pull_nchs_mortality_data

@@ -73,50 +73,60 @@ def run_module(params: Dict[str, Any]):
     df_pull = pull_nchs_mortality_data(token, test_file)
     for metric in METRICS:
         if metric == 'percent_of_expected_deaths':
-            logger.info("Generating signal and exporting to CSV",
-                        metric = metric)
-            df = df_pull.copy()
-            df["val"] = df[metric]
-            df["se"] = np.nan
-            df["sample_size"] = np.nan
-            df = add_nancodes(df)
-            # df = df[~df["val"].isnull()]
-            sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
-            dates = create_export_csv(
-                df,
-                geo_res=GEO_RES,
-                export_dir=daily_export_dir,
-                start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
-                sensor=sensor_name,
-                weekly_dates=True
-            )
-            if len(dates) > 0:
-                stats.append((max(dates), len(dates)))
+            for geo in ["state", "nation"]:
+                logger.info("Generating signal and exporting to CSV",
+                            metric = metric)
+                df = df_pull.copy()
+                if geo == "nation":
+                    df = df[df["geo_id"] == "us"]
+                else:
+                    df = df[df["geo_id"] != "us"]
+                df["val"] = df[metric]
+                df["se"] = np.nan
+                df["sample_size"] = np.nan
+                df = add_nancodes(df)
+                sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
+                dates = create_export_csv(
+                    df,
+                    geo_res=geo,
+                    export_dir=daily_export_dir,
+                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                    sensor=sensor_name,
+                    weekly_dates=True
+                )
+                if len(dates) > 0:
+                    stats.append((max(dates), len(dates)))
         else:
-            for sensor in SENSORS:
-                logger.info("Generating signal and exporting to CSV",
-                            metric = metric,
-                            sensor = sensor)
-                df = df_pull.copy()
-                if sensor == "num":
-                    df["val"] = df[metric]
-                else:
-                    df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
-                df["se"] = np.nan
-                df["sample_size"] = np.nan
-                # df = df[~df["val"].isnull()]
-                df = add_nancodes(df)
-                sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
-                dates = create_export_csv(
-                    df,
-                    geo_res=GEO_RES,
-                    export_dir=daily_export_dir,
-                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
-                    sensor=sensor_name,
-                    weekly_dates=True
-                )
-                if len(dates) > 0:
-                    stats.append((max(dates), len(dates)))
+            for geo in ["state", "nation"]:
+                for sensor in SENSORS:
+                    logger.info("Generating signal and exporting to CSV",
+                                metric = metric,
+                                sensor=sensor)
+                    df = df_pull.copy()
+                    if geo == "nation":
+                        df = df[df["geo_id"] == "us"]
+                    else:
+                        df = df[df["geo_id"] != "us"]
+                    if sensor == "num":
+                        df["val"] = df[metric]
+                    else:
+                        df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
+                    df["se"] = np.nan
+                    df["sample_size"] = np.nan
+                    # df = df[~df["val"].isnull()]
+                    df = add_nancodes(df)
+                    sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
+                    dates = create_export_csv(
+                        df,
+                        geo_res=geo,
+                        export_dir=daily_export_dir,
+                        start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                        sensor=sensor_name,
+                        weekly_dates=True
+                    )
+                    if len(dates) > 0:
+                        stats.append((max(dates), len(dates)))
 
     # Weekly run of archive utility on Monday
     # - Does not upload to S3, that is handled by daily run of archive utility

Review comment (Contributor), on the added `for geo in ["state", "nation"]:` line in the else branch:

    you should be able to pull this up two levels (outside of `for metric in METRICS:`) to reduce repetition

    you could even do the filtering on geo_id ==/!= "us" there too, if you want to make another [sub]copy of df_pull

(A sketch of that suggested refactor follows below.)
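For illustration only, a minimal sketch of the refactor the review comment suggests — hoisting the geo loop above `for metric in METRICS:` and filtering df_pull once per geo. This is not the merged code; the names df_geo, sensors_to_run, and name_parts are invented for the sketch:

    # Hypothetical restructuring per the review comment -- not the merged code.
    df_pull = pull_nchs_mortality_data(token, test_file)
    for geo in ["state", "nation"]:
        # Filter once per geo instead of inside every metric/sensor branch.
        if geo == "nation":
            df_geo = df_pull[df_pull["geo_id"] == "us"]
        else:
            df_geo = df_pull[df_pull["geo_id"] != "us"]
        for metric in METRICS:
            # percent_of_expected_deaths has no num/prop variants.
            sensors_to_run = [None] if metric == "percent_of_expected_deaths" else SENSORS
            for sensor in sensors_to_run:
                logger.info("Generating signal and exporting to CSV",
                            metric=metric, sensor=sensor, geo=geo)
                df = df_geo.copy()
                if sensor == "prop":
                    df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
                else:
                    df["val"] = df[metric]
                df["se"] = np.nan
                df["sample_size"] = np.nan
                df = add_nancodes(df)
                name_parts = [SENSOR_NAME_MAP[metric]] + ([sensor] if sensor else [])
                dates = create_export_csv(
                    df,
                    geo_res=geo,
                    export_dir=daily_export_dir,
                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
                    sensor="_".join(name_parts),
                    weekly_dates=True
                )
                if len(dates) > 0:
                    stats.append((max(dates), len(dates)))

This collapses the two near-identical branches into one loop body and performs each geo filter once rather than once per metric/sensor combination.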
38 changes: 20 additions & 18 deletions — nchs_mortality/tests/test_run.py

@@ -19,6 +19,7 @@ def test_output_files_exist(self, run_as_module, date):
         for output_folder in folders:
             csv_files = listdir(output_folder)
 
+        geos = ["nation", "state"]
         dates = [
             "202030",
             "202031",

@@ -38,15 +39,14 @@
         sensors = ["num", "prop"]
 
         expected_files = []
-        for d in dates:
-            for metric in metrics:
-                if metric == "deaths_percent_of_expected":
-                    expected_files += ["weekly_" + d + "_state_" \
-                        + metric + ".csv"]
-                else:
-                    for sensor in sensors:
-                        expected_files += ["weekly_" + d + "_state_" \
-                            + metric + "_" + sensor + ".csv"]
+        for geo in geos:
+            for d in dates:
+                for metric in metrics:
+                    if metric == "deaths_percent_of_expected":
+                        expected_files += [f"weekly_{d}_{geo}_{metric}.csv"]
+                    else:
+                        for sensor in sensors:
+                            expected_files += [f"weekly_{d}_{geo}_{metric}_{sensor}.csv"]
         assert set(expected_files).issubset(set(csv_files))

@@ -58,12 +58,14 @@ def test_output_file_format(self, run_as_module, date):
         # the 14th was a Monday
         if is_mon_or_thurs:
             folders.append("receiving")
 
-        for output_folder in folders:
-            df = pd.read_csv(
-                join(output_folder, "weekly_202026_state_deaths_covid_incidence_prop.csv")
-            )
-            expected_columns = [
-                "geo_id", "val", "se", "sample_size",
-                "missing_val", "missing_se", "missing_sample_size"
-            ]
-            assert (df.columns.values == expected_columns).all()
+        geos = ["nation", "state"]
+        for geo in geos:
+            for output_folder in folders:
+                df = pd.read_csv(
+                    join(output_folder, f"weekly_202026_{geo}_deaths_covid_incidence_prop.csv")
+                )
+                expected_columns = [
+                    "geo_id", "val", "se", "sample_size",
+                    "missing_val", "missing_se", "missing_sample_size"
+                ]
+                assert (df.columns.values == expected_columns).all()
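
As a test-design aside (not part of this PR), the geo dimension could also be expanded by pytest parametrization instead of an in-test loop, so a failure in one geo is reported individually rather than aborting the shared loop. A hypothetical, self-contained sketch:

    import pytest

    @pytest.mark.parametrize("geo", ["nation", "state"])
    def test_weekly_filename_scheme(geo):
        # Mirrors the filename scheme the tests above assert against;
        # pytest runs this once per geo value.
        fname = f"weekly_202026_{geo}_deaths_covid_incidence_prop.csv"
        assert fname.split("_")[2] == geo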