Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor doctor_visits: Load source file only once #1978

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c639049
replace modify_claims_drops with direct modification in update_sensor
minhkhul Jun 24, 2024
1dd80be
cleanup Config
minhkhul Jun 24, 2024
749ed2d
cleanup Config
minhkhul Jun 24, 2024
9d8d521
change test
minhkhul Jun 24, 2024
ca38bb7
lint
minhkhul Jun 24, 2024
17259d0
fix test geomap
minhkhul Jun 24, 2024
6d841da
lint
minhkhul Jun 24, 2024
4ec46df
lint
minhkhul Jun 24, 2024
9740899
adding logging for comparing processing time
aysim319 Jun 28, 2024
aacc545
using dask for read/write large files
aysim319 Jun 28, 2024
dbde5c7
undo testing change and also using datetime instead of str for date p…
aysim319 Jun 28, 2024
1394d3d
refactored reading into seperate function
aysim319 Jun 28, 2024
dfc3be2
organizing code
aysim319 Jun 28, 2024
e07c697
only procesing once and passing along the dataframe
aysim319 Jul 1, 2024
d1ee4ce
added/updated tests
aysim319 Jul 1, 2024
fc2c58d
Merge pull request #1981 from cmu-delphi/optimize_with_dask
aysim319 Jul 1, 2024
b52d80a
in progress cleaning up writing csv
aysim319 Jul 1, 2024
58b51a6
Merge branch 'main' into doctor_visits_refactor_for_speed
minhkhul Jul 2, 2024
81381d6
optimized write_csv
aysim319 Jul 3, 2024
bfa853a
lint
aysim319 Jul 3, 2024
073651f
reverting to assert
aysim319 Jul 9, 2024
dd06a91
cleaning more stuff
aysim319 Jul 9, 2024
4ddd5a0
version locking at 2024.6 due to pandas
aysim319 Jul 9, 2024
593279b
aligned preprocessing to match current & rollback write for consisten…
aysim319 Jul 11, 2024
9920821
pip versioning
aysim319 Jul 11, 2024
cd83691
rewording variable and also ensure that column order is the same
aysim319 Jul 11, 2024
79c34d3
Update doctor_visits/setup.py
aysim319 Jul 12, 2024
7896042
latest version supported for 3.8 is 2023.5.*
aysim319 Jul 12, 2024
e2f7953
fix param
aysim319 Jul 12, 2024
a4f67c0
added notes for when we upgrade to 3.9+
aysim319 Jul 12, 2024
9408c81
reverting unneeded change
aysim319 Sep 9, 2024
b2f8b0e
merge with main
aysim319 Sep 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions doctor_visits/delphi_doctor_visits/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,36 @@ class Config:
GEO_COL = "PatCountyFIPS"
AGE_COL = "PatAgeGroup"
HRR_COLS = ["Pat HRR Name", "Pat HRR ID"]
ID_COLS = [DATE_COL] + [GEO_COL] + [AGE_COL] + HRR_COLS
FILT_COLS = ID_COLS + COUNT_COLS
DTYPES = {"ServiceDate": str, "PatCountyFIPS": str,
"Denominator": int, "Flu1": int,
"Covid_like": int, "Flu_like": int,
"Mixed": int, "PatAgeGroup": str,
"Pat HRR Name": str, "Pat HRR ID": float}
# as of 2020-05-11, input file expected to have 10 columns
# id cols: ServiceDate, PatCountyFIPS, PatAgeGroup, Pat HRR ID/Pat HRR Name
# value cols: Denominator, Covid_like, Flu_like, Flu1, Mixed
ID_COLS = [DATE_COL] + [GEO_COL] + HRR_COLS + [AGE_COL]
# drop HRR columns - unused for now since we assign HRRs by FIPS
FILT_COLS = [DATE_COL] + [GEO_COL] + [AGE_COL] + COUNT_COLS
DTYPES = {
"ServiceDate": str,
"PatCountyFIPS": str,
"Denominator": int,
"Flu1": int,
"Covid_like": int,
"Flu_like": int,
"Mixed": int,
"PatAgeGroup": str,
"Pat HRR Name": str,
"Pat HRR ID": float,
"servicedate": str,
"patCountyFIPS": str,
"patAgeGroup": str,
"patHRRname": str,
"patHRRid": float,
}
DEVIANT_COLS_MAP = {
"servicedate": "ServiceDate",
"patCountyFIPS": "PatCountyFIPS",
"patHRRname": "Pat HRR Name",
"patAgeGroup": "PatAgeGroup",
"patHRRid": "Pat HRR ID",
}

SMOOTHER_BANDWIDTH = 100 # bandwidth for the linear left Gaussian filter
MAX_BACKFILL_WINDOW = 7 # maximum number of days used to average a backfill correction
Expand Down
Binary file not shown.
52 changes: 0 additions & 52 deletions doctor_visits/delphi_doctor_visits/modify_claims_drops.py

This file was deleted.

162 changes: 162 additions & 0 deletions doctor_visits/delphi_doctor_visits/process_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Module providing functions for processing and wrangling data."""

from datetime import datetime
from pathlib import Path

import dask.dataframe as dd
import numpy as np
import pandas as pd

from .config import Config


def format_outname(prefix: str, se: bool, weekday: bool):
"""
Write out results.

Parameters
----------
prefix:
se: boolean to write out standard errors, if true, use an obfuscated name
weekday: boolean for weekday adjustments.
signals will be generated with weekday adjustments (True) or without
adjustments (False)

Returns
-------
outname str
"""
out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
if se:
assert prefix is not None, "template has no obfuscated prefix"
out_name = prefix + "_" + out_name
return out_name


def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
"""
Format dataframe and checks for anomalies to write results.

Parameters
----------
df: dataframe from output from update_sensor
geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
se: boolean to write out standard errors, if true, use an obfuscated name
logger

Returns
-------
filtered and formatted dataframe
"""
# report in percentage
df["val"] = df["val"] * 100
df["se"] = df["se"] * 100

val_isnull = df["val"].isnull()
df_val_null = df[val_isnull]
if not df_val_null.empty:
logger.info("sensor value is nan, check pipeline")
aysim319 marked this conversation as resolved.
Show resolved Hide resolved
df = df[~val_isnull]

se_too_high = df["se"] >= 5
df_se_too_high = df[se_too_high]
if len(df_se_too_high) > 0:
logger.info(f"standard error suspiciously high! investigate {geo_id}")
df = df[~se_too_high]

sensor_too_high = df["val"] >= 90
df_sensor_too_high = df[sensor_too_high]
if len(df_sensor_too_high) > 0:
logger.info(f"standard error suspiciously high! investigate {geo_id}")
df = df[~sensor_too_high]

if se:
valid_cond = (df["se"] > 0) & (df["val"] > 0)
invalid_df = df[~valid_cond]
if len(invalid_df) > 0:
logger.info("p=0, std_err=0 invalid")
df = df[valid_cond]
else:
df["se"] = np.NAN

df["direction"] = np.NAN
df["sample_size"] = np.NAN
return df


def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se: bool, logger, output_path="."):
"""
Write sensor values to csv.

Args:
output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
se: boolean to write out standard errors, if true, use an obfuscated name
out_name: name of the output file
output_path: outfile path to write the csv (default is current directory)
"""
out_name = format_outname(prefix, se, weekday)
filtered_df = format_df(output_df, geo_id, se, logger)

if se:
logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

dates = set(list(output_df["date"]))
grouped = filtered_df.groupby("date")
for d in dates:
filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_id, out_name)
single_date_df = grouped.get_group(d)
single_date_df = single_date_df.drop(columns=["date"])
single_date_df.to_csv(filename, index=False, na_rep="NA")

logger.debug(f"wrote {len(single_date_df)} rows for {geo_id}")


def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
"""
Read csv using Dask, filters unneeded data, then converts back into pandas dataframe.

Parameters
----------
filepath: path to the aggregated doctor-visits data
startdate: first sensor date (YYYY-mm-dd)
enddate: last sensor date (YYYY-mm-dd)
dropdate: data drop date (YYYY-mm-dd)

-------
"""
filepath = Path(filepath)
logger.info(f"Processing {filepath}")

ddata = dd.read_csv(
filepath,
compression="gzip",
dtype=Config.DTYPES,
blocksize=None,
)

ddata = ddata.dropna()
# rename inconsistent column names to match config column names
ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)

ddata = ddata[Config.FILT_COLS]
ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])

# restrict to training start and end date
startdate = startdate - Config.DAY_SHIFT

assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
assert startdate < enddate, "Start date >= end date"
assert enddate <= dropdate, "End date > drop date"

date_filter = (ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate)

df = ddata[date_filter].compute()

# aggregate age groups (so data is unique by service date and FIPS)
df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
assert np.sum(df.duplicated()) == 0, "Duplicates after age group aggregation"
assert (df[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"

logger.info(f"Done processing {filepath}")
return df
26 changes: 10 additions & 16 deletions doctor_visits/delphi_doctor_visits/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

from delphi_utils import get_structured_logger

# first party
from .update_sensor import update_sensor, write_to_csv
from .download_claims_ftp_files import download
from .modify_claims_drops import modify_and_write
from .get_latest_claims_name import get_latest_filename
from .process_data import csv_to_df, write_to_csv

# first party
from .update_sensor import update_sensor


def run_module(params): # pylint: disable=too-many-statements
Expand Down Expand Up @@ -55,9 +56,6 @@ def run_module(params): # pylint: disable=too-many-statements
# find the latest files (these have timestamps)
claims_file = get_latest_filename(params["indicator"]["input_dir"], logger)

# modify data
modify_and_write(claims_file, logger)

## get end date from input file
# the filename is expected to be in the format:
# "EDI_AGG_OUTPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz"
Expand Down Expand Up @@ -89,6 +87,7 @@ def run_module(params): # pylint: disable=too-many-statements
## geographies
geos = ["state", "msa", "hrr", "county", "hhs", "nation"]

claims_df = csv_to_df(claims_file, startdate_dt, enddate_dt, dropdate_dt, logger)

## print out other vars
logger.info("outpath:\t\t%s", export_dir)
Expand All @@ -107,10 +106,10 @@ def run_module(params): # pylint: disable=too-many-statements
else:
logger.info("starting %s, no adj", geo)
sensor = update_sensor(
filepath=claims_file,
startdate=startdate,
enddate=enddate,
dropdate=dropdate,
data=claims_df,
startdate=startdate_dt,
enddate=enddate_dt,
dropdate=dropdate_dt,
geo=geo,
parallel=params["indicator"]["parallel"],
weekday=weekday,
Expand All @@ -120,13 +119,8 @@ def run_module(params): # pylint: disable=too-many-statements
if sensor is None:
logger.error("No sensors calculated, no output will be produced")
continue
# write out results
out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
if params["indicator"]["se"]:
assert prefix is not None, "template has no obfuscated prefix"
out_name = prefix + "_" + out_name

write_to_csv(sensor, geo, se, out_name, logger, export_dir)
write_to_csv(sensor, prefix, geo, weekday, se, logger, export_dir)
max_dates.append(sensor.date.max())
n_csv_export.append(sensor.date.unique().shape[0])
logger.debug(f"wrote files to {export_dir}")
Expand Down
Loading
Loading