Skip to content

Commit

Permalink
optimized write_csv
Browse files Browse the repository at this point in the history
  • Loading branch information
aysim319 committed Jul 3, 2024
1 parent 58b51a6 commit 81381d6
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 36 deletions.
76 changes: 52 additions & 24 deletions doctor_visits/delphi_doctor_visits/process_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,39 @@

from .config import Config

def format_outname(prefix: str, se: bool, weekday: bool) -> str:
    """Build the export file-name stem for a sensor run.

    Parameters
    ----------
    prefix: obfuscated prefix, required (non-None) when `se` is True
    se: whether standard errors will be written; forces the obfuscated prefix
    weekday: whether the sensor is weekday-adjusted

    Returns
    -------
    The output name, e.g. "smoothed_cli", "smoothed_adj_cli",
    or "<prefix>_smoothed_adj_cli" when SEs are written.
    """
    out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
    if se:
        # An obfuscated prefix is mandatory whenever SEs are exposed.
        assert prefix is not None, "template has no obfuscated prefix"
        out_name = prefix + "_" + out_name
    return out_name

def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger) -> pd.DataFrame:
    """Format the sensor dataframe and filter anomalous rows before export.

    Parameters
    ----------
    df: output dataframe from update_sensor
    geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
    se: whether standard errors are exported; if False the "se" column is blanked out
    logger: logger used to report anomalies

    Returns
    -------
    Filtered and formatted dataframe ready for CSV export.
    """
    # Report values in percentage.
    df["val"] = df["val"] * 100

    # Drop rows with missing sensor values.
    # NOTE(review): the definition of `val_isnull` was collapsed in the diff;
    # reconstructed as a null-mask on "val" — confirm against the repository.
    val_isnull = df["val"].isnull()
    df_val_null = df[val_isnull]
    if not df_val_null.empty:
        logger.info("sensor value is nan, check pipeline")
    df = df[~val_isnull]

    # Drop rows with implausibly large standard errors.
    se_too_high = df["se"] >= 5
    df_se_too_high = df[se_too_high]
    if len(df_se_too_high) > 0:
        logger.info(f"standard error suspiciously high! investigate {geo_id}")
    df = df[~se_too_high]

    # Drop rows with implausibly large sensor values.
    sensor_too_high = df["val"] >= 90
    df_sensor_too_high = df[sensor_too_high]
    if len(df_sensor_too_high) > 0:
        # Fixed copy/paste bug: this anomaly concerns the sensor value, not the SE.
        logger.info(f"sensor value suspiciously high! investigate {geo_id}")
    df = df[~sensor_too_high]

    if se:
        # Keep only rows where both the value and its SE are strictly positive.
        valid_cond = (df["se"] > 0) & (df["val"] > 0)
        invalid_df = df[~valid_cond]
        if len(invalid_df) > 0:
            logger.info("p=0, std_err=0 invalid")
        df = df[valid_cond]
    else:
        # SEs are not exported: blank the column out.
        # np.nan (not np.NAN, which was removed in NumPy 2.0).
        df["se"] = np.nan

    # Columns expected by the export schema but not populated by this pipeline.
    df["direction"] = np.nan
    df["sample_size"] = np.nan
    return df

def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se: bool, logger, output_path="."):
    """Write sensor values to one CSV file per date.

    Args:
        output_df: dataframe produced by update_sensor
        prefix: obfuscated prefix, used in the file name when se is True
        geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
        weekday: whether the sensor is weekday-adjusted (affects the file name)
        se: boolean to write out standard errors; if true, use an obfuscated name
        logger: logger for progress and warning messages
        output_path: outfile path to write the csv (default is current directory)
    """
    out_name = format_outname(prefix, se, weekday)
    filtered_df = format_df(output_df, geo_id, se, logger)

    if se:
        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

    # Iterate the groups of the *filtered* frame. The previous code collected
    # dates from the unfiltered input and called grouped.get_group(d), which
    # raises KeyError for any date whose rows were all filtered out.
    for d, single_date_df in filtered_df.groupby("date"):
        filename = "%s/%s_%s_%s.csv" % (
            output_path,
            (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
            geo_id,
            out_name,
        )
        single_date_df.drop(columns=["date"]).to_csv(filename, index=False, na_rep="NA")
        logger.debug(f"wrote {len(single_date_df)} rows for {geo_id}")


def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
Expand Down
7 changes: 1 addition & 6 deletions doctor_visits/delphi_doctor_visits/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,8 @@ def run_module(params): # pylint: disable=too-many-statements
if sensor is None:
logger.error("No sensors calculated, no output will be produced")
continue
# write out results
out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
if params["indicator"]["se"]:
assert prefix is not None, "template has no obfuscated prefix"
out_name = prefix + "_" + out_name

write_to_csv(sensor, geo, se, out_name, logger, export_dir)
write_to_csv(sensor, prefix, geo, weekday, se, logger, export_dir)
max_dates.append(sensor.date.max())
n_csv_export.append(sensor.date.unique().shape[0])
logger.debug(f"wrote files to {export_dir}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
geo_id,val,se,direction,sample_size
ak,0.569287,NA,NA,NA
al,0.328228,NA,NA,NA
ar,0.370763,NA,NA,NA
az,0.530073,NA,NA,NA
ca,0.351530,NA,NA,NA
co,0.401868,NA,NA,NA
ct,0.601417,NA,NA,NA
dc,0.878253,NA,NA,NA
de,0.324467,NA,NA,NA
fl,0.479217,NA,NA,NA
ga,0.475930,NA,NA,NA
hi,0.393773,NA,NA,NA
ia,0.481491,NA,NA,NA
id,0.445713,NA,NA,NA
il,0.380958,NA,NA,NA
in,0.357658,NA,NA,NA
ks,0.365005,NA,NA,NA
ky,0.368104,NA,NA,NA
la,0.405224,NA,NA,NA
ma,0.347109,NA,NA,NA
md,0.478480,NA,NA,NA
me,0.292373,NA,NA,NA
mi,0.432469,NA,NA,NA
mn,0.436532,NA,NA,NA
mo,0.354799,NA,NA,NA
ms,0.385404,NA,NA,NA
mt,0.363729,NA,NA,NA
nc,0.502467,NA,NA,NA
nd,0.384162,NA,NA,NA
ne,0.504449,NA,NA,NA
nh,0.406304,NA,NA,NA
nj,0.350642,NA,NA,NA
nm,0.336862,NA,NA,NA
nv,0.590539,NA,NA,NA
ny,0.369274,NA,NA,NA
oh,0.402905,NA,NA,NA
ok,0.339027,NA,NA,NA
or,0.421793,NA,NA,NA
pa,0.342980,NA,NA,NA
ri,0.353920,NA,NA,NA
sc,0.321687,NA,NA,NA
sd,0.508804,NA,NA,NA
tn,0.454150,NA,NA,NA
tx,0.358389,NA,NA,NA
ut,0.488488,NA,NA,NA
va,0.371326,NA,NA,NA
vt,0.307760,NA,NA,NA
wa,0.440772,NA,NA,NA
wi,0.373994,NA,NA,NA
wv,0.317498,NA,NA,NA
wy,0.346961,NA,NA,NA
51 changes: 45 additions & 6 deletions doctor_visits/tests/test_process_data.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,60 @@
"""Tests for update_sensor.py."""
from datetime import datetime
import logging
import os
from pathlib import Path
import pandas as pd

from delphi_doctor_visits.process_data import csv_to_df
from delphi_doctor_visits.process_data import csv_to_df, write_to_csv, format_outname

TEST_LOGGER = logging.getLogger()

class TestProcessData:
    """Tests for process_data.py against committed comparison fixtures."""

    startdate = datetime(2020, 2, 4)
    enddate = datetime(2020, 2, 5)
    dropdate = datetime(2020, 2, 6)
    geo = "state"
    se = False
    weekday = False
    prefix = "wip_XXXXX"
    filepath = "./test_data"
    compare_path = "./comparison"

    def test_csv_to_df(self):
        """csv_to_df output matches the pickled reference frame."""
        actual = csv_to_df(
            filepath=f"{self.filepath}/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
            startdate=self.startdate,
            enddate=self.enddate,
            dropdate=self.dropdate,
            logger=TEST_LOGGER,
        )

        comparison = pd.read_pickle(
            f"{self.compare_path}/process_data/main_after_date_SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.pkl"
        )
        pd.testing.assert_frame_equal(actual.reset_index(drop=True), comparison)

    def test_write_to_csv(self):
        """write_to_csv files match the committed comparison CSVs, then are removed."""
        output_df = pd.read_csv(f"{self.compare_path}/update_sensor/all.csv", parse_dates=["date"])

        write_to_csv(
            output_df=output_df,
            prefix=self.prefix,
            geo_id=self.geo,
            se=self.se,
            weekday=self.weekday,
            logger=TEST_LOGGER,
            output_path=self.filepath,
        )

        outname = format_outname(self.prefix, self.se, self.weekday)
        files = list(Path(self.filepath).glob(f"*{outname}.csv"))
        # Guard against the loop below passing vacuously when nothing was written.
        assert files, "write_to_csv produced no output files"

        for f in files:
            filename = f.name
            actual = pd.read_csv(f)
            # Compare against the reference file with the same name (the scraped
            # diff showed a garbled path here; `filename` was assigned but unused,
            # so it is the evident interpolation target — confirm against repo).
            comparison = pd.read_csv(f"{self.compare_path}/write_csv/{filename}")
            pd.testing.assert_frame_equal(actual, comparison)
            os.remove(f)

0 comments on commit 81381d6

Please sign in to comment.