Replaced print statements with logger #1299

Open · wants to merge 3 commits into base: dev
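The change is the same in every file touched below: create one module-level structured logger with get_structured_logger and swap each print call for an f-string log message, attaching the exception as keyword context on failures. A minimal sketch of that pattern, assuming the structlog-style logger returned by delphi.epidata.common.logger; the logger name and the process() function are illustrative, not part of this PR:

# Sketch of the logging pattern used throughout this PR (illustrative names).
from delphi.epidata.common.logger import get_structured_logger

logger = get_structured_logger("example_script")

def process(files):
    logger.info(f"found {len(files)} files")
    for name in files:
        try:
            # ... download / parse the file here ...
            logger.info(f"  processed: {name}")
        except Exception as ex:
            # attach the exception as structured context rather than printing it
            logger.error(f"  {name}: failed", exception=ex)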
32 changes: 18 additions & 14 deletions src/acquisition/cdcp/cdc_dropbox_receiver.py
@@ -26,13 +26,17 @@

# first party
import delphi.operations.secrets as secrets
from delphi.epidata.common.logger import get_structured_logger


# location constants
DROPBOX_BASE_DIR = "/cdc_page_stats"
DELPHI_BASE_DIR = "/common/cdc_stage"


logger = get_structured_logger("cdc_dropbox_receiver")


def get_timestamp_string():
"""
Return the current local date and time as a string.
@@ -69,30 +73,30 @@ def fetch_data():
dbx = dropbox.Dropbox(secrets.cdcp.dropbox_token)

# look for new CDC data files
print(f"checking dropbox: {DROPBOX_BASE_DIR}")
logger.info(f"checking dropbox: {DROPBOX_BASE_DIR}")
save_list = []
for entry in dbx.files_list_folder(DROPBOX_BASE_DIR).entries:
name = entry.name
if name.endswith(".csv") or name.endswith(".zip"):
print(f" download: {name}")
logger.info(f" download: {name}")
save_list.append(name)
else:
print(f" skip: {name}")
logger.info(f" skip: {name}")

# determine if there's anything to be done
if len(save_list) == 0:
print("did not find any new data files")
logger.info("did not find any new data files")
return

# download new files, saving them inside of a new zip file
timestamp = get_timestamp_string()
zip_path = f"{DELPHI_BASE_DIR}/dropbox_{timestamp}.zip"
print(f"downloading into delphi:{zip_path}")
logger.info(f"downloading into delphi:{zip_path}")
with ZipFile(zip_path, "w", ZIP_DEFLATED) as zf:
for name in save_list:
# location of the file on dropbox
dropbox_path = f"{DROPBOX_BASE_DIR}/{name}"
print(f" {dropbox_path}")
logger.info(f" {dropbox_path}")

# start the download
meta, resp = dbx.files_download(dropbox_path)
@@ -101,7 +105,7 @@ def fetch_data():
if resp.status_code != 200:
raise Exception(["resp.status_code", resp.status_code])
dropbox_len = meta.size
print(f" need {int(dropbox_len)} bytes...")
logger.info(f" need {int(dropbox_len)} bytes...")
content_len = int(resp.headers.get("Content-Length", -1))
if dropbox_len != content_len:
info = ["dropbox_len", dropbox_len, "content_len", content_len]
@@ -112,27 +116,27 @@

# check the length again
payload_len = len(filedata)
print(" downloaded")
logger.info(" downloaded")
if dropbox_len != payload_len:
info = ["dropbox_len", dropbox_len, "payload_len", payload_len]
raise Exception(info)

# add the downloaded file to the zip file
zf.writestr(name, filedata)
print(" added")
logger.info(" added")

# At this point, all the data is stored and awaiting further processing on
# the delphi server.
print(f"saved all new data in {zip_path}")
logger.info(f"saved all new data in {zip_path}")

# on dropbox, archive downloaded files so they won't be downloaded again
archive_dir = f"archived_reports/processed_{timestamp}"
print("archiving files...")
logger.info("archiving files...")
for name in save_list:
# source and destination
dropbox_src = f"{DROPBOX_BASE_DIR}/{name}"
dropbox_dst = f"{DROPBOX_BASE_DIR}/{archive_dir}/{name}"
print(f" {dropbox_src} -> {dropbox_dst}")
logger.info(f" {dropbox_src} -> {dropbox_dst}")

# move the file
meta = dbx.files_move(dropbox_src, dropbox_dst)
@@ -142,9 +146,9 @@ def fetch_data():
raise Exception(f"failed to move {name}")

# finally, trigger the usual processing flow
print("triggering processing flow")
logger.info("triggering processing flow")
trigger_further_processing()
print("done")
logger.info("done")


def main():
10 changes: 7 additions & 3 deletions src/acquisition/cdcp/cdc_extract.py
@@ -72,6 +72,10 @@
import delphi.operations.secrets as secrets
import delphi.utils.epiweek as flu
from . import cdc_upload
from delphi.epidata.common.logger import get_structured_logger


logger = get_structured_logger("cdc_extract")


def get_num_hits(cur, epiweek, state, page):
@@ -166,7 +170,7 @@ def extract(first_week=None, last_week=None, test_mode=False):
cur.execute("SELECT max(`epiweek`) FROM `cdc_meta`")
for (last_week,) in cur:
pass
print(f"extracting {int(first_week)}--{int(last_week)}")
logger.info(f"extracting {int(first_week)}--{int(last_week)}")

# update each epiweek
for epiweek in flu.range_epiweeks(first_week, last_week, inclusive=True):
@@ -178,9 +182,9 @@ def extract(first_week=None, last_week=None, test_mode=False):
nums[i] = get_num_hits(cur, epiweek, state, pages[i])
total = get_total_hits(cur, epiweek, state)
store_result(cur, epiweek, state, *nums, total)
print(f" {epiweek}-{state}: {' '.join(str(n) for n in nums)} ({total})")
logger.info(f" {epiweek}-{state}: {' '.join(str(n) for n in nums)} ({total})")
except Exception as ex:
print(f" {int(epiweek)}-{state}: failed", ex)
logger.error(f" {int(epiweek)}-{state}: failed", exception=ex)
# raise ex
sys.stdout.flush()

23 changes: 13 additions & 10 deletions src/acquisition/cdcp/cdc_upload.py
@@ -84,6 +84,10 @@

# first party
import delphi.operations.secrets as secrets
from delphi.epidata.common.logger import get_structured_logger


logger = get_structured_logger("cdc_upload")


STATES = {
@@ -216,7 +220,7 @@ def handler(reader):
def parse_zip(zf, level=1):
for name in zf.namelist():
prefix = " " * level
print(prefix, name)
logger.info(f"{prefix}, {name}")
if name[-4:] == ".zip":
with zf.open(name) as temp:
with ZipFile(io.BytesIO(temp.read())) as zf2:
@@ -228,30 +232,29 @@ def parse_zip(zf, level=1):
elif "Regions for all CDC" in name:
handler = parse_csv(True)
else:
print(prefix, " (skipped)")
logger.info(f"{prefix}, (skipped)")
if handler is not None:
with zf.open(name) as temp:
count = handler(csv.reader(io.StringIO(str(temp.read(), "utf-8"))))
print(prefix, f" {int(count)} rows")
logger.info(f"{prefix}, {int(count)} rows")
else:
print(prefix, " (ignored)")
logger.info(f"{prefix} (ignored)")

# find, parse, and move zip files
zip_files = glob.glob("/common/cdc_stage/*.zip")
print("searching...")
logger.info("searching...")
for f in zip_files:
print(" ", f)
print("parsing...")
logger.info(f"{f}")
for f in zip_files:
with ZipFile(f) as zf:
parse_zip(zf)
print("moving...")
logger.info("moving...")
for f in zip_files:
src = f
dst = os.path.join("/home/automation/cdc_page_stats/", os.path.basename(src))
print(" ", src, "->", dst)
logger.info(" ", src, "->", dst)
if test_mode:
print(" (test mode enabled - not moved)")
logger.info(" (test mode enabled - not moved)")
else:
shutil.move(src, dst)
if not os.path.isfile(dst):
13 changes: 7 additions & 6 deletions src/acquisition/covidcast/database.py
@@ -323,6 +323,7 @@ def delete_batch(self, cc_deletions):
- time_type
"""

logger = get_structured_logger("delete_batch")
tmp_table_name = "tmp_delete_table"
# composite keys:
short_comp_key = "`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`"
@@ -412,21 +413,21 @@ def split_list(lst, n):
yield lst[i:(i+n)]
for deletions_batch in split_list(cc_deletions, 100000):
self._cursor.executemany(load_tmp_table_insert_sql, deletions_batch)
print(f"load_tmp_table_insert_sql:{self._cursor.rowcount}")
logger.debug(f"load_tmp_table_insert_sql:{self._cursor.rowcount}")
else:
raise Exception(f"Bad deletions argument: need a filename or a list of tuples; got a {type(cc_deletions)}")
self._cursor.execute(add_history_id_sql)
print(f"add_history_id_sql:{self._cursor.rowcount}")
logger.debug(f"add_history_id_sql:{self._cursor.rowcount}")
self._cursor.execute(mark_for_update_latest_sql)
print(f"mark_for_update_latest_sql:{self._cursor.rowcount}")
logger.debug(f"mark_for_update_latest_sql:{self._cursor.rowcount}")
self._cursor.execute(delete_history_sql)
print(f"delete_history_sql:{self._cursor.rowcount}")
logger.debug(f"delete_history_sql:{self._cursor.rowcount}")
total = self._cursor.rowcount
# TODO: consider reporting rows removed and/or replaced in latest table as well
self._cursor.execute(delete_latest_sql)
print(f"delete_latest_sql:{self._cursor.rowcount}")
logger.debug(f"delete_latest_sql:{self._cursor.rowcount}")
self._cursor.execute(update_latest_sql)
print(f"update_latest_sql:{self._cursor.rowcount}")
logger.debug(f"update_latest_sql:{self._cursor.rowcount}")
self._connection.commit()

if total == -1:
4 changes: 3 additions & 1 deletion src/acquisition/covidcast/test_utils.py
@@ -12,6 +12,8 @@
from delphi.epidata.server._config import REDIS_HOST, REDIS_PASSWORD
from delphi.epidata.server.utils.dates import day_to_time_value, time_value_to_day
import delphi.operations.secrets as secrets
from delphi.epidata.common.logger import get_structured_logger


# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value
@@ -193,7 +195,7 @@ def localTearDown(self):
def _insert_rows(self, rows: Sequence[CovidcastTestRow]):
# inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
n = self._db.insert_or_update_bulk(rows)
print(f"{n} rows added to load table & dispatched to v4 schema")
get_structured_logger("covidcast_test_utils").info(f"{n} rows added to load table & dispatched to v4 schema")
self._db._connection.commit() # NOTE: this isn't expressly needed for our test cases, but would be if using external access (like through client lib) to ensure changes are visible outside of this db session

def params_from_row(self, row: CovidcastTestRow, **kwargs):
112 changes: 112 additions & 0 deletions src/acquisition/covidcast_nowcast/load_sensors.py
@@ -0,0 +1,112 @@
from shutil import move
import os
import time

import pandas as pd
import sqlalchemy

import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails
from delphi.epidata.common.logger import get_structured_logger


SENSOR_CSV_PATH = "/common/covidcast_nowcast/receiving/"
SUCCESS_DIR = "archive/successful"
FAIL_DIR = "archive/failed"
TABLE_NAME = "covidcast_nowcast"
DB_NAME = "epidata"
CSV_DTYPES = {"sensor_name": str, "geo_value": str, "value": float}


def main(csv_path: str = SENSOR_CSV_PATH) -> None:
"""
Parse all files in a given directory and insert them into the sensor table in the database.

For all the files found recursively in csv_path that match the naming scheme specified by
CsvImporter.find_csv_files(), attempt to load and insert them into the database. Files which do
not match the naming scheme will be moved to an archive/failed folder and skipped, and files
which raise an error during loading/uploading will be moved to the archive/failed folder and
have the error raised.

Parameters
----------
csv_path
Path to folder containing files to load.

Returns
-------
None.
"""
user, pw = secrets.db.epi
engine = sqlalchemy.create_engine(f"mysql+pymysql://{user}:{pw}@{secrets.db.host}/{DB_NAME}")
for filepath, attribute in CsvImporter.find_issue_specific_csv_files(csv_path):
if attribute is None:
_move_after_processing(filepath, success=False)
continue
try:
data = load_and_prepare_file(filepath, attribute)
with engine.connect() as conn:
method = _create_upsert_method(sqlalchemy.MetaData(conn))
data.to_sql(TABLE_NAME, engine, if_exists="append", method=method, index=False)
except Exception:
_move_after_processing(filepath, success=False)
raise
_move_after_processing(filepath, success=True)


def load_and_prepare_file(filepath: str, attributes: PathDetails) -> pd.DataFrame:
"""
Read CSV file into a DataFrame and add relevant attributes as new columns to match DB table.

Parameters
----------
filepath
Path to CSV file.
attributes
(source, signal, time_type, geo_type, time_value, issue, lag) tuple
returned by CsvImporter.find_csv_files

Returns
-------
DataFrame with additional attributes added as columns based on filename and current date.
"""
data = pd.read_csv(filepath, dtype=CSV_DTYPES)
data["source"] = attributes.source
data["signal"] = attributes.signal
data["time_type"] = attributes.time_type
data["geo_type"] = attributes.geo_type
data["time_value"] = attributes.time_value
data["issue"] = attributes.issue
data["lag"] = attributes.lag
data["value_updated_timestamp"] = int(time.time())
return data


def _move_after_processing(filepath, success):
archive_dir = SUCCESS_DIR if success else FAIL_DIR
new_dir = os.path.dirname(filepath).replace("receiving", archive_dir)
os.makedirs(new_dir, exist_ok=True)
move(filepath, filepath.replace("receiving", archive_dir))
get_structured_logger("covidcast_nowcast_load_sensors").info(f"{filepath} moved to {archive_dir}")


def _create_upsert_method(meta):
def method(table, conn, keys, data_iter):
sql_table = sqlalchemy.Table(
table.name,
meta,
# specify lag column explicitly; lag is a reserved word sqlalchemy doesn't know about
sqlalchemy.Column("lag", sqlalchemy.Integer, quote=True),
autoload=True,
)
insert_stmt = sqlalchemy.dialects.mysql.insert(sql_table).values(
[dict(zip(keys, data)) for data in data_iter]
)
upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
conn.execute(upsert_stmt)

return method


if __name__ == "__main__":
main()
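A usage sketch for the new loader, assuming the file is importable under the repo's usual delphi.epidata.acquisition namespace; the module path is inferred from the file location and is an assumption, not stated in the PR:

# Hypothetical invocation of the new nowcast sensor loader.
from delphi.epidata.acquisition.covidcast_nowcast.load_sensors import main

# Scan the default receiving directory (/common/covidcast_nowcast/receiving/),
# upsert each well-named CSV into the covidcast_nowcast table, and move each
# processed file into archive/successful or archive/failed.
main()

# Or point it at a different folder of sensor CSVs (illustrative path):
main(csv_path="/tmp/receiving/")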