nssp patching code #2000
base: main
Changes from 31 commits
nssp/delphi_nssp/patch.py (new file)
@@ -0,0 +1,155 @@
""" | ||
This module is used for patching data in the delphi_nssp package. | ||
|
||
To use this module, you need to turn on the custom_run flag | ||
and specify the range of issue dates in params.json, like so: | ||
|
||
{ | ||
"common": { | ||
"custom_run": true, | ||
... | ||
}, | ||
"validation": { | ||
... | ||
}, | ||
"patch": { | ||
minhkhul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"source_backup_credentials": { | ||
"host": "bigchunk-dev-02.delphi.cmu.edu", | ||
"user": "user", | ||
"path": "/common/source_backup/nssp" | ||
}, | ||
"patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nssp/AprilPatch", | ||
"start_issue": "2024-04-20", | ||
"end_issue": "2024-04-21", | ||
"source_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nssp/source_data" | ||
nmdefries marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
It will generate data for that range of issue dates, and store them in batch issue format: | ||
[name-of-patch]/issue_[issue-date]/nssp/actual_data_file.csv | ||
""" | ||
|
||
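# Note on the issue-date directory names (an illustration, not part of the PR):
# because nssp is weekly, patch() maps each daily issue date to the start of its
# epiweek (see Week.fromdate below), so the example params above would produce
#   AprilPatch/issue_20240414/nssp/...   (2024-04-20 is a Saturday)
#   AprilPatch/issue_20240421/nssp/...   (2024-04-21 is a Sunday, a new epiweek)
# For example:
#   from datetime import datetime
#   from epiweeks import Week
#   Week.fromdate(datetime(2024, 4, 20)).startdate()  # -> 2024-04-14
#   Week.fromdate(datetime(2024, 4, 21)).startdate()  # -> 2024-04-21
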
import sys
from datetime import datetime, timedelta
from os import listdir, makedirs, path
from shutil import rmtree

from delphi_utils import get_structured_logger, read_params
from epiweeks import Week

from .pull import get_source_data
from .run import run_module

Review thread on good_patch_config:
- I wonder if we can bake this into the read_params() in delphi_utils.... @nmdefries thoughts?
- I agree. I wrote this method and halfway through was like, hmm, this should probably be generalized since it may be applicable to other indicators too.
- yeahh I'm like oh I also would like that for mine 👀 and also saw that the current read_params isn't doing any validations at all
- Yeah, could make sense to add this or a similar fn to delphi_utils. Probably we'd want it and other params validation to be separate fns from …
- I mentioned it in #2002 and I think for right now it's better to remove this and deal with this in another issue.
- @minhkhul it's up to you to keep or remove the …
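The thread above floats folding this check into delphi_utils. A rough sketch of what a shared helper there might look like (hypothetical: the function name, signature, and placement are invented for illustration, and no such helper exists in this PR):

def validate_patch_params(params, logger, required_keys=("start_issue", "end_issue", "patch_dir", "source_dir")):
    """Return True if params carries a usable patch configuration, logging any problems."""
    # Mirrors the checks in good_patch_config below, but parameterized on the key list.
    if not params["common"].get("custom_run", False):
        logger.error("Patch run requested without custom_run flag set true.")
        return False
    missing = [key for key in required_keys if key not in params.get("patch", {})]
    if missing:
        logger.error("Patch section is missing required key(s)", missing_keys=missing)
        return False
    return True
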
""" | ||
Check if the params.json file is correctly configured for patching. | ||
|
||
params: Dict[str, Any] | ||
Nested dictionary of parameters, typically loaded from params.json file. | ||
logger: Logger object | ||
Logger object to log messages. | ||
""" | ||
valid_config = True | ||
custom_run = params["common"].get("custom_run", False) | ||
if not custom_run: | ||
logger.error("Calling patch.py without custom_run flag set true.") | ||
valid_config = False | ||
|
||
patch_config = params.get("patch", {}) | ||
if patch_config == {}: | ||
logger.error("Custom flag is on, but patch section is missing.") | ||
valid_config = False | ||
else: | ||
required_patch_keys = ["start_issue", "end_issue", "patch_dir", "source_dir"] | ||
missing_keys = [key for key in required_patch_keys if key not in patch_config] | ||
if missing_keys: | ||
logger.error("Patch section is missing required key(s)", missing_keys=missing_keys) | ||
valid_config = False | ||
else: | ||
try: # issue dates validity check | ||
start_issue = datetime.strptime(patch_config["start_issue"], "%Y-%m-%d") | ||
end_issue = datetime.strptime(patch_config["end_issue"], "%Y-%m-%d") | ||
if start_issue > end_issue: | ||
logger.error("Start issue date is after end issue date.") | ||
valid_config = False | ||
except ValueError: | ||
logger.error("Issue dates must be in YYYY-MM-DD format.") | ||
valid_config = False | ||
|
||
if valid_config: | ||
logger.info("Good patch configuration.") | ||
return True | ||
logger.info("Bad patch configuration.") | ||
return False | ||
|
||
|
||
def patch():
""" | ||
Run the nssp indicator for a range of issue dates. | ||
|
||
The range of issue dates is specified in params.json using the following keys: | ||
- "patch": Only used for patching data | ||
- "source_backup_credentials": (optional) dict, credentials to log in to | ||
server where historical source data is backed up. | ||
if "source_dir" doesn't exist or has no files in it, we download source data to source_dir before running patch. | ||
else, we assume all needed source files are already in source_dir. | ||
- "host": str, hostname of the server where source data is backed up | ||
- "user": str, username to log in to the server | ||
- "path": str, path to the directory containing backup csv files | ||
- "start_date": str, YYYY-MM-DD format, first issue date | ||
- "end_date": str, YYYY-MM-DD format, last issue date | ||
- "patch_dir": str, directory to write all issues output | ||
- "source_dir": str, directory to read source data from. | ||
""" | ||
    params = read_params()
    logger = get_structured_logger("delphi_nssp.patch", filename=params["common"]["log_filename"])
    if not good_patch_config(params, logger):
        sys.exit(1)

    source_dir = params["patch"]["source_dir"]
    downloaded_source = False
    if not path.isdir(source_dir) or not listdir(source_dir):
        get_source_data(params, logger)
        downloaded_source = True
Review thread on lines +112 to +114 (the source-download block above):
- question: so we download source data if the …
- Ah, this actually downloads the full …
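(From the code: get_source_data is called only when source_dir is missing or empty, and it fetches backup files for the full start_issue through end_issue range; otherwise whatever is already in source_dir is used as-is.)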

    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

    logger.info(start_issue=start_issue.strftime("%Y-%m-%d"))
    logger.info(end_issue=end_issue.strftime("%Y-%m-%d"))
    logger.info(source_dir=source_dir)
    logger.info(patch_dir=params["patch"]["patch_dir"])

    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    current_issue = start_issue
    while current_issue <= end_issue:
Review thread on the while loop:
- nit (optional): I have a slight preference to loop over a date range like …
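One shape the suggested loop might take (a sketch, not code from this PR; note that patch.py does not currently import pandas):

    for current_issue in pd.date_range(start_issue, end_issue):
        ...  # same body, without the manual current_issue += timedelta(days=1) bookkeeping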
logger.info("patching issue", issue_date=current_issue.strftime("%Y-%m-%d")) | ||
|
||
current_issue_source_csv = f"""{source_dir}/{current_issue.strftime("%Y-%m-%d")}.csv""" | ||
if not path.isfile(current_issue_source_csv): | ||
logger.info("No source data at this path", current_issue_source_csv=current_issue_source_csv) | ||
current_issue += timedelta(days=1) | ||
continue | ||
|
||
params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d") | ||
|
||
# current_issue_date can be different from params["patch"]["current_issue"] | ||
# due to weekly cadence of nssp data. For weekly sources, issue dates in our | ||
# db matches with first date of epiweek that the reporting date falls in, | ||
# rather than reporting date itself. | ||
current_issue_date = Week.fromdate(current_issue).startdate() | ||
Review thread on current_issue_date:
- issue: Since our … This can be fixed by keeping track of which …
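The truncated issue above seems to concern collisions: with a weekly source, several daily issue dates can map to the same epiweek start date and hence the same issue_ directory. A minimal sketch of the "keeping track" fix (assumes a seen_issue_dates = set() initialized before the loop; this is not code from the PR):

        if current_issue_date in seen_issue_dates:
            current_issue += timedelta(days=1)
            continue
        seen_issue_dates.add(current_issue_date)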
        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_date.strftime("%Y%m%d")}/nssp"""
        makedirs(f"{current_issue_dir}", exist_ok=True)
        params["common"]["export_dir"] = f"""{current_issue_dir}"""

        run_module(params, logger)
        current_issue += timedelta(days=1)

    if downloaded_source:
        rmtree(source_dir)

if __name__ == "__main__":
    patch()
nssp/delphi_nssp/pull.py
@@ -1,14 +1,60 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""

import sys
import textwrap
from os import makedirs, path

import pandas as pd
import paramiko
from sodapy import Socrata

from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT

def get_source_data(params, logger):
    """
    Download historical source data from a backup server.

    This function uses the 'source_backup_credentials' configuration in params to connect
    to a server where backup nssp source data is stored.
    It then searches for CSV files that match the inclusive range of issue dates
    specified by 'start_issue' and 'end_issue', in the remote location given by 'path'.
    These CSV files are then downloaded and stored in the 'source_dir' directory.
    Note: This function is typically used in patching only. Normal runs grab the latest data from the SODA API.
    """
    makedirs(params["patch"]["source_dir"], exist_ok=True)
    ssh = paramiko.SSHClient()
Review thread on the SSH client setup:
- i was wondering how this would work out since we're now saving raw data in /home/indicators/runtime/nssp/raw_backup_files.... I think for next release maybe also change where the raw files are saved; maybe /common/indicator/<data_source>?
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    host = params["patch"]["source_backup_credentials"]["host"]
    user = params["patch"]["source_backup_credentials"]["user"]
    ssh.connect(host, username=user)

    # Generate file names of source files to download
    dates = pd.date_range(start=params["patch"]["start_issue"], end=params["patch"]["end_issue"])
    csv_file_names = [date.strftime("%Y-%m-%d") + ".csv" for date in dates]

    # Download source files
    sftp = ssh.open_sftp()
    sftp.chdir(params["patch"]["source_backup_credentials"]["path"])
    num_files_transferred = 0
    for remote_file_name in csv_file_names:
        try:
            local_file_path = path.join(params["patch"]["source_dir"], remote_file_name)
            # stat() raises IOError when the remote file is absent, routing
            # missing dates to the warning below instead of a failed get()
            sftp.stat(remote_file_name)
            sftp.get(remote_file_name, local_file_path)
            num_files_transferred += 1
        except IOError:
            logger.warning(
                "Source backup for this date does not exist on the remote server.", missing_filename=remote_file_name
            )
    sftp.close()
    ssh.close()

    if num_files_transferred == 0:
        logger.error("No source data was transferred. Check the source backup server for potential issues.")
        sys.exit(1)
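To exercise get_source_data on its own, a params fragment along these lines should suffice (host, user, and path are the example values from the patch.py docstring; the logger name mirrors the one patch() uses):

from delphi_nssp.pull import get_source_data
from delphi_utils import get_structured_logger

params = {
    "patch": {
        "source_backup_credentials": {
            "host": "bigchunk-dev-02.delphi.cmu.edu",
            "user": "user",
            "path": "/common/source_backup/nssp",
        },
        "start_issue": "2024-04-20",
        "end_issue": "2024-04-21",
        "source_dir": "./source_data",
    }
}
get_source_data(params, get_structured_logger("delphi_nssp.patch"))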

def warn_string(df, type_dict):
    """Format the warning string."""
    warn = textwrap.dedent(

@@ -27,7 +73,7 @@ def warn_string(df, type_dict):
     return warn

-def pull_nssp_data(socrata_token: str):
+def pull_nssp_data(socrata_token: str, params: dict, logger) -> pd.DataFrame:
     """Pull the latest NSSP ER visits data, and conforms it into a dataset.

     The output dataset has:

@@ -39,26 +85,40 @@
     ----------
     socrata_token: str
         My App Token for pulling the NWSS data (could be the same as the nchs data)
-    test_file: Optional[str]
-        When not null, name of file from which to read test data
+    params: dict
+        Nested dictionary of parameters, should contain info on run type.
+    logger:
+        Logger object

     Returns
     -------
     pd.DataFrame
         Dataframe as described above.
     """
-    # Pull data from Socrata API
-    client = Socrata("data.cdc.gov", socrata_token)
-    results = []
-    offset = 0
-    limit = 50000  # maximum limit allowed by SODA 2.0
-    while True:
-        page = client.get("rdmq-nq56", limit=limit, offset=offset)
-        if not page:
-            break  # exit the loop if no more results
-        results.extend(page)
-        offset += limit
-    df_ervisits = pd.DataFrame.from_records(results)
+    custom_run = params["common"].get("custom_run", False)
+    if not custom_run:
+        # Pull data from Socrata API
+        client = Socrata("data.cdc.gov", socrata_token)
+        results = []
+        offset = 0
+        limit = 50000  # maximum limit allowed by SODA 2.0
+        while True:
+            page = client.get("rdmq-nq56", limit=limit, offset=offset)
+            if not page:
+                break  # exit the loop if no more results
+            results.extend(page)
+            offset += limit
+        df_ervisits = pd.DataFrame.from_records(results)
+        logger.info("Number of records grabbed from Socrata API", num_records=len(df_ervisits), source="Socrata API")
+    elif custom_run and logger.name == "delphi_nssp.patch":
+        issue_date = params.get("patch", {}).get("current_issue", None)
+        source_dir = params.get("patch", {}).get("source_dir", None)
+        df_ervisits = pd.read_csv(f"{source_dir}/{issue_date}.csv")
+        logger.info(
+            "Number of records grabbed from source_dir/issue_date.csv",
+            num_records=len(df_ervisits),
+            source=f"{source_dir}/{issue_date}.csv",
+        )
     df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
     df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)
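Call sites now have to supply params and a logger alongside the token. A sketch of a normal (non-patch) invocation; the "indicator"/"socrata_token" key layout is assumed from the usual covidcast-indicators params convention and is not shown in this diff:

from delphi_nssp.pull import pull_nssp_data
from delphi_utils import get_structured_logger, read_params

params = read_params()
logger = get_structured_logger(__name__)
# With custom_run false, this takes the Socrata API branch regardless of logger name.
df = pull_nssp_data(params["indicator"]["socrata_token"], params, logger)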
nssp/setup.py
@@ -13,6 +13,7 @@
"epiweeks", | ||
"freezegun", | ||
"us", | ||
"paramiko", | ||
] | ||
|
||
setup( | ||
|
New test data file
@@ -0,0 +1,4 @@
week_end,geography,county,percent_visits_combined,percent_visits_covid,percent_visits_influenza,percent_visits_rsv,percent_visits_smoothed,percent_visits_smoothed_covid,percent_visits_smoothed_1,percent_visits_smoothed_rsv,ed_trends_covid,ed_trends_influenza,ed_trends_rsv,hsa,hsa_counties,hsa_nci_id,fips,trend_source
2020-10-01T00:00:00.000,United States,All,2.84,1.84,0.48,0.55,2.83,2.07,0.34,0.44,Decreasing,Increasing,Increasing,All,All,All,0,United States
2020-06-29T00:00:00.000,Alabama,All,1.01,0.85,0.17,0.0,0.89,0.66,0.22,0.03,Increasing,Decreasing,No Change,All,All,All,1000,State
2020-02-25T00:00:00.000,Alabama,Blount,,,,,,,,,Data Unavailable,Data Unavailable,Data Unavailable,"Jefferson (Birmingham), AL - Shelby, AL","Bibb, Blount, Chilton, Cullman, Jefferson, Shelby, St. Clair, Walker",150,1009,HSA
New test data file
@@ -0,0 +1,4 @@
week_end,geography,county,percent_visits_combined,percent_visits_covid,percent_visits_influenza,percent_visits_rsv,percent_visits_smoothed,percent_visits_smoothed_covid,percent_visits_smoothed_1,percent_visits_smoothed_rsv,ed_trends_covid,ed_trends_influenza,ed_trends_rsv,hsa,hsa_counties,hsa_nci_id,fips,trend_source
2020-10-01T00:00:00.000,United States,All,2.84,1.84,0.48,0.55,2.83,2.07,0.34,0.44,Decreasing,Increasing,Increasing,All,All,All,0,United States
2020-06-29T00:00:00.000,Alabama,All,1.01,0.85,0.17,0.0,0.89,0.66,0.22,0.03,Increasing,Decreasing,No Change,All,All,All,1000,State
2020-02-25T00:00:00.000,Alabama,Blount,,,,,,,,,Data Unavailable,Data Unavailable,Data Unavailable,"Jefferson (Birmingham), AL - Shelby, AL","Bibb, Blount, Chilton, Cullman, Jefferson, Shelby, St. Clair, Walker",150,1009,HSA
New test data file
@@ -0,0 +1,4 @@
week_end,geography,county,percent_visits_combined,percent_visits_covid,percent_visits_influenza,percent_visits_rsv,percent_visits_smoothed,percent_visits_smoothed_covid,percent_visits_smoothed_1,percent_visits_smoothed_rsv,ed_trends_covid,ed_trends_influenza,ed_trends_rsv,hsa,hsa_counties,hsa_nci_id,fips,trend_source
2020-10-01T00:00:00.000,United States,All,1,1,1,1,1,1,1,1,Decreasing,Decreasing,Decreasing,All,All,All,0,United States
2020-06-29T00:00:00.000,Oklahoma,All,1.01,0.85,0.17,0.0,0.89,0.66,0.22,0.03,Increasing,Decreasing,No Change,All,All,All,1000,State
2020-02-25T00:00:00.000,Alabama,Blount,,,,,,,,,Data Unavailable,Data Unavailable,Data Unavailable,"Jefferson (Birmingham), AL - Shelby, AL","Bibb, Blount, Chilton, Cullman, Jefferson, Shelby, St. Clair, Walker",150,1009,HSA
New test data file
@@ -0,0 +1,4 @@
week_end,geography,county,percent_visits_combined,percent_visits_covid,percent_visits_influenza,percent_visits_rsv,percent_visits_smoothed,percent_visits_smoothed_covid,percent_visits_smoothed_1,percent_visits_smoothed_rsv,ed_trends_covid,ed_trends_influenza,ed_trends_rsv,hsa,hsa_counties,hsa_nci_id,fips,trend_source
2020-10-01T00:00:00.000,United States,All,1,1,1,1,1,1,1,1,Decreasing,Decreasing,Decreasing,All,All,All,0,United States
2020-06-29T00:00:00.000,Oklahoma,All,1.01,0.85,0.17,0.0,0.89,0.66,0.22,0.03,Increasing,Decreasing,No Change,All,All,All,1000,State
2020-02-25T00:00:00.000,Alabama,Blount,,,,,,,,,Data Unavailable,Data Unavailable,Data Unavailable,"Jefferson (Birmingham), AL - Shelby, AL","Bibb, Blount, Chilton, Cullman, Jefferson, Shelby, St. Clair, Walker",150,1009,HSA