diff --git a/src/acquisition/rvdss/constants.py b/src/acquisition/rvdss/constants.py new file mode 100644 index 000000000..f06f1d5e2 --- /dev/null +++ b/src/acquisition/rvdss/constants.py @@ -0,0 +1,117 @@ +from datetime import datetime + +# The dataset calls the same viruses, provinces, regions (province groups), +# and country by multiple names. Map each of those to a common abbreviation. +VIRUSES = { + "parainfluenza": "hpiv", + "piv": "hpiv", + "para": "hpiv", + "adenovirus": "adv", + "adeno": "adv", + "human metapneumovirus": "hmpv", + "enterovirus_rhinovirus": "evrv", + "rhinovirus": "evrv", + "rhv": "evrv", + "entero_rhino": "evrv", + "rhino":"evrv", + "ev_rv":"evrv", + "coronavirus":"hcov", + "coron":"hcov", + "coro":"hcov", + "respiratory syncytial virus":"rsv", + "influenza":"flu", + "sars-cov-2":"sarscov2", +} + +GEOS = { + "newfoundland": "nl", + "newfoundland and labrador": "nl", + "prince edward island":"pe", + "nova scotia":"ns", + "new brunswick":"nb", + "québec":"qc", + "quebec":"qc", + "ontario":"on", + "manitoba" : "mb", + "saskatchewan":"sk", + "alberta": "ab", + "british columbia" :"bc", + "yukon" : "yt", + "northwest territories" : "nt", + "nunavut" : "nu", + "canada":"ca", + "can":"ca" , + "at":"atlantic", + "atl":"atlantic", + "pr" :"prairies" , + "terr" :"territories", + "uhn sinai hospital":"uhn mount sinai hospital" + } + +# Regions are groups of provinces that are geographically close together. Some single provinces are reported as their own region (e.g. Québec, Ontario). +REGIONS = ['atlantic','atl','at','province of québec','québec','qc','province of ontario','ontario','on', + 'prairies', 'pr', "british columbia",'bc',"territories",'terr',] +NATION = ["canada","can",'ca',] + +# Construct dashboard and data report URLS. +DASHBOARD_BASE_URL = "https://health-infobase.canada.ca/src/data/respiratory-virus-detections/" +DASHBOARD_W_DATE_URL = DASHBOARD_BASE_URL + "archive/{date}/" + +# May not need this since we write a function for this in pull_historic +DASHBOARD_BASE_URLS_2023_2024_SEASON = ( + DASHBOARD_W_DATE_URL.format(date = date) for date in + ( + "2024-06-20", + "2024-06-27", + "2024-07-04", + "2024-07-11", + "2024-07-18", + "2024-08-01", + "2024-08-08", + "2024-08-15", + "2024-08-22", + "2024-08-29", + "2024-09-05" + ) +) + +SEASON_BASE_URL = "https://www.canada.ca" +ALTERNATIVE_SEASON_BASE_URL = "www.phac-aspc.gc.ca/bid-bmi/dsd-dsm/rvdi-divr/" +HISTORIC_SEASON_REPORTS_URL = SEASON_BASE_URL+"/en/public-health/services/surveillance/respiratory-virus-detections-canada/{year_range}.html" +DASHBOARD_ARCHIVED_DATES_URL= "https://health-infobase.canada.ca/src/js/respiratory-virus-detections/ArchiveData.json" + +# Each URL created here points to a list of all data reports made during that +# season, e.g. +# https://www.canada.ca/en/public-health/services/surveillance/respiratory-virus-detections-canada/2014-2015.html. +# The Public Health Agency of Canada site switched in 2024 to reporting +# disease data in a dashboard with a static URL. Therefore, this collection +# of URLs does _NOT_ need to be updated. It is used for fetching historical +# data (for dates on or before June 8, 2024) only. 
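+# Note: HISTORIC_SEASON_URLS (below) and DASHBOARD_BASE_URLS_2023_2024_SEASON (above) are
+# generator expressions rather than tuples, so each can only be iterated over once per run.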
+HISTORIC_SEASON_URLS = (HISTORIC_SEASON_REPORTS_URL.format(year_range = year_range) for year_range in + ( + "2013-2014", + "2014-2015", + "2015-2016", + "2016-2017", + "2017-2018", + "2018-2019", + "2019-2020", + "2020-2021", + "2021-2022", + "2022-2023", + "2023-2024" + ) +) + +DASHBOARD_UPDATE_DATE_FILE = "RVD_UpdateDate.csv" +DASHBOARD_DATA_FILE = "RVD_WeeklyData.csv" + + +RESP_DETECTIONS_OUTPUT_FILE = "respiratory_detections.csv" +POSITIVE_TESTS_OUTPUT_FILE = "positive_tests.csv" +COUNTS_OUTPUT_FILE = "number_of_detections.csv" + +FIRST_WEEK_OF_YEAR = 35 + +UPDATE_DATES_FILE = "update_dates.txt" +NOW = datetime.now() diff --git a/src/acquisition/rvdss/database.py b/src/acquisition/rvdss/database.py new file mode 100644 index 000000000..4e1ea1c87 --- /dev/null +++ b/src/acquisition/rvdss/database.py @@ -0,0 +1,121 @@ +""" +=============== +=== Purpose === +=============== + +Stores data from RVDSS (the Respiratory Virus Detection Surveillance System, published by the Public Health Agency of Canada), which contains respiratory virus lab test results. +See: rvdss.py + + +======================= +=== Data Dictionary === +======================= + +`rvdss` is the table where rvdss data is stored. ++----------+-------------+------+-----+---------+----------------+ +| Field | Type | Null | Key | Default | Extra | ++----------+-------------+------+-----+---------+----------------+ +| id | int(11) | NO | PRI | NULL | auto_increment | +| location | varchar(8) | NO | MUL | NULL | | +| epiweek | int(11) | NO | MUL | NULL | | +| value | float | NO | | NULL | | ++----------+-------------+------+-----+---------+----------------+ +id: unique identifier for each record +location: geographic identifier (e.g. nation, region, or province abbreviation) +epiweek: the epiweek to which the data applies +value: number of total test records per facility, within each epiweek + +================= +=== Changelog === +================= +2017-12-14: + * add "need update" check + +2017-12-02: + * original version +""" + +# standard library +import argparse + +# third party +import mysql.connector + +# first party +from delphi.epidata.acquisition.rvdss import rvdss +import delphi.operations.secrets as secrets +from delphi.utils.epidate import EpiDate +import delphi.utils.epiweek as flu +from delphi.utils.geo.locations import Locations + +LOCATIONS = Locations.hhs_list +DATAPATH = "/home/automation/rvdss_data" + + +def update(locations, first=None, last=None, force_update=False, load_email=True): + # download and prepare data first + qd = rvdss.rvdssData(DATAPATH, load_email) + if not qd.need_update and not force_update: + print("Data not updated, nothing needs to change.") + return + + qd_data = qd.load_csv() + qd_measurements = qd.prepare_measurements(qd_data, start_weekday=4) + qd_ts = rvdss.measurement_to_ts(qd_measurements, 7, startweek=first, endweek=last) + # connect to the database + u, p = secrets.db.epi + cnx = mysql.connector.connect(user=u, password=p, database="epidata") + cur = cnx.cursor() + + def get_num_rows(): + cur.execute("SELECT count(1) `num` FROM `rvdss`") + for (num,) in cur: + pass + return num + + # check from 4 weeks preceding the last week with data through this week + cur.execute("SELECT max(`epiweek`) `ew0`, yearweek(now(), 6) `ew1` FROM `rvdss`") + for (ew0, ew1) in cur: + ew0 = 200401 if ew0 is None else flu.add_epiweeks(ew0, -4) + ew0 = ew0 if first is None else first + ew1 = ew1 if last is None else last + print(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...") + + # keep track of how many rows were added + rows_before = get_num_rows() + + # check rvdss for new and/or revised data + sql = """ + INSERT INTO + `rvdss`
(`location`, `epiweek`, `value`) + VALUES + (%s, %s, %s) + ON DUPLICATE KEY UPDATE + `value` = %s + """ + + total_rows = 0 + + for location in locations: + if location not in qd_ts: + continue + ews = sorted(qd_ts[location].keys()) + num_missing = 0 + for ew in ews: + v = qd_ts[location][ew] + sql_data = (location, ew, v, v) + cur.execute(sql, sql_data) + total_rows += 1 + if v == 0: + num_missing += 1 + if num_missing > 0: + print(f" [{location}] missing {int(num_missing)}/{len(ews)} value(s)") + + # keep track of how many rows were added + rows_after = get_num_rows() + print(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") + + # cleanup + cur.close() + cnx.commit() + cnx.close() diff --git a/src/acquisition/rvdss/pull_historic.py b/src/acquisition/rvdss/pull_historic.py new file mode 100644 index 000000000..82ff48910 --- /dev/null +++ b/src/acquisition/rvdss/pull_historic.py @@ -0,0 +1,527 @@ +""" +Script to fetch historical data, before data reporting moved to the dashboard +format. This covers dates from the 2014-2015 season to the 2023-2024 season. + +This script should not be run in production; it will not fetch newly-posted +data. +""" + +from bs4 import BeautifulSoup +import requests +import regex as re +import pandas as pd +from epiweeks import Week +from datetime import datetime, timedelta +import math + +from constants import ( + HISTORIC_SEASON_URLS, + ALTERNATIVE_SEASON_BASE_URL, SEASON_BASE_URL, FIRST_WEEK_OF_YEAR, + DASHBOARD_ARCHIVED_DATES_URL, + DASHBOARD_BASE_URL + ) +from utils import ( + abbreviate_virus, abbreviate_geo, create_geo_types, check_date_format, + fetch_dashboard_data,preprocess_table_columns, add_flu_prefix + ) + #%% Functions + + # Report Functions +def get_report_season_years(soup): + """Get the start year of the season and the year the season ends """ + # Find the url in the page html and get the years included in the season + canonical_url = str(soup.find_all('link',rel="canonical")) + # The season range is in YYYY-YYYY format + matches = re.search("20[0-9]{2}-20[0-9]{2}",canonical_url) + + if matches: + season = matches.group(0) + years=season.split("-") + return(years) + +def add_https_prefix(urls): + """ Add https to urls, and changes any http to https""" + for i in range(len(urls)): + temp_url = urls[i] + + http_present = re.search("http:",temp_url) + if not http_present: + urls[i]=SEASON_BASE_URL+temp_url + else: + urls[i]=re.sub("http:","https:",temp_url) + return(urls) + +def construct_weekly_report_urls(soup): + """ Construct links for each week in a season""" + year= "-".join(get_report_season_years(soup)) + links=soup.find_all('a') + alternative_url = ALTERNATIVE_SEASON_BASE_URL+year + + urls = [link.get("href") for link in links if "ending" in str(link) or + alternative_url in str(link)] + + report_links = add_https_prefix(urls) + return(report_links) + +def report_weeks(soup): + """ Get a list of all the weeks in a season""" + links=soup.find_all('a') + full_weeks = [link.text for link in links if "Week" in str(link)] + weeks= [int(re.search('Week (.+?) 
', week).group(1)) for week in full_weeks] + return(weeks) + +def get_report_date(week,start_year,epi=False): + """ + Get the end date of the current reporting/epiweek + + week - the epidemiological week number + start_year - the year the season starts in + epi - if True, return the date in cdc format (yearweek) + + """ + if week < FIRST_WEEK_OF_YEAR: + year=int(start_year)+1 + else: + year=int(start_year) + + epi_week = Week(year, week) + + if not epi: + report_date = str(epi_week.enddate()) + else: + report_date = str(epi_week) + + return(report_date) + +def extract_captions_of_interest(soup): + """ + finds all the table captions for the current week so tables can be identified + + The captions from the 'summary' tag require less parsing, but sometimes they + are missing. In that case, use the figure captions + """ + captions = soup.findAll('summary') + + table_identifiers = ["respiratory","number","positive","abbreviation"] + + # For every caption, check if all of the table identifiers are missing. If they are, + # this means the caption is noninformative (i.e just says Figure 1). If any of the captions are + # noninformative, use the figure captions as captions + if sum([all(name not in cap.text.lower() for name in table_identifiers) for cap in captions]) != 0: + figcaptions = soup.findAll('figcaption') + captions = captions + figcaptions + + remove_list=[] + for i in range(len(captions)): + caption = captions[i] + + matches = ["period","abbreviation","cumulative", "compared"] #skip historic comparisons and cumulative tables + # remove any captions with a class or that are uninformative + if any(x in caption.text.lower() for x in matches) or caption.has_attr('class') or all(name not in caption.text.lower() for name in table_identifiers): + remove_list.append(caption) + + new_captions = [cap for cap in captions if cap not in remove_list] + new_captions = list(set(new_captions)) + + return(new_captions) + +def get_modified_dates(soup,week_end_date): + """ + Get the date the report page was modfified + + Reports include both posted dates and modified dates. Fairly often on + historical data reports, posted date falls before the end of the week + being reported on. Then the page is modified later, presumably with + updated full-week data. Therefore, we use the modified date as the issue + date for a given report. + """ + meta_tags=soup.find_all("meta",title="W3CDTF") + for tag in meta_tags: + if tag.get("name", None) == "dcterms.modified" or tag.get("property", None) == "dcterms.modified": + date_modified = tag.get("content", None) + + mod_date = datetime.strptime(date_modified, "%Y-%m-%d") + week_date = datetime.strptime(week_end_date, "%Y-%m-%d") + + diff_days = (mod_date-week_date).days + + # Manually create a new modified date if the existing one is too long after the week. + # Historically, we commonly see data reports being modified ~5 days after + # the end of the week being reported on. In some cases, though, the + # modified date falls a long time (up to a year) after the end of the + # week being reported on. We expect that any changes made to the report + # at that point were primarily wording, and not data, changes. So if the + # modified date is NOT within 0-14 days after the end of the week, set + # the issue date to be 5 days after the end of the week. 
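+    # Hypothetical example: for the week ending 2018-01-06, a page modified on
+    # 2018-01-11 (5 days later) keeps 2018-01-11 as its issue date, while a page
+    # modified on 2018-06-15 would instead be issued as 2018-01-11 (week end + 5 days).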
+ if diff_days > 0 and diff_days < 14: + new_modified_date = mod_date + else: + new_lag = timedelta(days=5) + new_modified_date = week_date + new_lag + + new_modified_date_string = new_modified_date.strftime("%Y-%m-%d") + + return(new_modified_date_string) + + +def deduplicate_rows(table): + """ + Sometimes tables have more than one row for the same week + In that case, keep the row that has the highest canada tests + (i.e drop the rows with the lower counts) + """ + if table['week'].duplicated().any(): + duplicated_rows = table[table.duplicated('week',keep=False)] + grouped = duplicated_rows.groupby("week") + duplicates_drop = [] + + for name, group in grouped: + duplicates_drop.append(group['can tests'].idxmin()) + + new_table = table.drop(duplicates_drop).reset_index(drop=True) + + else: + new_table=table + return(new_table) + +def drop_ah1_columns(table): + h1n1_column_exists = any([re.search("h1n1",c) for c in table.columns]) + ah1_column_exists = any([re.search(r"ah1\b",c) for c in table.columns]) + + if ah1_column_exists and h1n1_column_exists: + column_name_to_drop = list(table.filter(regex=r'ah1\b')) + table.drop(columns = column_name_to_drop,inplace=True) + return(table) + +def create_detections_table(table,modified_date,week_number,week_end_date,start_year): + lab_columns =[col for col in table.columns if 'reporting' in col][0] + table=table.rename(columns={lab_columns:"geo_value"}) + table['geo_value']=table['geo_value'].str.lower() + + if start_year==2016 and week_number==3: + table["geo_value"]=[re.sub("^province of$","alberta",c) for c in table["geo_value"]] + + # make naming consistent + table.columns=[add_flu_prefix(col) for col in table.columns] + matches=['test','geo_value','positive'] + + new_names = [] + for i in range(len(table.columns)): + if not any(x in table.columns[i] for x in matches): + new_names.append(table.columns[i]+ " positive_tests") + else: + new_names.append(table.columns[i]) + + table.columns=new_names + + # remove any underscores or spaces from virus names + table.columns=[re.sub(" positive","_positive",t) for t in table.columns] + table.columns=[re.sub(" tests","_tests",t) for t in table.columns] + table.columns=[re.sub(" ","",t) for t in table.columns] + + table['geo_value'] = [abbreviate_geo(g) for g in table['geo_value']] + geo_types = [create_geo_types(g,"lab") for g in table['geo_value']] + + table = table.assign(**{'epiweek': get_report_date(week_number, start_year,epi=True), + 'time_value': week_end_date, + 'issue': modified_date, + 'geo_type':geo_types}) + + table.columns =[re.sub(" ","_",col) for col in table.columns] + return(table) + +def create_number_detections_table(table,modified_date,start_year): + week_columns = table.columns.get_indexer(table.columns[~table.columns.str.contains('week')]) + + for index in week_columns: + new_name = abbreviate_virus(table.columns[index]) + " positive_tests" + table.rename(columns={table.columns[index]: new_name}, inplace=True) + + if "week end" not in table.columns: + week_ends = [get_report_date(week,start_year) for week in table["week"]] + table.insert(1,"week end",week_ends) + + table = table.assign(**{'issue': modified_date, + 'geo_type': "nation", + 'geo_value': "ca"}) + + table=table.rename(columns={'week end':"time_value"}) + table.columns =[re.sub(" ","_",col) for col in table.columns] + table['time_value'] = [check_date_format(d) for d in table['time_value']] + + table=table.rename(columns={'week':"epiweek"}) + table['epiweek'] = [get_report_date(week, start_year,epi=True) for week in 
table['epiweek']] + return(table) + +def create_percent_positive_detection_table(table,modified_date,start_year, flu=False,overwrite_weeks=False): + table = deduplicate_rows(table) + table.columns = [re.sub(' +', ' ',col) for col in table.columns] + table.insert(2,"issue",modified_date) + table=table.rename(columns={'week end':"time_value"}) + table['time_value'] = [check_date_format(d) for d in table['time_value']] + + # get the name of the virus for the table to append to column names + virus_prefix=[] + if flu: + virus_prefix=['flua_pct_positive','flub_pct_positive'] + virus="flu" + table.columns=[re.sub("a_pct","flua_pct",c) for c in table.columns] + table.columns=[re.sub("b_pct","flub_pct",c) for c in table.columns] + else: + names=[] + for j in range(len(table.columns)): + old_name = table.columns[j] + if "pct_positive" in table.columns[j]: + virus_prefix=[table.columns[j]] + virus=re.match("(.*?)_pct_positive",old_name).group(1) + geo = table.columns[j-1].split(" ")[0] + new_name = geo + " " + old_name + else: + new_name=old_name + names.append(new_name) + table.columns=names + + # Remake the weeks column from dates + if overwrite_weeks: + week_ends = [datetime.strptime(date_string, "%Y-%m-%d") for date_string in table['time_value']] + table["week"] = [Week.fromdate(d).week for d in week_ends] + + # Change order of column names so tthey start with stubbnames + table = table.rename(columns=lambda x: ' '.join(x.split(' ')[::-1])) # + stubnames= virus_prefix+['tests'] + table= pd.wide_to_long(table, stubnames, i=['week','time_value','issue'], + j='geo_value', sep=" ", suffix=r'\w+').reset_index() + + table.columns=[re.sub("tests",virus+"_tests",c) for c in table.columns] + table.columns =[re.sub(" ","_",col) for col in table.columns] + + table=table.rename(columns={'week':"epiweek"}) + table['epiweek'] = [get_report_date(week, start_year,epi=True) for week in table['epiweek']] + + table['geo_value']= [abbreviate_geo(g) for g in table['geo_value']] + geo_types = [create_geo_types(g,"lab") for g in table['geo_value']] + table.insert(3,"geo_type",geo_types) + + # Calculate number of positive tests based on pct_positive and total tests + if flu: + table["flua_positive_tests"] = (table["flua_pct_positive"]/100)*table["flu_tests"] + table["flub_positive_tests"] = (table["flub_pct_positive"]/100)*table["flu_tests"] + + table["flu_positive_tests"] = table["flua_positive_tests"] + table["flub_positive_tests"] + table["flu_pct_positive"] = (table["flu_positive_tests"]/table["flu_tests"])*100 + else: + table[virus+"_positive_tests"] = (table[virus+"_pct_positive"]/100) *table[virus+"_tests"] + + table = table.set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) + + return(table) + +def fetch_one_season_from_report(url): + # From the url, go to the main landing page for a season + # which contains all the links to each week in the season + page=requests.get(url) + soup=BeautifulSoup(page.text,'html.parser') + + # get season, week numbers, urls and week ends + season = get_report_season_years(soup) + urls=construct_weekly_report_urls(soup) + weeks= report_weeks(soup) + end_dates = [get_report_date(week, season[0]) for week in weeks] + + # create tables to hold all the data for the season + all_positive_tables=pd.DataFrame() + all_number_tables=pd.DataFrame() + all_respiratory_detection_tables=pd.DataFrame() + + for week_num in range(len(urls)): + current_week = weeks[week_num] + current_week_end = end_dates[week_num] + + # In the 2019-2020 season, the webpages for weeks 5 and 47 
only have + # the abbreviations table and the headers for the respiratory detections + # table, so they are effectively empty, and skipped + if season[0] == '2019': + if current_week == 5 or current_week == 47: + continue + + # Get page for the current week + temp_url=urls[week_num] + temp_page=requests.get(temp_url) + new_soup = BeautifulSoup(temp_page.text, 'html.parser') + + captions = extract_captions_of_interest(new_soup) + modified_date = get_modified_dates(new_soup,current_week_end) + + positive_tables=[] + number_table_exists = False + for i in range(len(captions)): + caption=captions[i] + tab = caption.find_next('table') + + # Remove footers from tables so the text isn't read in as a table row + if tab.find('tfoot'): + tab.tfoot.decompose() + + # In the positive adenovirus table in week 35 of the 2019-2020 season + # The week number has been duplicated, which makes all the entries in the table + # are one column to the right of where they should be. To fix this the + # entry in the table (which is the first "td" element in the html) is deleted + if season[0] == '2019' and current_week == 35: + if "Positive Adenovirus" in caption.text: + tab.select_one('td').decompose() + + # Replace commas with periods + # Some "number of detections" tables have number with commas (i.e 1,000) + # In this case the commas must be deleted, otherwise turn into periods + # because some tables have commas instead of decimal points + if "number" not in caption.text.lower(): + tab = re.sub(",",r".",str(tab)) + else: + tab = re.sub(",","",str(tab)) + + # Read table, coding all the abbreviations for missing data into NA + # Also use dropna because removing footers causes the html to have an empty row + na_values = ['N.A.','N.A', 'N.C.','N.R.','Not Available','Not Tested',"not available","not tested","N.D.","-"] + table = pd.read_html(tab,na_values=na_values)[0].dropna(how="all") + + # Check for multiline headers + # If there are any, combine them into a single line header + if isinstance(table.columns, pd.MultiIndex): + table.columns = [c[0] + " " + c[1] if c[0] != c[1] else c[0] for c in table.columns] + + # Make column names lowercase + table.columns=table.columns.str.lower() + + # One-off edge cases where tables need to be manually adjusted because + # they will cause errors otherwise + if season[0] == '2017': + if current_week == 35 and "entero" in caption.text.lower(): + # The positive enterovirus table in week 35 of the 2017-2018 season has french + # in the headers,so the french needs to be removed + table.columns = ['week', 'week end', 'canada tests', 'entero/rhino%', 'at tests', + 'entero/rhino%.1', 'qc tests', 'entero/rhino%.2', 'on tests', + 'entero/rhino%.3', 'pr tests', 'entero/rhino%.4', 'bc tests', + 'entero/rhino%.5'] + elif current_week == 35 and "adeno" in caption.text.lower(): + # In week 35 of the 2017-2018, the positive adenovirus table has ">week end" + # instead of "week end", so remove > from the column + table = table.rename(columns={'>week end':"week end"}) + elif current_week == 47 and "rsv" in caption.text.lower(): + # In week 47 of the 2017-2018 season, a date is written as 201-11-25, + # instead of 2017-11-25 + table.loc[table['week'] == 47, 'week end'] = "2017-11-25" + elif season[0] == '2015' and current_week == 41: + # In week 41 of the 2015-2016 season, a date written in m-d-y format not d-m-y + table=table.replace("10-17-2015","17-10-2015",regex=True) + elif season[0] == '2022' and current_week == 11 and "hmpv" in caption.text.lower(): + # In week 11 of the 2022-2023 
season, in the positive hmpv table, + # a date is written as 022-09-03, instead of 2022-09-03 + table.loc[table['week'] == 35, 'week end'] = "2022-09-03" + + # check if both ah1 and h1n1 are given. If so drop one since they are the same virus and ah1 is always empty + table = drop_ah1_columns(table) + + # Rename columns + table= preprocess_table_columns(table) + + # If "reporting laboratory" is one of the columns of the table, the table must be + # the "Respiratory virus detections " table for a given week + # this is the lab level table that has weekly positive tests for each virus, with no revisions + # and each row represents a lab + + # If "number" is in the table caption, the table must be the + # "Number of positive respiratory detections" table, for a given week + # this is a national level table, reporting the number of detections for each virus, + # this table has revisions, so each row is a week in the season, with weeks going from the + # start of the season up to and including the current week + + # If "positive" is in the table caption, the table must be one of the + # "Positive [virus] Tests (%)" table, for a given week + # This is a region level table, reporting the total tests and percent positive tests for each virus, + # this table has revisions, so each row is a week in the season, with weeks going from the + # start of the season up to and including the current week + # The columns have the region information (i.e Pr tests, meaning this columns has the tests for the prairies) + + if "reporting laboratory" in str(table.columns): + respiratory_detection_table = create_detections_table(table,modified_date,current_week,current_week_end,season[0]) + respiratory_detection_table = respiratory_detection_table.set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) + elif "number" in caption.text.lower(): + number_table_exists = True + number_detections_table = create_number_detections_table(table,modified_date,season[0]) + number_detections_table = number_detections_table.set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) + elif "positive" in caption.text.lower(): + flu = " influenza" in caption.text.lower() + + # tables are missing week 53 + # In the 2014-2015 season the year ends at week 53 before starting at week 1 again. 
+ # The reports for weeks 53, 2 and 3 skip week 53 in the positive detection tables, going from 52 to 1, + # this means the week numbers following 52 are 1 larger than they should be + # fix this by overwriting the week number columns + + missing_week_53 = [53,2,3] + if season[0]=="2014" and current_week in missing_week_53: + overwrite_weeks=True + else: + overwrite_weeks=False + + pos_table = create_percent_positive_detection_table(table,modified_date,season[0],flu,overwrite_weeks) + + # Check for percentages >100 + # One known value >100 in week 39 of the 2014-2015 season is left in, so skip the check for that report + if not (season[0] == '2014' and current_week == 39): + for k in range(len(pos_table.columns)): + if "pct_positive" in pos_table.columns[k]: + assert all([0 <= val <= 100 or math.isnan(val) for val in pos_table[pos_table.columns[k]]]), "Percentage not from 0-100" + + positive_tables.append(pos_table) + + # create path to save files + #path = "season_" + season[0]+"_"+season[1] + + # combine all the positive tables + combined_positive_tables =pd.concat(positive_tables,axis=1) + + # Check if the indices are already in the season table + # If not, add the weeks tables into the season table + + # check for deduplication pandas + if not respiratory_detection_table.index.isin(all_respiratory_detection_tables.index).any(): + all_respiratory_detection_tables= pd.concat([all_respiratory_detection_tables,respiratory_detection_table]) + + if not combined_positive_tables.index.isin(all_positive_tables.index).any(): + all_positive_tables=pd.concat([all_positive_tables,combined_positive_tables]) + + if number_table_exists: + if not number_detections_table.index.isin(all_number_tables.index).any(): + all_number_tables=pd.concat([all_number_tables,number_detections_table]) + + return { + "respiratory_detection": all_respiratory_detection_tables, + "positive": all_positive_tables, + "count": all_number_tables, + } + +def fetch_archived_dashboard_dates(archive_url): + r=requests.get(archive_url) + values=r.json() + data=pd.json_normalize(values) + english_data = data[data["lang"]=="en"] + + archived_dates=english_data['date'].to_list() + return(archived_dates) + + +def fetch_report_data(): + # Scrape each season. + dict_list = [fetch_one_season_from_report(url) for url in HISTORIC_SEASON_URLS] + + return dict_list + +def fetch_historical_dashboard_data(): + # Update the end of the 2023-2024 season with the dashboard data + archived_dates = fetch_archived_dashboard_dates(DASHBOARD_ARCHIVED_DATES_URL) + + archived_urls= [DASHBOARD_BASE_URL + "archive/"+ date+"/" for date in archived_dates] + dict_list = [fetch_dashboard_data(url) for url in archived_urls] + + return dict_list diff --git a/src/acquisition/rvdss/run.py b/src/acquisition/rvdss/run.py new file mode 100644 index 000000000..599fc89de --- /dev/null +++ b/src/acquisition/rvdss/run.py @@ -0,0 +1,128 @@ +""" +Defines the command line interface for the rvdss indicator. Current data (covering the most recent epiweek) and historical data (covering all data before the most recent epiweek) can be generated together or separately. + +Defines top-level functions to fetch data and save to disk or DB.
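+
+Example invocations (a sketch; assumes the script is run from this directory so
+that the local `utils` and `constants` modules are importable):
+
+    python run.py --current
+    python run.py --historical
+
+Both flags may be passed together to fetch current and historical data in one run.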
+""" + +import pandas as pd +import os +import argparse + +from utils import fetch_dashboard_data, check_most_recent_update_date,get_dashboard_update_date +from constants import DASHBOARD_BASE_URL, RESP_DETECTIONS_OUTPUT_FILE, POSITIVE_TESTS_OUTPUT_FILE, COUNTS_OUTPUT_FILE,UPDATE_DATES_FILE +from pull_historic import fetch_report_data,fetch_historical_dashboard_data + +def update_current_data(): + + ## Check if data for current update date has already been fetched + headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36' + } + + update_date = get_dashboard_update_date(DASHBOARD_BASE_URL, headers) + already_updated = check_most_recent_update_date(update_date,UPDATE_DATES_FILE) + + if not already_updated: + with open(UPDATE_DATES_FILE, 'a') as testfile: + testfile.write(update_date+ "\n") + + ## TODO: what is the base path for these files? + base_path = "." + + data_dict = fetch_dashboard_data(DASHBOARD_BASE_URL) + + table_types = { + "respiratory_detection": RESP_DETECTIONS_OUTPUT_FILE, + "positive": POSITIVE_TESTS_OUTPUT_FILE, + # "count": COUNTS_OUTPUT_FILE, # Dashboards don't contain this data. + } + for tt in table_types.keys(): + data = data_dict[tt] + + # Write the tables to separate csvs + path = base_path + "/" + table_types[tt] + + # Since this function generates new data weekly, we need to combine it with the existing data, if it exists. + if not os.path.exists(path): + data.to_csv(path,index=True) + else: + old_data = pd.read_csv(path).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value']) + + # If index already exists in the data on disk, don't add the new data -- we may have already run the weekly data fetch. + ## TODO: The check on index maybe should be stricter? Although we do deduplication upstream, so this probably won't find true duplicates + if not data.index.isin(old_data.index).any(): + old_data= pd.concat([old_data,data],axis=0) + old_data.to_csv(path,index=True) + + # ## TODO + # update_database(data) + else: + print("Data is already up to date") + +def update_historical_data(): + ## TODO: what is the base path for these files? + base_path = "." + + report_dict_list = fetch_report_data() # a dict for every season, and every seasonal dict has 2/3 tables inside + + # a dict with an entry for every week that has an archival dashboard, and each entry has 2/3 tables + dashboard_dict_list = fetch_historical_dashboard_data() + + table_types = { + "respiratory_detection": RESP_DETECTIONS_OUTPUT_FILE, + "positive": POSITIVE_TESTS_OUTPUT_FILE, + "count": COUNTS_OUTPUT_FILE, + } + for tt in table_types.keys(): + # Merge tables together from dashboards and reports for each table type. 
+ dashboard_data = [elem.get(tt, pd.DataFrame()) for elem in dashboard_dict_list] # a list of all the dashboard tables + report_data = [elem.get(tt, None) for elem in report_dict_list] # a list of the report table + + all_report_tables = pd.concat(report_data) + all_dashboard_tables = pd.concat(dashboard_data) + + data = pd.concat([all_report_tables, all_dashboard_tables]) + + # Write the tables to separate csvs + if not data.empty: + data.to_csv(base_path +"/" + table_types[tt], index=True) + + # ## TODO + # update_database(data) + + +def main(): + # args and usage + parser = argparse.ArgumentParser() + # fmt: off + parser.add_argument( + "--current", + "-c", + action="store_true", + help="fetch current data, that is, data for the latest epiweek" + ) + parser.add_argument( + "--historical", + "-hist", + action="store_true", + help="fetch historical data, that is, data for all available time periods other than the latest epiweek" + ) + # fmt: on + args = parser.parse_args() + + current_flag, historical_flag = ( + args.current, + args.historical, + ) + if not current_flag and not historical_flag: + raise Exception("no data was requested") + + # Decide what to update + if current_flag: + update_current_data() + if historical_flag: + update_historical_data() + + +if __name__ == "__main__": + main() diff --git a/src/acquisition/rvdss/utils.py b/src/acquisition/rvdss/utils.py new file mode 100644 index 000000000..28c3fcdb1 --- /dev/null +++ b/src/acquisition/rvdss/utils.py @@ -0,0 +1,238 @@ +import requests +import pandas as pd +import io +import regex as re +from epiweeks import Week +from datetime import datetime +import math +from unidecode import unidecode +import string + +from constants import ( + VIRUSES, GEOS, REGIONS, NATION, + DASHBOARD_UPDATE_DATE_FILE, DASHBOARD_DATA_FILE + ) + +def abbreviate_virus(full_name): + lowercase=full_name.lower() + keys = (re.escape(k) for k in VIRUSES.keys()) + pattern = re.compile(r'\b(' + '|'.join(keys) + r')\b') + result = pattern.sub(lambda x: VIRUSES[x.group()], lowercase) + return(result) + +def abbreviate_geo(full_name): + lowercase=full_name.lower() + lowercase = re.sub("province of ","",lowercase) + lowercase=re.sub("\.|\*","",lowercase) + lowercase=re.sub("/territoires","",lowercase) + lowercase=re.sub("^cana$","can",lowercase) + lowercase =lowercase.translate(str.maketrans(string.punctuation, ' '*len(string.punctuation),'.'+"'")) + lowercase=re.sub("kidshospital","kids hospital",lowercase) + lowercase=re.sub(' +', ' ', lowercase) + + new_name=unidecode(lowercase) + new_name=re.sub(' +', ' ', new_name) + + keys = (re.escape(k) for k in GEOS.keys()) + pattern = re.compile(r'^\b(' + '|'.join(keys) + r')\b$') + + result = pattern.sub(lambda x: GEOS[x.group()], new_name) + + if result == new_name: + result = lowercase + return(result) + +def create_geo_types(geo,default_geo): + if geo in NATION: + geo_type="nation" + elif geo in REGIONS: + geo_type="region" + else: + geo_type = default_geo + return(geo_type) + +def check_date_format(date_string): + if not re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}",date_string): + if re.search(r"/",date_string): + new_date = re.sub(r"/","-",date_string) + new_date = datetime.strptime(new_date,"%d-%m-%Y").strftime("%Y-%m-%d") + elif re.search("[0-9]{2}-[0-9]{2}-[0-9]{4}",date_string): + new_date = datetime.strptime(date_string,"%d-%m-%Y").strftime("%Y-%m-%d") + else: + raise AssertionError("Unrecognised date format") + else: + new_date=date_string + + return(new_date) + +def get_dashboard_update_date(base_url,headers): + 
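+    """Return the date the dashboard was last updated (read from RVD_UpdateDate.csv) as a YYYY-MM-DD string."""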
# Get update date + update_date_url = base_url + DASHBOARD_UPDATE_DATE_FILE + update_date_url_response = requests.get(update_date_url, headers=headers) + update_date = datetime.strptime(update_date_url_response.text,"%m/%d/%Y %H:%M:%S").strftime("%Y-%m-%d") + return(update_date) + +def check_most_recent_update_date(date,date_file): + with open(date_file) as file: + current_date = date + contents = file.read() + + already_updated = current_date in contents + return(already_updated) + +def preprocess_table_columns(table): + """ + Remove characters like . or * from columns + Abbreviate the viruses in columns + Change some naming of signals in columns (i.e order of hpiv and other) + Change some naming of locations in columns (i.e at instead of atl) + """ + table.columns = [re.sub("\xa0"," ", col) for col in table.columns] # \xa0 to space + table.columns = [re.sub("(.*?)(\.\d+)", "\\1", c) for c in table.columns] # remove .# for duplicated columns + table.columns =[re.sub("\.", "", s)for s in table.columns] #remove periods + table.columns =[re.sub(r"\((all)\)", "", s)for s in table.columns] # remove (all) + table.columns =[re.sub(r"\s*\(|\)", "", s)for s in table.columns] + table.columns = [re.sub(' +', ' ', col) for col in table.columns] # Make any muliple spaces into one space + table.columns = [re.sub(r'\(|\)', '', col) for col in table.columns] # replace () for _ + table.columns = [re.sub(r'/', '_', col) for col in table.columns] # replace / with _ + + table.columns = [re.sub(r"^at\b","atl ",t) for t in table.columns] + table.columns = [re.sub("canada","can",t) for t in table.columns] + table.columns = [re.sub(r"\bcb\b","bc",t) for t in table.columns] + + table.columns =[re.sub(r"h1n1 2009 |h1n12009|a_h1|ah1\b", "ah1n1pdm09", s)for s in table.columns] + table.columns =[re.sub(r"a_uns", "auns", s)for s in table.columns] + table.columns =[re.sub(r"a_h3", "ah3", s)for s in table.columns] + + table.columns =[abbreviate_virus(col) for col in table.columns] # abbreviate viruses + table.columns = [re.sub(r"flu a","flua",t) for t in table.columns] + table.columns = [re.sub(r"flu b","flub",t) for t in table.columns] + table.columns = [re.sub(r"flutest\b","flu test", col) for col in table.columns] + table.columns = [re.sub(r"other hpiv|other_hpiv","hpivother",t) for t in table.columns] + + table.columns=[re.sub(r'bpositive','b_positive',c) for c in table.columns] + table.columns=[re.sub(r'apositive','a_positive',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_1','hpiv1',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_2','hpiv2',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_3','hpiv3',c) for c in table.columns] + table.columns=[re.sub(r'hpiv_4','hpiv4',c) for c in table.columns] + + table.columns=[make_signal_type_spelling_consistent(col) for col in table.columns] + return(table) + +def add_flu_prefix(flu_subtype): + """ Add the prefix `flu` when only the subtype is reported """ + + pat1 =r"^ah3" + pat2= r"^auns" + pat3= r"^ah1pdm09" + pat4= r"^ah1n1pdm09" + combined_pat = '|'.join((pat1, pat2,pat3,pat4)) + + full_fluname = re.sub(combined_pat, r"flu\g<0>",flu_subtype) + return(full_fluname) + +def make_signal_type_spelling_consistent(signal): + """ + Make the signal type (i.e. 
percent positive, number tests, total tests) have consistent spelling + Also remove total from signal names + """ + + pat1 = r"positive\b" + pat2 = r'pos\b' + combined_pat = '|'.join((pat1, pat2)) + + pat3 = r"test\b" + pat4 = 'tested' + combined_pat2 = '|'.join((pat3, pat4)) + + new_signal = re.sub(combined_pat, "positive_tests",signal) + new_signal = re.sub(combined_pat2, "tests",new_signal) + new_signal =re.sub(" *%", "_pct_positive",new_signal) + new_signal = re.sub("total ", "",new_signal) + return(new_signal) + +def get_positive_data(base_url,headers,update_date): + # Get update data + url = base_url+DASHBOARD_DATA_FILE + + url_response = requests.get(url, headers=headers) + df = pd.read_csv(io.StringIO(url_response.text)) + + df['virus'] = [abbreviate_virus(v) for v in df['virus']] + epiw = df.apply(lambda x: Week(x['year'],x['week']),axis=1) + df.insert(0,"epiweek",[int(str(w)) for w in epiw]) + df['epiweek'] = [int(str(w)) for w in df['epiweek']] + df['province'] = [abbreviate_geo(g) for g in df['province']] + df=df.rename(columns={'province':"geo_value",'date':'time_value',"detections":"positivetests"}) + df['time_value'] = [check_date_format(d) for d in df['time_value']] + df['geo_type'] = [create_geo_types(g,"province") for g in df['geo_value']] + df.insert(1,"issue",update_date) + + #df=df.drop(["weekorder","region","year","week"],axis=1) + + df = df.pivot(index=['epiweek','time_value','issue','geo_type','geo_value','region','week','weekorder','year'], + columns="virus",values=['tests','percentpositive','positivetests']) + + df.columns = ['_'.join(col).strip() for col in df.columns.values] + df = df.rename(columns=lambda x: '_'.join(x.split('_')[1:]+x.split('_')[:1])) + df.columns = [re.sub(r'/', '', col) for col in df.columns] # replace / with _ + df.columns = [re.sub(r"flu a","flua",t) for t in df.columns] + df.columns = [re.sub(r"flu b","flub",t) for t in df.columns] + df.columns=[re.sub("positivetests", "positive_tests",col) for col in df.columns] + df.columns=[re.sub("percentpositive", "pct_positive",col) for col in df.columns] + df.columns=[re.sub(r' ','_',c) for c in df.columns] + + for k in range(len(df.columns)): + if "pct_positive" in df.columns[k]: + assert all([0 <= val <= 100 or math.isnan(val) for val in df[df.columns[k]]]), "Percentage not from 0-100" + + return(df) + +def get_detections_data(base_url,headers,update_date): + # Get current week and year + summary_url = base_url + "RVD_SummaryText.csv" + summary_url_response = requests.get(summary_url, headers=headers) + summary_df = pd.read_csv(io.StringIO(summary_url_response.text)) + + week_df = summary_df[(summary_df['Section'] == "summary") & (summary_df['Type']=="title")] + week_string = week_df.iloc[0]['Text'].lower() + current_week = int(re.search("week (.+?) 
", week_string).group(1)) + current_year= int(re.search("20\d{2}", week_string).group(0)) + + current_epiweek= Week(current_year,current_week) + + # Get weekly data + detections_url = base_url + "RVD_CurrentWeekTable.csv" + detections_url_response = requests.get(detections_url, headers=headers) + detections_url_response.encoding='UTF-8' + df_detections = pd.read_csv(io.StringIO(detections_url_response.text)) + + df_detections = df_detections.rename(columns=lambda x: '_'.join(x.split('_')[1:]+x.split('_')[:1])) + df_detections.insert(0,"epiweek",int(str(current_epiweek))) + df_detections.insert(1,"time_value",str(current_epiweek.enddate())) + df_detections.insert(2,"issue",update_date) + df_detections=preprocess_table_columns(df_detections) + + df_detections.columns=[re.sub(r' ','_',c) for c in df_detections.columns] + df_detections=df_detections.rename(columns={'reportinglaboratory':"geo_value"}) + df_detections['geo_value'] = [abbreviate_geo(g) for g in df_detections['geo_value']] + df_detections['geo_type'] = [create_geo_types(g,"lab") for g in df_detections['geo_value']] + + return(df_detections.set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value'])) + +def fetch_dashboard_data(url): + headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36' + } + + update_date = get_dashboard_update_date(url, headers) + + detections_data = get_detections_data(url,headers,update_date) + positive_data = get_positive_data(url,headers,update_date) + + return { + "respiratory_detection": detections_data, + "positive": positive_data, + # "count": None, # Dashboards don't contain this data. + } diff --git a/src/ddl/rvdss.sql b/src/ddl/rvdss.sql new file mode 100644 index 000000000..d3a17a5b5 --- /dev/null +++ b/src/ddl/rvdss.sql @@ -0,0 +1,49 @@ +USE epidata; +/* +TODO: briefly describe data source and define all columns. +*/ + +CREATE TABLE `rvdss_repiratory_detections` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `date` date NOT NULL, + `geo_type` char(20) NOT NULL, + `geo_value` char(20) NOT NULL, + `epiweek` int(11) NOT NULL, + `flua_positive_tests` int(11) NOT NULL, + `flua_percent_positive_tests` double NOT NULL, + `flu_total_tests` int(11) NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `date` (`date`,`geo_value`), + KEY `state` (`state`), + KEY `epiweek` (`epiweek`), +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE `rvdss_testing` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `date` date NOT NULL, + `geo_type` char(20) NOT NULL, + `geo_value` char(20) NOT NULL, + `epiweek` int(11) NOT NULL, + `flua_positive_tests` int(11) NOT NULL, + `flua_percent_positive_tests` double NOT NULL, + `flu_total_tests` int(11) NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `date` (`date`,`geo_value`), + KEY `state` (`state`), + KEY `epiweek` (`epiweek`), +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE `rvdss_detections_counts` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `date` date NOT NULL, + `geo_type` char(20) NOT NULL, + `geo_value` char(20) NOT NULL, + `epiweek` int(11) NOT NULL, + `flua_positive_tests` int(11) NOT NULL, + `flua_percent_positive_tests` double NOT NULL, + `flu_total_tests` int(11) NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `date` (`date`,`geo_value`), + KEY `state` (`state`), + KEY `epiweek` (`epiweek`), +) ENGINE=InnoDB DEFAULT CHARSET=utf8;