diff --git a/src/acquisition/cdcp/cdc_dropbox_receiver.py b/src/acquisition/cdcp/cdc_dropbox_receiver.py index 4fa20368e..931cd70d7 100644 --- a/src/acquisition/cdcp/cdc_dropbox_receiver.py +++ b/src/acquisition/cdcp/cdc_dropbox_receiver.py @@ -26,6 +26,7 @@ # first party import delphi.operations.secrets as secrets +from delphi.epidata.common.logger import get_structured_logger # location constants @@ -33,6 +34,9 @@ DELPHI_BASE_DIR = "/common/cdc_stage" +logger = get_structured_logger("cdc_dropbox_receiver") + + def get_timestamp_string(): """ Return the current local date and time as a string. @@ -69,30 +73,30 @@ def fetch_data(): dbx = dropbox.Dropbox(secrets.cdcp.dropbox_token) # look for new CDC data files - print(f"checking dropbox: {DROPBOX_BASE_DIR}") + logger.info(f"checking dropbox: {DROPBOX_BASE_DIR}") save_list = [] for entry in dbx.files_list_folder(DROPBOX_BASE_DIR).entries: name = entry.name if name.endswith(".csv") or name.endswith(".zip"): - print(f" download: {name}") + logger.info(f" download: {name}") save_list.append(name) else: - print(f" skip: {name}") + logger.info(f" skip: {name}") # determine if there's anything to be done if len(save_list) == 0: - print("did not find any new data files") + logger.info("did not find any new data files") return # download new files, saving them inside of a new zip file timestamp = get_timestamp_string() zip_path = f"{DELPHI_BASE_DIR}/dropbox_{timestamp}.zip" - print(f"downloading into delphi:{zip_path}") + logger.info(f"downloading into delphi:{zip_path}") with ZipFile(zip_path, "w", ZIP_DEFLATED) as zf: for name in save_list: # location of the file on dropbox dropbox_path = f"{DROPBOX_BASE_DIR}/{name}" - print(f" {dropbox_path}") + logger.info(f" {dropbox_path}") # start the download meta, resp = dbx.files_download(dropbox_path) @@ -101,7 +105,7 @@ def fetch_data(): if resp.status_code != 200: raise Exception(["resp.status_code", resp.status_code]) dropbox_len = meta.size - print(f" need {int(dropbox_len)} bytes...") + logger.info(f" need {int(dropbox_len)} bytes...") content_len = int(resp.headers.get("Content-Length", -1)) if dropbox_len != content_len: info = ["dropbox_len", dropbox_len, "content_len", content_len] @@ -112,27 +116,27 @@ def fetch_data(): # check the length again payload_len = len(filedata) - print(" downloaded") + logger.info(" downloaded") if dropbox_len != payload_len: info = ["dropbox_len", dropbox_len, "payload_len", payload_len] raise Exception(info) # add the downloaded file to the zip file zf.writestr(name, filedata) - print(" added") + logger.info(" added") # At this point, all the data is stored and awaiting further processing on # the delphi server. - print(f"saved all new data in {zip_path}") + logger.info(f"saved all new data in {zip_path}") # on dropbox, archive downloaded files so they won't be downloaded again archive_dir = f"archived_reports/processed_{timestamp}" - print("archiving files...") + logger.info("archiving files...") for name in save_list: # source and destination dropbox_src = f"{DROPBOX_BASE_DIR}/{name}" dropbox_dst = f"{DROPBOX_BASE_DIR}/{archive_dir}/{name}" - print(f" {dropbox_src} -> {dropbox_dst}") + logger.info(f" {dropbox_src} -> {dropbox_dst}") # move the file meta = dbx.files_move(dropbox_src, dropbox_dst) @@ -142,9 +146,9 @@ def fetch_data(): raise Exception(f"failed to move {name}") # finally, trigger the usual processing flow - print("triggering processing flow") + logger.info("triggering processing flow") trigger_further_processing() - print("done") + logger.info("done") def main(): diff --git a/src/acquisition/cdcp/cdc_extract.py b/src/acquisition/cdcp/cdc_extract.py index 0d38e0bcc..0364c2e96 100644 --- a/src/acquisition/cdcp/cdc_extract.py +++ b/src/acquisition/cdcp/cdc_extract.py @@ -72,6 +72,10 @@ import delphi.operations.secrets as secrets import delphi.utils.epiweek as flu from . import cdc_upload +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("cdc_extract") def get_num_hits(cur, epiweek, state, page): @@ -166,7 +170,7 @@ def extract(first_week=None, last_week=None, test_mode=False): cur.execute("SELECT max(`epiweek`) FROM `cdc_meta`") for (last_week,) in cur: pass - print(f"extracting {int(first_week)}--{int(last_week)}") + logger.info(f"extracting {int(first_week)}--{int(last_week)}") # update each epiweek for epiweek in flu.range_epiweeks(first_week, last_week, inclusive=True): @@ -178,9 +182,9 @@ def extract(first_week=None, last_week=None, test_mode=False): nums[i] = get_num_hits(cur, epiweek, state, pages[i]) total = get_total_hits(cur, epiweek, state) store_result(cur, epiweek, state, *nums, total) - print(f" {epiweek}-{state}: {' '.join(str(n) for n in nums)} ({total})") + logger.info(f" {epiweek}-{state}: {' '.join(str(n) for n in nums)} ({total})") except Exception as ex: - print(f" {int(epiweek)}-{state}: failed", ex) + logger.error(f" {int(epiweek)}-{state}: failed", exception=ex) # raise ex sys.stdout.flush() diff --git a/src/acquisition/cdcp/cdc_upload.py b/src/acquisition/cdcp/cdc_upload.py index 0e191267b..cc9f723a1 100644 --- a/src/acquisition/cdcp/cdc_upload.py +++ b/src/acquisition/cdcp/cdc_upload.py @@ -84,6 +84,10 @@ # first party import delphi.operations.secrets as secrets +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("cdc_upload") STATES = { @@ -216,7 +220,7 @@ def handler(reader): def parse_zip(zf, level=1): for name in zf.namelist(): prefix = " " * level - print(prefix, name) + logger.info(f"{prefix}, {name}") if name[-4:] == ".zip": with zf.open(name) as temp: with ZipFile(io.BytesIO(temp.read())) as zf2: @@ -228,30 +232,29 @@ def parse_zip(zf, level=1): elif "Regions for all CDC" in name: handler = parse_csv(True) else: - print(prefix, " (skipped)") + logger.info(f"{prefix}, (skipped)") if handler is not None: with zf.open(name) as temp: count = handler(csv.reader(io.StringIO(str(temp.read(), "utf-8")))) - print(prefix, f" {int(count)} rows") + logger.info(f"{prefix}, {int(count)} rows") else: - print(prefix, " (ignored)") + logger.info(f"{prefix} (ignored)") # find, parse, and move zip files zip_files = glob.glob("/common/cdc_stage/*.zip") - print("searching...") + logger.info("searching...") for f in zip_files: - print(" ", f) - print("parsing...") + logger.info(f"{f}") for f in zip_files: with ZipFile(f) as zf: parse_zip(zf) - print("moving...") + logger.info("moving...") for f in zip_files: src = f dst = os.path.join("/home/automation/cdc_page_stats/", os.path.basename(src)) - print(" ", src, "->", dst) + logger.info(" ", src, "->", dst) if test_mode: - print(" (test mode enabled - not moved)") + logger.info(" (test mode enabled - not moved)") else: shutil.move(src, dst) if not os.path.isfile(dst): diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 871061b81..7d7545957 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -323,6 +323,7 @@ def delete_batch(self, cc_deletions): - time_type """ + logger = get_structured_logger("delete_batch") tmp_table_name = "tmp_delete_table" # composite keys: short_comp_key = "`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`" @@ -412,21 +413,21 @@ def split_list(lst, n): yield lst[i:(i+n)] for deletions_batch in split_list(cc_deletions, 100000): self._cursor.executemany(load_tmp_table_insert_sql, deletions_batch) - print(f"load_tmp_table_insert_sql:{self._cursor.rowcount}") + logger.debug(f"load_tmp_table_insert_sql:{self._cursor.rowcount}") else: raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}") self._cursor.execute(add_history_id_sql) - print(f"add_history_id_sql:{self._cursor.rowcount}") + logger.debug(f"add_history_id_sql:{self._cursor.rowcount}") self._cursor.execute(mark_for_update_latest_sql) - print(f"mark_for_update_latest_sql:{self._cursor.rowcount}") + logger.debug(f"mark_for_update_latest_sql:{self._cursor.rowcount}") self._cursor.execute(delete_history_sql) - print(f"delete_history_sql:{self._cursor.rowcount}") + logger.debug(f"delete_history_sql:{self._cursor.rowcount}") total = self._cursor.rowcount # TODO: consider reporting rows removed and/or replaced in latest table as well self._cursor.execute(delete_latest_sql) - print(f"delete_latest_sql:{self._cursor.rowcount}") + logger.debug(f"delete_latest_sql:{self._cursor.rowcount}") self._cursor.execute(update_latest_sql) - print(f"update_latest_sql:{self._cursor.rowcount}") + logger.debug(f"update_latest_sql:{self._cursor.rowcount}") self._connection.commit() if total == -1: diff --git a/src/acquisition/covidcast/test_utils.py b/src/acquisition/covidcast/test_utils.py index 5a978f8cd..a5d1335fa 100644 --- a/src/acquisition/covidcast/test_utils.py +++ b/src/acquisition/covidcast/test_utils.py @@ -12,6 +12,8 @@ from delphi.epidata.server._config import REDIS_HOST, REDIS_PASSWORD from delphi.epidata.server.utils.dates import day_to_time_value, time_value_to_day import delphi.operations.secrets as secrets +from delphi.epidata.common.logger import get_structured_logger + # all the Nans we use here are just one value, so this is a shortcut to it: nmv = Nans.NOT_MISSING.value @@ -193,7 +195,7 @@ def localTearDown(self): def _insert_rows(self, rows: Sequence[CovidcastTestRow]): # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables n = self._db.insert_or_update_bulk(rows) - print(f"{n} rows added to load table & dispatched to v4 schema") + get_structured_logger("covidcast_test_utils").info(f"{n} rows added to load table & dispatched to v4 schema") self._db._connection.commit() # NOTE: this isnt expressly needed for our test cases, but would be if using external access (like through client lib) to ensure changes are visible outside of this db session def params_from_row(self, row: CovidcastTestRow, **kwargs): diff --git a/src/acquisition/covidcast_nowcast/load_sensors.py b/src/acquisition/covidcast_nowcast/load_sensors.py new file mode 100644 index 000000000..56ac3bddc --- /dev/null +++ b/src/acquisition/covidcast_nowcast/load_sensors.py @@ -0,0 +1,112 @@ +from shutil import move +import os +import time + +import pandas as pd +import sqlalchemy + +import delphi.operations.secrets as secrets +from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails +from delphi.epidata.common.logger import get_structured_logger + + +SENSOR_CSV_PATH = "/common/covidcast_nowcast/receiving/" +SUCCESS_DIR = "archive/successful" +FAIL_DIR = "archive/failed" +TABLE_NAME = "covidcast_nowcast" +DB_NAME = "epidata" +CSV_DTYPES = {"sensor_name": str, "geo_value": str, "value": float} + + +def main(csv_path: str = SENSOR_CSV_PATH) -> None: + """ + Parse all files in a given directory and insert them into the sensor table in the database. + + For all the files found recursively in csv_path that match the naming scheme specified by + CsvImporter.find_csv_files(), attempt to load and insert them into the database. Files which do + not match the naming scheme will be moved to an archive/failed folder and skipped, and files + which raise an error during loading/uploading will be moved to the archive/failed folder and + have the error raised. + + Parameters + ---------- + csv_path + Path to folder containing files to load. + + Returns + ------- + None. + """ + user, pw = secrets.db.epi + engine = sqlalchemy.create_engine(f"mysql+pymysql://{user}:{pw}@{secrets.db.host}/{DB_NAME}") + for filepath, attribute in CsvImporter.find_issue_specific_csv_files(csv_path): + if attribute is None: + _move_after_processing(filepath, success=False) + continue + try: + data = load_and_prepare_file(filepath, attribute) + with engine.connect() as conn: + method = _create_upsert_method(sqlalchemy.MetaData(conn)) + data.to_sql(TABLE_NAME, engine, if_exists="append", method=method, index=False) + except Exception: + _move_after_processing(filepath, success=False) + raise + _move_after_processing(filepath, success=True) + + +def load_and_prepare_file(filepath: str, attributes: PathDetails) -> pd.DataFrame: + """ + Read CSV file into a DataFrame and add relevant attributes as new columns to match DB table. + + Parameters + ---------- + filepath + Path to CSV file. + attributes + (source, signal, time_type, geo_type, time_value, issue, lag) tuple + returned by CsvImport.find_csv_files + + Returns + ------- + DataFrame with additional attributes added as columns based on filename and current date. + """ + data = pd.read_csv(filepath, dtype=CSV_DTYPES) + data["source"] = attributes.source + data["signal"] = attributes.signal + data["time_type"] = attributes.time_type + data["geo_type"] = attributes.geo_type + data["time_value"] = attributes.time_value + data["issue"] = attributes.issue + data["lag"] = attributes.lag + data["value_updated_timestamp"] = int(time.time()) + return data + + +def _move_after_processing(filepath, success): + archive_dir = SUCCESS_DIR if success else FAIL_DIR + new_dir = os.path.dirname(filepath).replace("receiving", archive_dir) + os.makedirs(new_dir, exist_ok=True) + move(filepath, filepath.replace("receiving", archive_dir)) + get_structured_logger("covidcast_nowcast_load_sensors").info(f"{filepath} moved to {archive_dir}") + + +def _create_upsert_method(meta): + def method(table, conn, keys, data_iter): + sql_table = sqlalchemy.Table( + table.name, + meta, + # specify lag column explicitly; lag is a reserved word sqlalchemy doesn't know about + sqlalchemy.Column("lag", sqlalchemy.Integer, quote=True), + autoload=True, + ) + insert_stmt = sqlalchemy.dialects.mysql.insert(sql_table).values( + [dict(zip(keys, data)) for data in data_iter] + ) + upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted}) + conn.execute(upsert_stmt) + + return method + + +if __name__ == "__main__": + main() diff --git a/src/acquisition/ecdc/ecdc_db_update.py b/src/acquisition/ecdc/ecdc_db_update.py index 84423c376..10ee751c4 100644 --- a/src/acquisition/ecdc/ecdc_db_update.py +++ b/src/acquisition/ecdc/ecdc_db_update.py @@ -14,7 +14,7 @@ | Field | Type | Null | Key | Default | Extra | +----------------+-------------+------+-----+---------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | -| release_date | date | NO | MUL | NULL | | +| release_date | date | NO | MUL | NULL | | | issue | int(11) | NO | MUL | NULL | | | epiweek | int(11) | NO | MUL | NULL | | | region | varchar(12) | NO | MUL | NULL | | @@ -44,6 +44,10 @@ from delphi.epidata.acquisition.ecdc.ecdc_ili import download_ecdc_data from delphi.utils.epiweek import delta_epiweeks from delphi.utils.epidate import EpiDate +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("dcdc_db_update") def ensure_tables_exist(): @@ -100,7 +104,7 @@ def update_from_file(issue, date, dir, test_mode=False): u, p = secrets.db.epi cnx = mysql.connector.connect(user=u, password=p, database="epidata") rows1 = get_rows(cnx, "ecdc_ili") - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") insert = cnx.cursor() # load the data, ignoring empty rows @@ -115,9 +119,9 @@ def update_from_file(issue, date, dir, test_mode=False): row["region"] = data[4] row["incidence_rate"] = data[3] rows.append(row) - print(f" loaded {len(rows)} rows") + logger.info(f" loaded {len(rows)} rows") entries = [obj for obj in rows if obj] - print(f" found {len(entries)} entries") + logger.info(f" found {len(entries)} entries") sql = """ INSERT INTO @@ -144,12 +148,12 @@ def update_from_file(issue, date, dir, test_mode=False): # cleanup insert.close() if test_mode: - print("test mode, not committing") + logger.info("test mode, not committing") rows2 = rows1 else: cnx.commit() rows2 = get_rows(cnx) - print(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") cnx.close() @@ -179,7 +183,7 @@ def main(): raise Exception("--file and --issue must both be present or absent") date = datetime.datetime.now().strftime("%Y-%m-%d") - print(f"assuming release date is today, {date}") + logger.info(f"assuming release date is today, {date}") ensure_tables_exist() if args.file: @@ -209,7 +213,7 @@ def main(): if not db_error: break # Exit loop with success if flag >= max_tries: - print("WARNING: Database `ecdc_ili` did not update successfully") + logger.warning("Database `ecdc_ili` did not update successfully") if __name__ == "__main__": diff --git a/src/acquisition/ecdc/ecdc_ili.py b/src/acquisition/ecdc/ecdc_ili.py index dca9b51ae..ba8997c9d 100644 --- a/src/acquisition/ecdc/ecdc_ili.py +++ b/src/acquisition/ecdc/ecdc_ili.py @@ -14,6 +14,7 @@ from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC +from delphi.epidata.common.logger import get_structured_logger def download_ecdc_data(download_dir="downloads"): @@ -77,7 +78,7 @@ def download_ecdc_data(download_dir="downloads"): except: driver.get(url) except: - print("WARNING: ECDC Scraper may not have downloaded all of the available data.") + get_structured_logger("ecdc_ili").warning("WARNING: ECDC Scraper may not have downloaded all of the available data.") # cleanup os.system("""pkill "firefox" """) os.system('''pkill "(firefox-bin)"''') diff --git a/src/acquisition/flusurv/flusurv.py b/src/acquisition/flusurv/flusurv.py index 28105d933..aa362fa01 100644 --- a/src/acquisition/flusurv/flusurv.py +++ b/src/acquisition/flusurv/flusurv.py @@ -45,6 +45,11 @@ # first party from delphi.utils.epidate import EpiDate +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("flusurv") + # all currently available FluSurv locations and their associated codes @@ -106,7 +111,7 @@ def fetch_json(path, payload, call_count=1, requests_impl=requests): if resp.status_code == 500 and call_count <= 2: # the server often fails with this status, so wait and retry delay = 10 * call_count - print(f"got status {int(resp.status_code)}, will retry in {int(delay)} sec...") + logger.info(f"got status {int(resp.status_code)}, will retry in {int(delay)} sec...") time.sleep(delay) return fetch_json(path, payload, call_count=call_count + 1) elif resp.status_code != 200: @@ -173,14 +178,14 @@ def extract_from_object(data_in): elif prev_rate != rate: # a different rate was already found for this epiweek/age format_args = (epiweek, obj["age"], prev_rate, rate) - print("warning: %d %d %f != %f" % format_args) + logger.warning("warning: %d %d %f != %f" % format_args) # sanity check the result if len(data_out) == 0: raise Exception("no data found") # print the result and return flu data - print(f"found data for {len(data_out)} weeks") + logger.info(f"found data for {len(data_out)} weeks") return data_out @@ -194,15 +199,15 @@ def get_data(location_code): """ # fetch - print("[fetching flusurv data...]") + logger.info("[fetching flusurv data...]") data_in = fetch_flusurv_object(location_code) # extract - print("[extracting values...]") + logger.info("[extracting values...]") data_out = extract_from_object(data_in) # return - print("[scraped successfully]") + logger.info("[scraped successfully]") return data_out diff --git a/src/acquisition/flusurv/flusurv_update.py b/src/acquisition/flusurv/flusurv_update.py index 1aa8e9885..ae82250fd 100644 --- a/src/acquisition/flusurv/flusurv_update.py +++ b/src/acquisition/flusurv/flusurv_update.py @@ -79,6 +79,10 @@ import delphi.operations.secrets as secrets from delphi.utils.epidate import EpiDate from delphi.utils.epiweek import delta_epiweeks +from delphi.epidata.common.logger import get_structured_logger + +logger = get_structured_logger("flusurv_update") + def get_rows(cur): @@ -95,7 +99,7 @@ def update(issue, location_name, test_mode=False): # fetch data location_code = flusurv.location_codes[location_name] - print("fetching data for", location_name, location_code) + logger.info("fetching data for", location_name, location_code) data = flusurv.get_data(location_code) # metadata @@ -108,7 +112,7 @@ def update(issue, location_name, test_mode=False): cnx = mysql.connector.connect(host=secrets.db.host, user=u, password=p, database="epidata") cur = cnx.cursor() rows1 = get_rows(cur) - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") # SQL for insert/update sql = """ @@ -148,10 +152,10 @@ def update(issue, location_name, test_mode=False): # commit and disconnect rows2 = get_rows(cur) - print(f"rows after: {int(rows2)} (+{int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (+{int(rows2 - rows1)})") cur.close() if test_mode: - print("test mode: not committing database changes") + logger.info("test mode: not committing database changes") else: cnx.commit() cnx.close() @@ -177,7 +181,7 @@ def main(): # scrape current issue from the main page issue = flusurv.get_current_issue() - print(f"current issue: {int(issue)}") + logger.info(f"current issue: {int(issue)}") # fetch flusurv data if args.location == "all": diff --git a/src/acquisition/fluview/fluview.py b/src/acquisition/fluview/fluview.py index 9b4e6f537..c14f1df65 100644 --- a/src/acquisition/fluview/fluview.py +++ b/src/acquisition/fluview/fluview.py @@ -4,11 +4,11 @@ =============== Fetches ILINet data (surveillance of outpatient influenza-like illness) from -CDC. +CDC. -This script provides functions for first fetching metadata from Fluview which -are then used to build a request that will get all data for the different tier -types (national, hhs regions, census divisions and states). This data is +This script provides functions for first fetching metadata from Fluview which +are then used to build a request that will get all data for the different tier +types (national, hhs regions, census divisions and states). This data is downloaded as one zip file per tier type (locally). This file replaces scrape_flu_data.sh, which performed a similar function for @@ -21,7 +21,7 @@ Changes: - 10/03/18: added field for 'WHO_NREVSS' data to download data from clinical - labs as well as public health labs. + labs as well as public health labs. """ # standard library @@ -32,6 +32,12 @@ # third party import requests +# first party +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("fluview") + class Key: """ @@ -177,11 +183,11 @@ def save_latest(path=None): ) # get metatdata - print("looking up ilinet metadata") + logger.info("looking up ilinet metadata") data = fetch_metadata(sess) info = get_issue_and_locations(data) issue = info["epiweek"] - print(f"current issue: {int(issue)}") + logger.info(f"current issue: {int(issue)}") # establish timing dt = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @@ -209,12 +215,12 @@ def save_latest(path=None): locations = info["location_ids"][cdc_name] # download and show timing information - print(f"downloading {delphi_name}") + logger.info(f"downloading {delphi_name}") t0 = time.time() size = download_data(tier_id, locations, seasons, filename) t1 = time.time() - print(f" saved {filename} ({int(size)} bytes in {t1 - t0:.1f} seconds)") + logger.info(f" saved {filename} ({int(size)} bytes in {t1 - t0:.1f} seconds)") files.append(filename) # return the current issue and the list of downloaded files diff --git a/src/acquisition/fluview/fluview_notify.py b/src/acquisition/fluview/fluview_notify.py index 3ed1a243f..e3a764953 100644 --- a/src/acquisition/fluview/fluview_notify.py +++ b/src/acquisition/fluview/fluview_notify.py @@ -29,6 +29,11 @@ # first party import delphi.operations.secrets as secrets +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("fluview_notify") + if __name__ == "__main__": @@ -55,17 +60,17 @@ ) for (issue1,) in cur: issue1 = int(issue1) - print("last known issue:", issue1) + logger.info("last known issue: {issue1}") # get the most recent issue from the epidata table `fluview` cur.execute("SELECT max(`issue`) FROM `fluview`") for (issue2,) in cur: issue2 = int(issue2) - print("most recent issue:", issue2) + logger.info("most recent issue: {issue2}") if issue2 > issue1: - print("new data is available!") + logger.info("new data is available!") if args.test: - print("test mode - not making any changes") + logger.info("test mode - not making any changes") else: # update the variable cur.execute( diff --git a/src/acquisition/fluview/fluview_update.py b/src/acquisition/fluview/fluview_update.py index defd01dad..998e2890e 100644 --- a/src/acquisition/fluview/fluview_update.py +++ b/src/acquisition/fluview/fluview_update.py @@ -128,6 +128,11 @@ from delphi.utils.epiweek import delta_epiweeks, join_epiweek from . import fluview from . import fluview_locations +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("fluview_update") + # sheet names ILINET_SHEET = "ILINet.csv" @@ -313,21 +318,21 @@ def update_from_file_clinical(issue, date, filename, test_mode=False): u, p = secrets.db.epi cnx = mysql.connector.connect(user=u, password=p, database="epidata", host=secrets.db.host) rows1 = get_rows(cnx, CL_TABLE) - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") insert = cnx.cursor() # load the data, ignoring empty rows - print(f"loading data from {filename} as issued on {int(issue)}") + logger.info(f"loading data from {filename} as issued on {int(issue)}") rows = load_zipped_csv(filename, CL_SHEET) - print(f" loaded {len(rows)} rows") + logger.info(f" loaded {len(rows)} rows") data = [get_clinical_data(row) for row in rows] entries = [obj for obj in data if obj] - print(f" found {len(entries)} entries") + logger.info(f" found {len(entries)} entries") sql = """ INSERT INTO - `fluview_clinical` (`release_date`, `issue`, `epiweek`, `region`, `lag`, - `total_specimens`, `total_a`, `total_b`, `percent_positive`, `percent_a`, + `fluview_clinical` (`release_date`, `issue`, `epiweek`, `region`, `lag`, + `total_specimens`, `total_a`, `total_b`, `percent_positive`, `percent_a`, `percent_b`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) @@ -360,12 +365,12 @@ def update_from_file_clinical(issue, date, filename, test_mode=False): # cleanup insert.close() if test_mode: - print("test mode, not committing") + logger.info("test mode, not committing") rows2 = rows1 else: cnx.commit() rows2 = get_rows(cnx, CL_TABLE) - print(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") cnx.close() @@ -378,16 +383,16 @@ def update_from_file_public(issue, date, filename, test_mode=False): u, p = secrets.db.epi cnx = mysql.connector.connect(user=u, password=p, database="epidata", host=secrets.db.host) rows1 = get_rows(cnx, PHL_TABLE) - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") insert = cnx.cursor() # load the data, ignoring empty rows - print(f"loading data from {filename} as issued on {int(issue)}") + logger.info(f"loading data from {filename} as issued on {int(issue)}") rows = load_zipped_csv(filename, PHL_SHEET) - print(f" loaded {len(rows)} rows") + logger.info(f" loaded {len(rows)} rows") data = [get_public_data(row) for row in rows] entries = [obj for obj in data if obj] - print(f" found {len(entries)} entries") + logger.info(f" found {len(entries)} entries") sql = """ INSERT INTO @@ -429,12 +434,12 @@ def update_from_file_public(issue, date, filename, test_mode=False): # cleanup insert.close() if test_mode: - print("test mode, not committing") + logger.info("test mode, not committing") rows2 = rows1 else: cnx.commit() rows2 = get_rows(cnx, PHL_TABLE) - print(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") cnx.close() @@ -447,16 +452,16 @@ def update_from_file(issue, date, filename, test_mode=False): u, p = secrets.db.epi cnx = mysql.connector.connect(user=u, password=p, database="epidata", host=secrets.db.host) rows1 = get_rows(cnx) - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") insert = cnx.cursor() # load the data, ignoring empty rows - print(f"loading data from {filename} as issued on {int(issue)}") + logger.info(f"loading data from {filename} as issued on {int(issue)}") rows = load_zipped_csv(filename) - print(f" loaded {len(rows)} rows") + logger.info(f" loaded {len(rows)} rows") data = [get_ilinet_data(row) for row in rows] entries = [obj for obj in data if obj] - print(f" found {len(entries)} entries") + logger.info(f" found {len(entries)} entries") sql = """ INSERT INTO @@ -504,12 +509,12 @@ def update_from_file(issue, date, filename, test_mode=False): # cleanup insert.close() if test_mode: - print("test mode, not committing") + logger.info("test mode, not committing") rows2 = rows1 else: cnx.commit() rows2 = get_rows(cnx) - print(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") cnx.close() @@ -539,7 +544,7 @@ def main(): raise Exception("--file and --issue must both be present or absent") date = datetime.datetime.now().strftime("%Y-%m-%d") - print(f"assuming release date is today, {date}") + logger.info(f"assuming release date is today, {date}") if args.file: update_from_file(args.issue, date, args.file, test_mode=args.test) diff --git a/src/acquisition/fluview/impute_missing_values.py b/src/acquisition/fluview/impute_missing_values.py index c795d9cce..4bf991dd0 100644 --- a/src/acquisition/fluview/impute_missing_values.py +++ b/src/acquisition/fluview/impute_missing_values.py @@ -56,6 +56,10 @@ import delphi.operations.secrets as secrets from delphi.utils.epiweek import delta_epiweeks from delphi.utils.geo.locations import Locations +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("fluview_impute_missing_values") class Database: @@ -147,7 +151,7 @@ def close(self, commit): if commit: self.cnx.commit() else: - print("test mode, not committing") + logger.info("test mode, not committing") self.cnx.close() def count_rows(self): @@ -270,13 +274,13 @@ def impute_missing_values(database, test_mode=False): # database connection database.connect() rows1 = database.count_rows() - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") # iterate over missing epiweeks missing_rows = database.find_missing_rows() - print(f"missing data for {len(missing_rows)} epiweeks") + logger.info(f"missing data for {len(missing_rows)} epiweeks") for issue, epiweek in missing_rows: - print(f"i={int(issue)} e={int(epiweek)}") + logger.info(f"i={int(issue)} e={int(epiweek)}") # get known values from table `fluview` known_values = database.get_known_values(issue, epiweek) @@ -310,14 +314,14 @@ def impute_missing_values(database, test_mode=False): n_ili, n_pat, n_prov = map(int, np.rint(values)) lag, ili = get_lag_and_ili(issue, epiweek, n_ili, n_pat) imputed_values[loc] = (lag, n_ili, n_pat, n_prov, ili) - print(f" {loc}: {str(imputed_values[loc])}") + logger.info(f" {loc}: {str(imputed_values[loc])}") # save all imputed values in table `fluview_imputed` database.add_imputed_values(issue, epiweek, imputed_values) # database cleanup rows2 = database.count_rows() - print(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") commit = not test_mode database.close(commit) diff --git a/src/acquisition/ght/ght_update.py b/src/acquisition/ght/ght_update.py index 9e8d48d1d..40910b0c4 100644 --- a/src/acquisition/ght/ght_update.py +++ b/src/acquisition/ght/ght_update.py @@ -71,13 +71,16 @@ # third party import mysql.connector -from apiclient.discovery import build # first party from .google_health_trends import GHT from .google_health_trends import NO_LOCATION_STR import delphi.operations.secrets as secrets import delphi.utils.epiweek as flu +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("ght_update") # secret key for accessing Google's health trends APIs @@ -266,7 +269,7 @@ def get_num_rows(): ew0 = 200401 if ew0 is None else flu.add_epiweeks(ew0, -4) ew0 = ew0 if first is None else first ew1 = ew1 if last is None else last - print(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...") + logger.info(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...") # keep track of how many rows were added rows_before = get_num_rows() @@ -283,7 +286,7 @@ def get_num_rows(): total_rows = 0 ght = GHT(API_KEY) for term in terms: - print(f" [{term}] using term") + logger.info(f" [{term}] using term") ll, cl = len(locations), len(countries) for i in range(max(ll, cl)): location = locations[i] if i < ll else locations[0] @@ -302,9 +305,9 @@ def get_num_rows(): raise ex else: delay = 2**attempt - print( + logger.error( f" [{term}|{location}] caught exception (will retry in {int(delay)}s):", - ex, + exception=ex, ) time.sleep(delay) values = [p["value"] for p in result["data"]["lines"][0]["points"]] @@ -330,13 +333,13 @@ def get_num_rows(): # print(' [%s|%s|%d] missing value' % (term, location, ew)) ew = flu.add_epiweeks(ew, 1) if num_missing > 0: - print(f" [{term}|{location}] missing {int(num_missing)}/{len(values)} value(s)") + logger.info(f" [{term}|{location}] missing {int(num_missing)}/{len(values)} value(s)") except Exception as ex: - print(f" [{term}|{location}] caught exception (will NOT retry):", ex) + logger.error(f" [{term}|{location}] caught exception (will NOT retry):", critical=ex) # keep track of how many rows were added rows_after = get_num_rows() - print(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") + logger.info(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") # cleanup cur.close() diff --git a/src/acquisition/ght/google_health_trends.py b/src/acquisition/ght/google_health_trends.py index 4bb8df25f..58ea29e6d 100644 --- a/src/acquisition/ght/google_health_trends.py +++ b/src/acquisition/ght/google_health_trends.py @@ -30,6 +30,8 @@ # first party from delphi.utils.epidate import EpiDate import delphi.utils.epiweek as flu +from delphi.epidata.common.logger import get_structured_logger + NO_LOCATION_STR = "none" @@ -162,12 +164,13 @@ def main(): expected_weeks = result["num_weeks"] received_weeks = len([v for v in values if v is not None and type(v) == float and v >= 0]) if expected_weeks != received_weeks: + get_structured_logger("google_health_trends").error(f"expected {int(expected_weeks)} weeks, received {int(received_weeks)}") raise Exception(f"expected {int(expected_weeks)} weeks, received {int(received_weeks)}") # results epiweeks = [ew for ew in flu.range_epiweeks(args.startweek, args.endweek, inclusive=True)] for (epiweek, value) in zip(epiweeks, values): - print(f"{int(epiweek):6}: {value:.3f}") + get_structured_logger("google_health_trends").info(f"{int(epiweek):6}: {value:.3f}") if __name__ == "__main__": diff --git a/src/acquisition/kcdc/kcdc_update.py b/src/acquisition/kcdc/kcdc_update.py index 713b21f00..c327fcbd6 100644 --- a/src/acquisition/kcdc/kcdc_update.py +++ b/src/acquisition/kcdc/kcdc_update.py @@ -14,7 +14,7 @@ | Field | Type | Null | Key | Default | Extra | +--------------+-------------+------+-----+---------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | -| release_date | date | NO | MUL | NULL | | +| release_date | date | NO | MUL | NULL | | | issue | int(11) | NO | MUL | NULL | | | epiweek | int(11) | NO | MUL | NULL | | | region | varchar(12) | NO | MUL | NULL | | @@ -41,6 +41,9 @@ import delphi.operations.secrets as secrets from delphi.utils.epiweek import delta_epiweeks, range_epiweeks, add_epiweeks from delphi.utils.epidate import EpiDate +from delphi.epidata.common.logger import get_structured_logger + +logger = get_structured_logger("kcdc_update") def ensure_tables_exist(): @@ -126,7 +129,7 @@ def update_from_data(ews, ilis, date, issue, test_mode=False): u, p = secrets.db.epi cnx = mysql.connector.connect(user=u, password=p, database="epidata") rows1 = get_rows(cnx) - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") insert = cnx.cursor() sql = """ @@ -155,12 +158,12 @@ def update_from_data(ews, ilis, date, issue, test_mode=False): # cleanup insert.close() if test_mode: - print("test mode, not committing") + logger.info("test mode, not committing") rows2 = rows1 else: cnx.commit() rows2 = get_rows(cnx) - print(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") cnx.close() @@ -173,7 +176,7 @@ def main(): args = parser.parse_args() date = datetime.datetime.now().strftime("%Y-%m-%d") - print(f"assuming release date is today, {date}") + logger.info(f"assuming release date is today, {date}") issue = EpiDate.today().get_ew() ensure_tables_exist() diff --git a/src/acquisition/nidss/taiwan_nidss.py b/src/acquisition/nidss/taiwan_nidss.py index b2e369e63..8b9fb0505 100644 --- a/src/acquisition/nidss/taiwan_nidss.py +++ b/src/acquisition/nidss/taiwan_nidss.py @@ -34,6 +34,9 @@ # first party from delphi.utils.epiweek import range_epiweeks, add_epiweeks, check_epiweek +from delphi.epidata.common.logger import get_structured_logger + +logger = get_structured_logger("taiwan_nidss") class NIDSS: @@ -251,19 +254,19 @@ def main(): latest_week, release_date, fdata = NIDSS.get_flu_data() ddata = NIDSS.get_dengue_data(ew, ew) - # Print the results - print("*** Meta ***") - print("latest_week:", latest_week) - print("release_date:", release_date) - print("*** Flu ***") + # Log the results + logger.info("*** Meta ***") + logger.info("latest_week:", latest_week) + logger.info("release_date:", release_date) + logger.info("*** Flu ***") for region in sorted(list(fdata[ew].keys())): visits, ili = fdata[ew][region]["visits"], fdata[ew][region]["ili"] - print(f"region={region} | visits={int(visits)} | ili={ili:.3f}") - print("*** Dengue ***") + logger.info(f"region={region} | visits={int(visits)} | ili={ili:.3f}") + logger.info("*** Dengue ***") for location in sorted(list(ddata[ew].keys())): region = NIDSS.LOCATION_TO_REGION[location] count = ddata[ew][location] - print(f"location={location} | region={region} | count={int(count)}") + logger.info(f"location={location} | region={region} | count={int(count)}") if __name__ == "__main__": diff --git a/src/acquisition/nidss/taiwan_update.py b/src/acquisition/nidss/taiwan_update.py index 30d458481..c139a5a93 100644 --- a/src/acquisition/nidss/taiwan_update.py +++ b/src/acquisition/nidss/taiwan_update.py @@ -83,6 +83,10 @@ from .taiwan_nidss import NIDSS import delphi.operations.secrets as secrets from delphi.utils.epiweek import * +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("taiwan_update") # Get a row count just to know how many new rows are inserted @@ -101,14 +105,14 @@ def get_rows(cnx): def update(test_mode=False): # test mode if test_mode: - print("test mode enabled: changes will not be saved") + logger.info("test mode enabled: changes will not be saved") # Database connection u, p = secrets.db.epi cnx = mysql.connector.connect(user=u, password=p, database="epidata") rows1 = get_rows(cnx) - print(f"rows before (flu): {int(rows1[0])}") - print(f"rows before (dengue): {int(rows1[1])}") + logger.info(f"rows before (flu): {int(rows1[0])}") + logger.info(f"rows before (dengue): {int(rows1[1])}") insert = cnx.cursor() sql_flu = """ INSERT INTO @@ -149,10 +153,10 @@ def update(test_mode=False): # Cleanup insert.close() rows2 = get_rows(cnx) - print(f"rows after (flu): {int(rows2[0])} (added {int(rows2[0] - rows1[0])})") - print(f"rows after (dengue): {int(rows2[1])} (added {int(rows2[1] - rows1[1])})") + logger.info(f"rows after (flu): {int(rows2[0])} (added {int(rows2[0] - rows1[0])})") + logger.info(f"rows after (dengue): {int(rows2[1])} (added {int(rows2[1] - rows1[1])})") if test_mode: - print("test mode: changes not commited") + logger.info("test mode: changes not commited") else: cnx.commit() cnx.close() diff --git a/src/acquisition/paho/paho_db_update.py b/src/acquisition/paho/paho_db_update.py index b351d3ff2..116da4cb9 100644 --- a/src/acquisition/paho/paho_db_update.py +++ b/src/acquisition/paho/paho_db_update.py @@ -15,7 +15,7 @@ | Field | Type | Null | Key | Default | Extra | +----------------+-------------+------+-----+---------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | -| release_date | date | NO | MUL | NULL | | +| release_date | date | NO | MUL | NULL | | | issue | int(11) | NO | MUL | NULL | | | epiweek | int(11) | NO | MUL | NULL | | | region | varchar(12) | NO | MUL | NULL | | @@ -62,6 +62,9 @@ from delphi.epidata.acquisition.paho.paho_download import get_paho_data from delphi.utils.epiweek import delta_epiweeks, check_epiweek from delphi.utils.epidate import EpiDate +from delphi.epidata.common.logger import get_structured_logger + +logger = get_structured_logger("paho_db_update") def ensure_tables_exist(): @@ -171,19 +174,19 @@ def update_from_file(issue, date, filename, test_mode=False): u, p = secrets.db.epi cnx = mysql.connector.connect(user=u, password=p, database="epidata") rows1 = get_rows(cnx, "paho_dengue") - print(f"rows before: {int(rows1)}") + logger.info(f"rows before: {int(rows1)}") insert = cnx.cursor() # load the data, ignoring empty rows - print(f"loading data from {filename} as issued on {int(issue)}") + logger.info(f"loading data from {filename} as issued on {int(issue)}") with open(filename, encoding="utf-8") as f: c = f.read() rows = [] for l in csv.reader(StringIO(c), delimiter=","): rows.append(get_paho_row(l)) - print(f" loaded {len(rows)} rows") + logger.info(f" loaded {len(rows)} rows") entries = [obj for obj in rows if obj] - print(f" found {len(entries)} entries") + logger.info(f" found {len(entries)} entries") sql = """ INSERT INTO @@ -222,12 +225,12 @@ def update_from_file(issue, date, filename, test_mode=False): # cleanup insert.close() if test_mode: - print("test mode, not committing") + logger.info("test mode, not committing") rows2 = rows1 else: cnx.commit() rows2 = get_rows(cnx) - print(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") + logger.info(f"rows after: {int(rows2)} (added {int(rows2 - rows1)})") cnx.close() @@ -257,7 +260,7 @@ def main(): raise Exception("--file and --issue must both be present or absent") date = datetime.datetime.now().strftime("%Y-%m-%d") - print(f"assuming release date is today, {date}") + logger.info(f"assuming release date is today, {date}") if args.file: update_from_file(args.issue, date, args.file, test_mode=args.test) @@ -292,7 +295,7 @@ def main(): if not db_error: break # Exit loop with success if flag >= max_tries: - print("WARNING: Database `paho_dengue` did not update successfully") + logger.warning("Database `paho_dengue` did not update successfully") if __name__ == "__main__": diff --git a/src/acquisition/paho/paho_download.py b/src/acquisition/paho/paho_download.py index c6fa70285..ea4715bde 100644 --- a/src/acquisition/paho/paho_download.py +++ b/src/acquisition/paho/paho_download.py @@ -11,6 +11,11 @@ from selenium.webdriver.common.by import By from selenium.common.exceptions import TimeoutException from selenium.webdriver.firefox.options import Options +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("paho_download") + headerheight = 0 @@ -23,9 +28,9 @@ def wait_for(browser, css_selector, delay=10): WebDriverWait(browser, delay).until( EC.element_to_be_clickable((By.CSS_SELECTOR, css_selector)) ) - print(f"Success Loading {css_selector}") + logger.info(f"Success Loading {css_selector}") except TimeoutException: - print(f"Loading {css_selector} took too much time!") + logger.info(f"Loading {css_selector} took too much time!") def find_and_click(browser, element): @@ -130,9 +135,9 @@ def get_paho_data(offset=0, dir="downloads"): # print gp.is_displayed() try: WebDriverWait(browser, 10).until(EC.staleness_of(gp)) - print(f"Loaded next week {int(53 - offset)}") + logger.info(f"Loaded next week {int(53 - offset)}") except TimeoutException: - print(f"Loading next week {int(53 - offset)} took too much time!") + logger.error(f"Loading next week {int(53 - offset)} took too much time!") gp = browser.find_element_by_css_selector("div.wcGlassPane") # print gp.is_enabled() # print gp.is_selected() @@ -147,7 +152,7 @@ def get_paho_data(offset=0, dir="downloads"): for i in range(54 - offset): # If something goes wrong for whatever reason, try from the beginning try: - print(f"Loading week {int(53 - i)}") + logger.info(f"Loading week {int(53 - i)}") # (Re-)load URL browser.switch_to.window(tab2) browser.get(dataurl) @@ -182,7 +187,7 @@ def get_paho_data(offset=0, dir="downloads"): find_and_click(browser, x) curr_offset += 1 except Exception as e: - print(f"Got exception {e}\nTrying again from week {int(53 - offset)}") + logger.error(f"Got exception {e}\nTrying again from week {int(53 - offset)}") browser.quit() get_paho_data(offset=curr_offset) browser.quit() diff --git a/src/acquisition/quidel/quidel.py b/src/acquisition/quidel/quidel.py index 0540d5e7c..bb3b5d0a0 100644 --- a/src/acquisition/quidel/quidel.py +++ b/src/acquisition/quidel/quidel.py @@ -34,6 +34,7 @@ import delphi.operations.secrets as secrets import delphi.utils.epidate as ED from delphi.utils.geo.locations import Locations +from delphi.epidata.common.logger import get_structured_logger def word_map(row, terms): @@ -198,7 +199,7 @@ def prepare_csv(self): if date_items: end_date = "-".join(date_items[-1].split("-")[x] for x in [2, 0, 1]) else: - print("End date not found in file name:" + f) + get_structured_logger("quidel").info("End date not found in file name:" + f) end_date = None df_dict = pd.read_excel(join(self.excel_uptodate_path, f + ".xlsx"), sheet_name=None) diff --git a/src/acquisition/quidel/quidel_update.py b/src/acquisition/quidel/quidel_update.py index 563cea898..b0a32fc7b 100644 --- a/src/acquisition/quidel/quidel_update.py +++ b/src/acquisition/quidel/quidel_update.py @@ -44,9 +44,13 @@ # first party from delphi.epidata.acquisition.quidel import quidel import delphi.operations.secrets as secrets -from delphi.utils.epidate import EpiDate import delphi.utils.epiweek as flu from delphi.utils.geo.locations import Locations +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("quidel_update") + LOCATIONS = Locations.hhs_list DATAPATH = "/home/automation/quidel_data" @@ -56,7 +60,7 @@ def update(locations, first=None, last=None, force_update=False, load_email=True # download and prepare data first qd = quidel.QuidelData(DATAPATH, load_email) if not qd.need_update and not force_update: - print("Data not updated, nothing needs change.") + logger.info("Data not updated, nothing needs change.") return qd_data = qd.load_csv() @@ -79,7 +83,7 @@ def get_num_rows(): ew0 = 200401 if ew0 is None else flu.add_epiweeks(ew0, -4) ew0 = ew0 if first is None else first ew1 = ew1 if last is None else last - print(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...") + logger.info(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...") # keep track of how many rows were added rows_before = get_num_rows() @@ -109,11 +113,11 @@ def get_num_rows(): if v == 0: num_missing += 1 if num_missing > 0: - print(f" [{location}] missing {int(num_missing)}/{len(ews)} value(s)") + logger.info(f" [{location}] missing {int(num_missing)}/{len(ews)} value(s)") # keep track of how many rows were added rows_after = get_num_rows() - print(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") + logger.info(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") # cleanup cur.close() diff --git a/src/acquisition/twtr/healthtweets.py b/src/acquisition/twtr/healthtweets.py index c1e345162..63f1cca0b 100644 --- a/src/acquisition/twtr/healthtweets.py +++ b/src/acquisition/twtr/healthtweets.py @@ -24,7 +24,7 @@ # standard library import argparse -from datetime import datetime, timedelta +from datetime import datetime import json # third party` @@ -32,6 +32,10 @@ # first party from .pageparser import PageParser +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("healthtweets") class HealthTweets: @@ -104,7 +108,7 @@ def __init__(self, username, password, debug=False): response = self._go("https://www.healthtweets.org/accounts/login") token = self._get_token(response.text) if self.debug: - print(f"token={token}") + logger.info(f"token={token}") data = { "csrfmiddlewaretoken": token, "username": username, @@ -135,7 +139,7 @@ def get_values(self, state, date1, date2): "num": round(raw_values[date]), "total": round(100 * raw_values[date] / normalized_values[date]), } - print(date, raw_values[date], normalized_values[date]) + logger.debug(f"get_values -> date: {date}, raw_values: {raw_values[date]}, normalized_values: {normalized_values[date]}") return values def _get_values(self, state, date1, date2, normalized): @@ -197,7 +201,7 @@ def _get_token(self, html): def _go(self, url, method=None, referer=None, data=None): if self.debug: - print(url) + logger.debug(f"url: {url}") if method is None: if data is None: method = self.session.get @@ -207,8 +211,8 @@ def _go(self, url, method=None, referer=None, data=None): html = response.text if self.debug: for item in response.history: - print(f" [{int(item.status_code)} to {item.headers['Location']}]") - print(f" {int(response.status_code)} ({len(html)} bytes)") + logger.debug(f" [{int(item.status_code)} to {item.headers['Location']}]") + logger.debug(f" {int(response.status_code)} ({len(html)} bytes)") return response @@ -249,7 +253,7 @@ def main(): ) parser.add_argument( "-d", - "--debug", + "--debug", action="store_const", const=True, default=False, @@ -260,9 +264,9 @@ def main(): ht = HealthTweets(args.username, args.password, debug=args.debug) values = ht.get_values(args.state, args.date1, args.date2) - print(f"Daily counts in {ht.check_state(args.state)} from {args.date1} to {args.date2}:") + logger.info(f"Daily counts in {ht.check_state(args.state)} from {args.date1} to {args.date2}:") for date in sorted(list(values.keys())): - print( + logger.info( "%s: num=%-4d total=%-5d (%.3f%%)" % ( date, diff --git a/src/acquisition/twtr/twitter_update.py b/src/acquisition/twtr/twitter_update.py index 80a023f19..e4a4c1547 100644 --- a/src/acquisition/twtr/twitter_update.py +++ b/src/acquisition/twtr/twitter_update.py @@ -1,114 +1,115 @@ -""" -=============== -=== Purpose === -=============== - -Wrapper for the entire twitter data collection process, using healthtweets.py. - -The program checks all U.S. states and DC (51) over the following inclusive -date range: - From: 7 days prior to the most recently stored date - To: 1 day prior to the actual date at runtime - -healthtweets.org sometimes behaves strangely when the date range is very short -(roughly spanning less than a week), so checking the extended range above -serves a dual purpose: - 1. get a successful and predictable response from the website - 2. update the database in case there have been delays (often?) or revisions - (never?) - - -======================= -=== Data Dictionary === -======================= - -`twitter` is the table where data from healthtweets.org is stored. -+-------+---------+------+-----+---------+----------------+ -| Field | Type | Null | Key | Default | Extra | -+-------+---------+------+-----+---------+----------------+ -| id | int(11) | NO | PRI | NULL | auto_increment | -| date | date | NO | MUL | NULL | | -| state | char(2) | NO | MUL | NULL | | -| num | int(11) | NO | | NULL | | -| total | int(11) | NO | | NULL | | -+-------+---------+------+-----+---------+----------------+ -id: unique identifier for each record -date: the date -state: two-letter U.S. state abbreviation -num: the number of flu tweets (numerator) -total: the total number of tweets (denominator) - - -================= -=== Changelog === -================= - -2017-02-16 - * Use secrets -2015-11-27 - * Small documentation update -2015-05-22 - * Original version -""" - -# third party -import mysql.connector - -# first party -from .healthtweets import HealthTweets -import delphi.operations.secrets as secrets - - -def run(): - # connect to the database - u, p = secrets.db.epi - cnx = mysql.connector.connect(user=u, password=p, database="epidata") - cur = cnx.cursor() - - def get_num_rows(): - cur.execute("SELECT count(1) `num` FROM `twitter`") - for (num,) in cur: - pass - return num - - # check from 7 days preceeding the last date with data through yesterday (healthtweets.org 404's if today's date is part of the range) - cur.execute( - "SELECT date_sub(max(`date`), INTERVAL 7 DAY) `date1`, date_sub(date(now()), INTERVAL 1 DAY) `date2` FROM `twitter`" - ) - for (date1, date2) in cur: - date1, date2 = date1.strftime("%Y-%m-%d"), date2.strftime("%Y-%m-%d") - print(f"Checking dates between {date1} and {date2}...") - - # keep track of how many rows were added - rows_before = get_num_rows() - - # check healthtweets.org for new and/or revised data - ht = HealthTweets(*secrets.healthtweets.login) - sql = "INSERT INTO `twitter` (`date`, `state`, `num`, `total`) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE `num` = %s, `total` = %s" - total_rows = 0 - for state in sorted(HealthTweets.STATE_CODES.keys()): - values = ht.get_values(state, date1, date2) - for date in sorted(list(values.keys())): - sql_data = ( - date, - state, - values[date]["num"], - values[date]["total"], - values[date]["num"], - values[date]["total"], - ) - cur.execute(sql, sql_data) - total_rows += 1 - - # keep track of how many rows were added - rows_after = get_num_rows() - print(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") - - # cleanup - cur.close() - cnx.commit() - cnx.close() - - -if __name__ == "__main__": - run() +""" +=============== +=== Purpose === +=============== + +Wrapper for the entire twitter data collection process, using healthtweets.py. + +The program checks all U.S. states and DC (51) over the following inclusive +date range: + From: 7 days prior to the most recently stored date + To: 1 day prior to the actual date at runtime + +healthtweets.org sometimes behaves strangely when the date range is very short +(roughly spanning less than a week), so checking the extended range above +serves a dual purpose: + 1. get a successful and predictable response from the website + 2. update the database in case there have been delays (often?) or revisions + (never?) + + +======================= +=== Data Dictionary === +======================= + +`twitter` is the table where data from healthtweets.org is stored. ++-------+---------+------+-----+---------+----------------+ +| Field | Type | Null | Key | Default | Extra | ++-------+---------+------+-----+---------+----------------+ +| id | int(11) | NO | PRI | NULL | auto_increment | +| date | date | NO | MUL | NULL | | +| state | char(2) | NO | MUL | NULL | | +| num | int(11) | NO | | NULL | | +| total | int(11) | NO | | NULL | | ++-------+---------+------+-----+---------+----------------+ +id: unique identifier for each record +date: the date +state: two-letter U.S. state abbreviation +num: the number of flu tweets (numerator) +total: the total number of tweets (denominator) + + +================= +=== Changelog === +================= + +2017-02-16 + * Use secrets +2015-11-27 + * Small documentation update +2015-05-22 + * Original version +""" + +# third party +import mysql.connector + +# first party +from .healthtweets import HealthTweets +import delphi.operations.secrets as secrets +from delphi.epidata.common.logger import get_structured_logger + + +def run(): + # connect to the database + u, p = secrets.db.epi + cnx = mysql.connector.connect(user=u, password=p, database="epidata") + cur = cnx.cursor() + + def get_num_rows(): + cur.execute("SELECT count(1) `num` FROM `twitter`") + for (num,) in cur: + pass + return num + + # check from 7 days preceeding the last date with data through yesterday (healthtweets.org 404's if today's date is part of the range) + cur.execute( + "SELECT date_sub(max(`date`), INTERVAL 7 DAY) `date1`, date_sub(date(now()), INTERVAL 1 DAY) `date2` FROM `twitter`" + ) + for (date1, date2) in cur: + date1, date2 = date1.strftime("%Y-%m-%d"), date2.strftime("%Y-%m-%d") + get_structured_logger("twitter_update").info(f"Checking dates between {date1} and {date2}...") + + # keep track of how many rows were added + rows_before = get_num_rows() + + # check healthtweets.org for new and/or revised data + ht = HealthTweets(*secrets.healthtweets.login) + sql = "INSERT INTO `twitter` (`date`, `state`, `num`, `total`) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE `num` = %s, `total` = %s" + total_rows = 0 + for state in sorted(HealthTweets.STATE_CODES.keys()): + values = ht.get_values(state, date1, date2) + for date in sorted(list(values.keys())): + sql_data = ( + date, + state, + values[date]["num"], + values[date]["total"], + values[date]["num"], + values[date]["total"], + ) + cur.execute(sql, sql_data) + total_rows += 1 + + # keep track of how many rows were added + rows_after = get_num_rows() + get_structured_logger("twitter_update").info(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)") + + # cleanup + cur.close() + cnx.commit() + cnx.close() + + +if __name__ == "__main__": + run() diff --git a/src/acquisition/wiki/wiki_download.py b/src/acquisition/wiki/wiki_download.py index 6192eab02..8df3547b1 100644 --- a/src/acquisition/wiki/wiki_download.py +++ b/src/acquisition/wiki/wiki_download.py @@ -51,6 +51,12 @@ from . import wiki_util +# first party +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("wiki_download") + VERSION = 10 MASTER_URL = "https://delphi.cmu.edu/~automation/public/wiki/master.php" @@ -89,7 +95,7 @@ def extract_article_counts(filename, language, articles, debug_mode): for line in f: content = line.strip().split() if len(content) != 4: - print(f"unexpected article format: {line}") + logger.info(f"unexpected article format: {line}") continue article_title = content[1].lower() article_count = int(content[2]) @@ -97,10 +103,10 @@ def extract_article_counts(filename, language, articles, debug_mode): total += article_count if content[0] == language and article_title in articles_set: if debug_mode: - print(f"Find article {article_title}: {line}") + logger.info(f"Find article {article_title}: {line}") counts[article_title] = article_count if debug_mode: - print(f"Total number of counts for language {language} is {total}") + logger.debug(f"Total number of counts for language {language} is {total}") counts["total"] = total return counts @@ -119,7 +125,7 @@ def extract_article_counts_orig(articles, debug_mode): counts = {} for article in articles: if debug_mode: - print(f" {article}") + logger.debug(f" {article}") out = text( subprocess.check_output( f'LC_ALL=C grep -a -i "^en {article.lower()} " raw2 | cat', shell=True @@ -130,14 +136,14 @@ def extract_article_counts_orig(articles, debug_mode): for line in out.split("\n"): fields = line.split() if len(fields) != 4: - print(f"unexpected article format: [{line}]") + logger.info(f"unexpected article format: [{line}]") else: count += int(fields[2]) # print ' %4d %s'%(count, article) counts[article.lower()] = count if debug_mode: - print(f" {int(count)}") - print("getting total count...") + logger.debug(f" {int(count)}") + logger.info("getting total count...") out = text( subprocess.check_output( 'cat raw2 | LC_ALL=C grep -a -i "^en " | cut -d" " -f 3 | awk \'{s+=$1} END {printf "%.0f", s}\'', @@ -146,7 +152,7 @@ def extract_article_counts_orig(articles, debug_mode): ) total = int(out) if debug_mode: - print(total) + logger.debug(total) counts["total"] = total return counts @@ -154,9 +160,9 @@ def extract_article_counts_orig(articles, debug_mode): def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, debug_mode=False): worker = text(subprocess.check_output("echo `whoami`@`hostname`", shell=True)).strip() - print(f"this is [{worker}]") + logger.info(f"this is [{worker}]") if debug_mode: - print("*** running in debug mode ***") + logger.debug("*** running in debug mode ***") total_download = 0 passed_jobs = 0 @@ -170,12 +176,12 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d code = req.getcode() if code != 200: if code == 201: - print("no jobs available") + logger.info("no jobs available") if download_limit is None and job_limit is None: time.sleep(60) continue else: - print("nothing to do, exiting") + logger.info("nothing to do, exiting") return else: raise Exception(f"server response code (get) was {int(code)}") @@ -185,15 +191,15 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d else: job_content = text(req.readlines()[0]) if job_content == "no jobs": - print("no jobs available") + logger.info("no jobs available") if download_limit is None and job_limit is None: time.sleep(60) continue else: - print("nothing to do, exiting") + logger.info("nothing to do, exiting") return job = json.loads(job_content) - print(f"received job [{int(job['id'])}|{job['name']}]") + logger.info(f"received job [{int(job['id'])}|{job['name']}]") # updated parsing for pageviews - maybe use a regex in the future # year, month = int(job['name'][11:15]), int(job['name'][15:17]) year, month = int(job["name"][10:14]), int(job["name"][14:16]) @@ -202,30 +208,30 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d "https://dumps.wikimedia.org/other/" f"pageviews/{year}/{year}-{month:02d}/{job['name']}" ) - print(f"downloading file [{url}]...") + logger.info(f"downloading file [{url}]...") subprocess.check_call(f"curl -s {url} > raw.gz", shell=True) - print("checking file size...") + logger.info("checking file size...") # Make the code cross-platfrom, so use python to get the size of the file # size = int(text(subprocess.check_output('ls -l raw.gz | cut -d" " -f 5', shell=True))) size = os.stat("raw.gz").st_size if debug_mode: - print(size) + logger.debug(f"size: {size}") total_download += size if job["hash"] != "00000000000000000000000000000000": - print("checking hash...") + logger.info("checking hash...") out = text(subprocess.check_output("md5sum raw.gz", shell=True)) result = out[0:32] if result != job["hash"]: raise Exception(f"wrong hash [expected {job['hash']}, got {result}]") if debug_mode: - print(result) - print("decompressing...") + logger.debug(f"result: {result}") + logger.info("decompressing...") subprocess.check_call("gunzip -f raw.gz", shell=True) # print 'converting case...' # subprocess.check_call('cat raw | tr "[:upper:]" "[:lower:]" > raw2', shell=True) # subprocess.check_call('rm raw', shell=True) subprocess.check_call("mv raw raw2", shell=True) - print("extracting article counts...") + logger.info("extracting article counts...") # Use python to read the file and extract counts, if you want to use the original shell method, please use counts = {} @@ -238,14 +244,14 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d articles = lang2articles[language] articles = sorted(articles) if debug_mode: - print(f"Language is {language} and target articles are {articles}") + logger.debug(f"Language is {language} and target articles are {articles}") temp_counts = extract_article_counts("raw2", language, articles, debug_mode) counts[language] = temp_counts if not debug_mode: - print("deleting files...") + logger.info("deleting files...") subprocess.check_call("rm raw2", shell=True) - print("saving results...") + logger.info("saving results...") time_stop = datetime.datetime.now() result = { "id": job["id"], @@ -257,33 +263,33 @@ def run(secret, download_limit=None, job_limit=None, sleep_time=1, job_type=0, d payload = json.dumps(result) hmac_str = get_hmac_sha256(secret, payload) if debug_mode: - print(f" hmac: {hmac_str}") + logger.debug(f" hmac: {hmac_str}") post_data = urlencode({"put": payload, "hmac": hmac_str}) req = urlopen(MASTER_URL, data=data(post_data)) code = req.getcode() if code != 200: raise Exception(f"server response code (put) was {int(code)}") - print(f"done! (dl={int(total_download)})") + logger.info(f"done! (dl={int(total_download)})") passed_jobs += 1 except Exception as ex: - print(f"***** Caught Exception: {str(ex)} *****") + logger.error(f"***** Caught Exception: {str(ex)} *****", exception=ex) failed_jobs += 1 time.sleep(30) - print( + logger.info( "passed=%d | failed=%d | total=%d" % (passed_jobs, failed_jobs, passed_jobs + failed_jobs) ) time.sleep(sleep_time) if download_limit is not None and total_download >= download_limit: - print(f"download limit has been reached [{int(total_download)} >= {int(download_limit)}]") + logger.info(f"download limit has been reached [{int(total_download)} >= {int(download_limit)}]") if job_limit is not None and (passed_jobs + failed_jobs) >= job_limit: - print(f"job limit has been reached [{int(passed_jobs + failed_jobs)} >= {int(job_limit)}]") + logger.info(f"job limit has been reached [{int(passed_jobs + failed_jobs)} >= {int(job_limit)}]") def main(): # version info - print("version", VERSION) + logger.info("version", VERSION) # args and usage parser = argparse.ArgumentParser() diff --git a/src/acquisition/wiki/wiki_extract.py b/src/acquisition/wiki/wiki_extract.py index 718a64c20..8b4e0969c 100644 --- a/src/acquisition/wiki/wiki_extract.py +++ b/src/acquisition/wiki/wiki_extract.py @@ -1,130 +1,131 @@ -""" -=============== -=== Purpose === -=============== - -Extracts and stores article access counts - -See also: wiki.py - - -================= -=== Changelog === -================= - -2017-02-23 - * secrets and minor cleanup -2016-08-14 - * use pageviews instead of pagecounts-raw - * default job limit from 1000 to 100 -2015-08-11 - + Store total and other metadata in `wiki_meta` -2015-05-21 - * Original version -""" - -# standard library -from datetime import datetime, timedelta -import json - -# third party -import mysql.connector - -# first party -import delphi.operations.secrets as secrets - - -def floor_timestamp(timestamp): - return datetime(timestamp.year, timestamp.month, timestamp.day, timestamp.hour) - - -def ceil_timestamp(timestamp): - return floor_timestamp(timestamp) + timedelta(hours=1) - - -def round_timestamp(timestamp): - before = floor_timestamp(timestamp) - after = ceil_timestamp(timestamp) - if (timestamp - before) < (after - timestamp): - return before - else: - return after - - -def get_timestamp(name): - # new parsing for pageviews compared to pagecounts - maybe switch to regex in the future - # return datetime(int(name[11:15]), int(name[15:17]), int(name[17:19]), int(name[20:22]), int(name[22:24]), int(name[24:26])) - return datetime( - int(name[10:14]), - int(name[14:16]), - int(name[16:18]), - int(name[19:21]), - int(name[21:23]), - int(name[23:25]), - ) - - -def run(job_limit=100): - # connect to the database - u, p = secrets.db.epi - cnx = mysql.connector.connect(user=u, password=p, database="epidata") - cur = cnx.cursor() - - # # Some preparation for utf-8, and it is a temporary trick solution. The real solution should change those char set and collation encoding to utf8 permanently - # cur.execute("SET NAMES utf8;") - # cur.execute("SET CHARACTER SET utf8;") - # # I print SHOW SESSION VARIABLES LIKE 'character\_set\_%'; and SHOW SESSION VARIABLES LIKE 'collation\_%'; on my local computer - # cur.execute("SET character_set_client=utf8mb4;") - # cur.execute("SET character_set_connection=utf8mb4;") - # cur.execute("SET character_set_database=utf8;") - # cur.execute("SET character_set_results=utf8mb4;") - # cur.execute("SET character_set_server=utf8;") - # cur.execute("SET collation_connection=utf8mb4_general_ci;") - # cur.execute("SET collation_database=utf8_general_ci;") - # cur.execute("SET collation_server=utf8_general_ci;") - - # find jobs that are queued for extraction - cur.execute( - "SELECT `id`, `name`, `data` FROM `wiki_raw` WHERE `status` = 2 ORDER BY `name` ASC LIMIT %s", - (job_limit,), - ) - jobs = [] - for (id, name, data_str) in cur: - jobs.append((id, name, json.loads(data_str))) - print(f"Processing data from {len(jobs)} jobs") - - # get the counts from the json object and insert into (or update) the database - # Notice that data_collect contains data with different languages - for (id, name, data_collect) in jobs: - print(f"processing job [{int(id)}|{name}]...") - timestamp = round_timestamp(get_timestamp(name)) - for language in data_collect.keys(): - data = data_collect[language] - for article in sorted(data.keys()): - count = data[article] - cur.execute( - "INSERT INTO `wiki` (`datetime`, `article`, `count`, `language`) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE `count` = `count` + %s", - ( - str(timestamp), - article.encode("utf-8").decode("latin-1"), - count, - language, - count, - ), - ) - if article == "total": - cur.execute( - "INSERT INTO `wiki_meta` (`datetime`, `date`, `epiweek`, `total`, `language`) VALUES (%s, date(%s), yearweek(%s, 6), %s, %s) ON DUPLICATE KEY UPDATE `total` = `total` + %s", - (str(timestamp), str(timestamp), str(timestamp), count, language, count), - ) - # update the job - cur.execute("UPDATE `wiki_raw` SET `status` = 3 WHERE `id` = %s", (id,)) - - # cleanup - cur.close() - cnx.commit() - cnx.close() - - -if __name__ == "__main__": - run() +""" +=============== +=== Purpose === +=============== + +Extracts and stores article access counts + +See also: wiki.py + + +================= +=== Changelog === +================= + +2017-02-23 + * secrets and minor cleanup +2016-08-14 + * use pageviews instead of pagecounts-raw + * default job limit from 1000 to 100 +2015-08-11 + + Store total and other metadata in `wiki_meta` +2015-05-21 + * Original version +""" + +# standard library +from datetime import datetime, timedelta +import json + +# third party +import mysql.connector + +# first party +import delphi.operations.secrets as secrets +from delphi.epidata.common.logger import get_structured_logger + + +def floor_timestamp(timestamp): + return datetime(timestamp.year, timestamp.month, timestamp.day, timestamp.hour) + + +def ceil_timestamp(timestamp): + return floor_timestamp(timestamp) + timedelta(hours=1) + + +def round_timestamp(timestamp): + before = floor_timestamp(timestamp) + after = ceil_timestamp(timestamp) + if (timestamp - before) < (after - timestamp): + return before + else: + return after + + +def get_timestamp(name): + # new parsing for pageviews compared to pagecounts - maybe switch to regex in the future + # return datetime(int(name[11:15]), int(name[15:17]), int(name[17:19]), int(name[20:22]), int(name[22:24]), int(name[24:26])) + return datetime( + int(name[10:14]), + int(name[14:16]), + int(name[16:18]), + int(name[19:21]), + int(name[21:23]), + int(name[23:25]), + ) + + +def run(job_limit=100): + # connect to the database + u, p = secrets.db.epi + cnx = mysql.connector.connect(user=u, password=p, database="epidata") + cur = cnx.cursor() + + # # Some preparation for utf-8, and it is a temporary trick solution. The real solution should change those char set and collation encoding to utf8 permanently + # cur.execute("SET NAMES utf8;") + # cur.execute("SET CHARACTER SET utf8;") + # # I print SHOW SESSION VARIABLES LIKE 'character\_set\_%'; and SHOW SESSION VARIABLES LIKE 'collation\_%'; on my local computer + # cur.execute("SET character_set_client=utf8mb4;") + # cur.execute("SET character_set_connection=utf8mb4;") + # cur.execute("SET character_set_database=utf8;") + # cur.execute("SET character_set_results=utf8mb4;") + # cur.execute("SET character_set_server=utf8;") + # cur.execute("SET collation_connection=utf8mb4_general_ci;") + # cur.execute("SET collation_database=utf8_general_ci;") + # cur.execute("SET collation_server=utf8_general_ci;") + + # find jobs that are queued for extraction + cur.execute( + "SELECT `id`, `name`, `data` FROM `wiki_raw` WHERE `status` = 2 ORDER BY `name` ASC LIMIT %s", + (job_limit,), + ) + jobs = [] + for (id, name, data_str) in cur: + jobs.append((id, name, json.loads(data_str))) + get_structured_logger("wiki_extract").info(f"Processing data from {len(jobs)} jobs") + + # get the counts from the json object and insert into (or update) the database + # Notice that data_collect contains data with different languages + for (id, name, data_collect) in jobs: + get_structured_logger("wiki_extract").info(f"processing job [{int(id)}|{name}]...") + timestamp = round_timestamp(get_timestamp(name)) + for language in data_collect.keys(): + data = data_collect[language] + for article in sorted(data.keys()): + count = data[article] + cur.execute( + "INSERT INTO `wiki` (`datetime`, `article`, `count`, `language`) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE `count` = `count` + %s", + ( + str(timestamp), + article.encode("utf-8").decode("latin-1"), + count, + language, + count, + ), + ) + if article == "total": + cur.execute( + "INSERT INTO `wiki_meta` (`datetime`, `date`, `epiweek`, `total`, `language`) VALUES (%s, date(%s), yearweek(%s, 6), %s, %s) ON DUPLICATE KEY UPDATE `total` = `total` + %s", + (str(timestamp), str(timestamp), str(timestamp), count, language, count), + ) + # update the job + cur.execute("UPDATE `wiki_raw` SET `status` = 3 WHERE `id` = %s", (id,)) + + # cleanup + cur.close() + cnx.commit() + cnx.close() + + +if __name__ == "__main__": + run() diff --git a/src/acquisition/wiki/wiki_update.py b/src/acquisition/wiki/wiki_update.py index a9f240629..6fb357a56 100644 --- a/src/acquisition/wiki/wiki_update.py +++ b/src/acquisition/wiki/wiki_update.py @@ -29,6 +29,10 @@ # first party import delphi.operations.secrets as secrets +from delphi.epidata.common.logger import get_structured_logger + + +logger = get_structured_logger("wiki_update") def floor_timestamp(timestamp): @@ -69,7 +73,7 @@ def get_manifest(year, month, optional=False): # unlike pagecounts-raw, pageviews doesn't provide hashes # url = 'https://dumps.wikimedia.org/other/pagecounts-raw/%d/%d-%02d/md5sums.txt'%(year, year, month) url = f"https://dumps.wikimedia.org/other/pageviews/{int(year)}/{int(year)}-{int(month):02}/" - print(f"Checking manifest at {url}...") + logger.info(f"Checking manifest at {url}...") response = requests.get(url) if response.status_code == 200: # manifest = [line.strip().split() for line in response.text.split('\n') if 'pagecounts' in line] @@ -83,7 +87,7 @@ def get_manifest(year, month, optional=False): manifest = [] else: raise Exception(f"expected 200 status code, but got {int(response.status_code)}") - print(f"Found {len(manifest)} access log(s)") + logger.info(f"Found {len(manifest)} access log(s)") return manifest @@ -98,7 +102,7 @@ def run(): cur.execute("SELECT max(`name`) FROM `wiki_raw`") for (max_name,) in cur: pass - print(f"Last known file: {max_name}") + logger.info(f"Last known file: {max_name}") timestamp = get_timestamp(max_name) # crawl dumps.wikimedia.org to find more recent access logs @@ -112,8 +116,8 @@ def run(): for (hash, name) in manifest: if max_name is None or name > max_name: new_logs[name] = hash - print(f" New job: {name} [{hash}]") - print(f"Found {len(new_logs)} new job(s)") + logger.info(f" New job: {name} [{hash}]") + logger.info(f"Found {len(new_logs)} new job(s)") # store metadata for new jobs for name in sorted(new_logs.keys()):