From 44ce8497fadcceefc2fca85f27742167d2fc8d62 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Wed, 17 Jul 2024 10:36:33 -0700
Subject: [PATCH 1/3] refactor: use delphi_utils.logger instead of copied file

* remove duplicate logger.py in this repo
---
 src/acquisition/covid_hosp/common/database.py |   2 +-
 src/acquisition/covidcast/csv_importer.py     |   3 +-
 src/acquisition/covidcast/csv_to_database.py  |   2 +-
 src/acquisition/covidcast/database.py         |   2 +-
 src/acquisition/covidcast/file_archiver.py    |   2 +-
 src/common/logger.py                          | 254 ------------------
 .../covidcast_meta_cache_updater.py           |   2 +-
 src/maintenance/delete_batch.py               |   2 +-
 src/maintenance/signal_dash_data_generator.py |   2 +-
 src/server/_common.py                         |   2 +-
 src/server/_printer.py                        |   2 +-
 src/server/_security.py                       |   2 +-
 src/server/admin/models.py                    |   2 +-
 src/server/endpoints/covidcast.py             |   2 +-
 src/server/endpoints/covidcast_meta.py        |   2 +-
 src/server/main.py                            |   2 +-
 src/server/utils/dates.py                     |   2 +-
 17 files changed, 16 insertions(+), 271 deletions(-)
 delete mode 100644 src/common/logger.py

diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py
index efbdb6c45..18c7f377f 100644
--- a/src/acquisition/covid_hosp/common/database.py
+++ b/src/acquisition/covid_hosp/common/database.py
@@ -11,7 +11,7 @@
 
 # first party
 import delphi.operations.secrets as secrets
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 Columndef = namedtuple("Columndef", "csv_name sql_name dtype")
 
diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py
index f6122e610..e9893c0da 100644
--- a/src/acquisition/covidcast/csv_importer.py
+++ b/src/acquisition/covidcast/csv_importer.py
@@ -13,10 +13,9 @@
 import pandas as pd
 
 # first party
-from delphi_utils import Nans
+from delphi_utils import get_structured_logger, Nans
 from delphi.utils.epiweek import delta_epiweeks
 from delphi.epidata.common.covidcast_row import CovidcastRow
-from delphi.epidata.common.logger import get_structured_logger
 
 DataFrameRow = NamedTuple('DFRow', [
   ('geo_id', str),
diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py
index be9dad86c..b3642fc51 100644
--- a/src/acquisition/covidcast/csv_to_database.py
+++ b/src/acquisition/covidcast/csv_to_database.py
@@ -11,7 +11,7 @@
 from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails
 from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException
 from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 
 def get_argument_parser():
diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index 871061b81..a8b146d30 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -14,7 +14,7 @@
 
 # first party
 import delphi.operations.secrets as secrets
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 from delphi.epidata.common.covidcast_row import CovidcastRow
 
 
diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py
index 802590871..07bd453f9 100644
--- a/src/acquisition/covidcast/file_archiver.py
+++ b/src/acquisition/covidcast/file_archiver.py
@@ -6,7 +6,7 @@
 import shutil
 
 # first party
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 class FileArchiver:
   """Archives files by moving and compressing."""
diff --git a/src/common/logger.py b/src/common/logger.py
deleted file mode 100644
index d04ff7673..000000000
--- a/src/common/logger.py
+++ /dev/null
@@ -1,254 +0,0 @@
-"""Structured logger utility for creating JSON logs."""
-
-# the Delphi group uses two ~identical versions of this file.
-# try to keep them in sync with edits, for sanity.
-# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py  # pylint: disable=line-too-long
-# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
-
-import contextlib
-import logging
-import multiprocessing
-import os
-import sys
-import threading
-from traceback import format_exception
-
-import structlog
-
-
-def handle_exceptions(logger):
-    """Handle exceptions using the provided logger."""
-
-    def exception_handler(scope, etype, value, traceback):
-        logger.exception("Top-level exception occurred",
-                         scope=scope, exc_info=(etype, value, traceback))
-
-    def sys_exception_handler(etype, value, traceback):
-        exception_handler("sys", etype, value, traceback)
-
-    def threading_exception_handler(args):
-        if args.exc_type == SystemExit and args.exc_value.code == 0:
-            # `sys.exit(0)` is considered "successful termination":
-            # https://docs.python.org/3/library/sys.html#sys.exit
-            logger.debug("normal thread exit", thread=args.thread,
-                         stack="".join(
-                             format_exception(
-                                 args.exc_type, args.exc_value, args.exc_traceback)))
-        else:
-            exception_handler(f"thread: {args.thread}",
-                              args.exc_type, args.exc_value, args.exc_traceback)
-
-    sys.excepthook = sys_exception_handler
-    threading.excepthook = threading_exception_handler
-
-
-def get_structured_logger(name=__name__,
-                          filename=None,
-                          log_exceptions=True):
-    """Create a new structlog logger.
-
-    Use the logger returned from this in indicator code using the standard
-    wrapper calls, e.g.:
-
-    logger = get_structured_logger(__name__)
-    logger.warning("Error", type="Signal too low").
-
-    The output will be rendered as JSON which can easily be consumed by logs
-    processors.
-
-    See the structlog documentation for details.
-
-    Parameters
-    ---------
-    name: Name to use for logger (included in log lines), __name__ from caller
-    is a good choice.
-    filename: An (optional) file to write log output.
-    """
-    # Set the underlying logging configuration
-    if "LOG_DEBUG" in os.environ:
-        log_level = logging.DEBUG
-    else:
-        log_level = logging.INFO
-
-    logging.basicConfig(
-        format="%(message)s",
-        level=log_level,
-        handlers=[logging.StreamHandler()])
-
-    def add_pid(_logger, _method_name, event_dict):
-        """Add current PID to the event dict."""
-        event_dict["pid"] = os.getpid()
-        return event_dict
-
-    # Configure structlog. This uses many of the standard suggestions from
-    # the structlog documentation.
-    structlog.configure(
-        processors=[
-            # Filter out log levels we are not tracking.
-            structlog.stdlib.filter_by_level,
-            # Include logger name in output.
-            structlog.stdlib.add_logger_name,
-            # Include log level in output.
-            structlog.stdlib.add_log_level,
-            # Include PID in output.
-            add_pid,
-            # Allow formatting into arguments e.g., logger.info("Hello, %s",
-            # name)
-            structlog.stdlib.PositionalArgumentsFormatter(),
-            # Add timestamps.
-            structlog.processors.TimeStamper(fmt="iso"),
-            # Match support for exception logging in the standard logger.
-            structlog.processors.StackInfoRenderer(),
-            structlog.processors.format_exc_info,
-            # Decode unicode characters
-            structlog.processors.UnicodeDecoder(),
-            # Render as JSON
-            structlog.processors.JSONRenderer(),
-        ],
-        # Use a dict class for keeping track of data.
-        context_class=dict,
-        # Use a standard logger for the actual log call.
-        logger_factory=structlog.stdlib.LoggerFactory(),
-        # Use a standard wrapper class for utilities like log.warning()
-        wrapper_class=structlog.stdlib.BoundLogger,
-        # Cache the logger
-        cache_logger_on_first_use=True,
-    )
-
-    # Create the underlying python logger and wrap it with structlog
-    system_logger = logging.getLogger(name)
-    if filename and not system_logger.handlers:
-        system_logger.addHandler(logging.FileHandler(filename))
-    system_logger.setLevel(log_level)
-    logger = structlog.wrap_logger(system_logger)
-
-    if log_exceptions:
-        handle_exceptions(logger)
-
-    return logger
-
-
-class LoggerThread():
-    """
-    A construct to use a logger from multiprocessing workers/jobs.
-
-    the bare structlog loggers are thread-safe but not multiprocessing-safe.
-    a `LoggerThread` will spawn a thread that listens to a mp.Queue
-    and logs messages from it with the provided logger,
-    so other processes can send logging messages to it
-    via the logger-like `SubLogger` interface.
-    the SubLogger even logs the pid of the caller.
-
-    this is good to use with a set of jobs that are part of a mp.Pool,
-    but isnt recommended for general use
-    because of overhead from threading and multiprocessing,
-    and because it might introduce lag to log messages.
-
-    somewhat inspired by:
-    docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
-    """
-
-    class SubLogger():
-        """MP-safe logger-like interface to convey log messages to a listening LoggerThread."""
-
-        def __init__(self, queue):
-            """Create SubLogger with a bound queue."""
-            self.queue = queue
-
-        def _log(self, level, *args, **kwargs):
-            kwargs_plus = {'sub_pid': multiprocessing.current_process().pid}
-            kwargs_plus.update(kwargs)
-            self.queue.put([level, args, kwargs_plus])
-
-        def debug(self, *args, **kwargs):
-            """Log a DEBUG level message."""
-            self._log(logging.DEBUG, *args, **kwargs)
-
-        def info(self, *args, **kwargs):
-            """Log an INFO level message."""
-            self._log(logging.INFO, *args, **kwargs)
-
-        def warning(self, *args, **kwargs):
-            """Log a WARNING level message."""
-            self._log(logging.WARNING, *args, **kwargs)
-
-        def error(self, *args, **kwargs):
-            """Log an ERROR level message."""
-            self._log(logging.ERROR, *args, **kwargs)
-
-        def critical(self, *args, **kwargs):
-            """Log a CRITICAL level message."""
-            self._log(logging.CRITICAL, *args, **kwargs)
-
-
-    def get_sublogger(self):
-        """Retrieve SubLogger for this LoggerThread."""
-        return self.sublogger
-
-    def __init__(self, logger, q=None):
-        """Create and start LoggerThread with supplied logger, creating a queue if not provided."""
-        self.logger = logger
-        if q:
-            self.msg_queue = q
-        else:
-            self.msg_queue = multiprocessing.Queue()
-
-        def logger_thread_worker():
-            logger.info('thread started')
-            while True:
-                msg = self.msg_queue.get()
-                if msg == 'STOP':
-                    logger.debug('received stop signal')
-                    break
-                level, args, kwargs = msg
-                if level in [logging.DEBUG, logging.INFO, logging.WARNING,
-                             logging.ERROR, logging.CRITICAL]:
-                    logger.log(level, *args, **kwargs)
-                else:
-                    logger.error('received unknown logging level! exiting...',
-                                 level=level, args_kwargs=(args, kwargs))
-                    break
-            logger.debug('stopping thread')
-
-        self.thread = threading.Thread(target=logger_thread_worker,
-                                       name="LoggerThread__"+logger.name)
-        logger.debug('starting thread')
-        self.thread.start()
-
-        self.sublogger = LoggerThread.SubLogger(self.msg_queue)
-        self.running = True
-
-    def stop(self):
-        """Terminate this LoggerThread."""
-        if not self.running:
-            self.logger.warning('thread already stopped')
-            return
-        self.logger.debug('sending stop signal')
-        self.msg_queue.put('STOP')
-        self.thread.join()
-        self.running = False
-        self.logger.info('thread stopped')
-
-
-@contextlib.contextmanager
-def pool_and_threadedlogger(logger, *poolargs):
-    """
-    Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger.
-
-    Emulates the multiprocessing.Pool() context manager,
-    but also provides (via a LoggerThread) a SubLogger proxy to logger
-    that can be safely used by pool workers.
-    The SubLogger proxy interface supports these methods: debug, info, warning, error,
-    and critical.
-    Also "cleans up" the pool by waiting for workers to complete
-    as it exits the context.
-    """
-    with multiprocessing.Manager() as manager:
-        logger_thread = LoggerThread(logger, manager.Queue())
-        try:
-            with multiprocessing.Pool(*poolargs) as pool:
-                yield pool, logger_thread.get_sublogger()
-                pool.close()
-                pool.join()
-        finally:
-            logger_thread.stop()
diff --git a/src/maintenance/covidcast_meta_cache_updater.py b/src/maintenance/covidcast_meta_cache_updater.py
index c5f7fe3e8..cb0b2703f 100644
--- a/src/maintenance/covidcast_meta_cache_updater.py
+++ b/src/maintenance/covidcast_meta_cache_updater.py
@@ -7,7 +7,7 @@
 
 # first party
 from delphi.epidata.acquisition.covidcast.database import Database
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 from delphi.epidata.client.delphi_epidata import Epidata
 
 def get_argument_parser():
diff --git a/src/maintenance/delete_batch.py b/src/maintenance/delete_batch.py
index 31a25ef2a..8e8298817 100644
--- a/src/maintenance/delete_batch.py
+++ b/src/maintenance/delete_batch.py
@@ -8,7 +8,7 @@
 
 # first party
 from delphi.epidata.acquisition.covidcast.database import Database
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 
 def get_argument_parser():
diff --git a/src/maintenance/signal_dash_data_generator.py b/src/maintenance/signal_dash_data_generator.py
index b7f1048f5..56c69491f 100644
--- a/src/maintenance/signal_dash_data_generator.py
+++ b/src/maintenance/signal_dash_data_generator.py
@@ -15,7 +15,7 @@
 # first party
 import covidcast
 import delphi.operations.secrets as secrets
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 
 LOOKBACK_DAYS_FOR_COVERAGE = 56
diff --git a/src/server/_common.py b/src/server/_common.py
index 33a3f9c48..692b83491 100644
--- a/src/server/_common.py
+++ b/src/server/_common.py
@@ -7,7 +7,7 @@
 from werkzeug.exceptions import Unauthorized
 from werkzeug.local import LocalProxy
 
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 from ._config import SECRET, REVERSE_PROXY_DEPTH
 from ._db import engine
 from ._exceptions import DatabaseErrorException, EpiDataException
diff --git a/src/server/_printer.py b/src/server/_printer.py
index 5616787a2..6df6d62b9 100644
--- a/src/server/_printer.py
+++ b/src/server/_printer.py
@@ -8,7 +8,7 @@
 
 from ._config import MAX_RESULTS, MAX_COMPATIBILITY_RESULTS
 from ._common import is_compatibility_mode, log_info_with_request
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 
 def print_non_standard(format: str, data):
diff --git a/src/server/_security.py b/src/server/_security.py
index c47f948a5..2e127debf 100644
--- a/src/server/_security.py
+++ b/src/server/_security.py
@@ -3,7 +3,7 @@
 from typing import Optional, cast
 
 import redis
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 from flask import g, request
 from werkzeug.exceptions import Unauthorized
 from werkzeug.local import LocalProxy
diff --git a/src/server/admin/models.py b/src/server/admin/models.py
index f5c0d54ed..e0ef86b0f 100644
--- a/src/server/admin/models.py
+++ b/src/server/admin/models.py
@@ -4,7 +4,7 @@
 from copy import deepcopy
 
 from .._db import Session, WriteSession, default_session
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 from typing import Set, Optional, List
 from datetime import datetime as dtime
diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py
index 11de3cbca..3d7d99e82 100644
--- a/src/server/endpoints/covidcast.py
+++ b/src/server/endpoints/covidcast.py
@@ -36,7 +36,7 @@
 from .covidcast_utils import compute_trend, compute_trends, compute_trend_value, CovidcastMetaEntry
 from ..utils import shift_day_value, day_to_time_value, time_value_to_iso, time_value_to_day, shift_week_value, time_value_to_week, guess_time_value_is_day, week_to_time_value, TimeValues
 from .covidcast_utils.model import TimeType, count_signal_time_types, data_sources, create_source_signal_alias_mapper
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 # first argument is the endpoint name
 bp = Blueprint("covidcast", __name__)
diff --git a/src/server/endpoints/covidcast_meta.py b/src/server/endpoints/covidcast_meta.py
index 35dc9f12e..8c2219ae7 100644
--- a/src/server/endpoints/covidcast_meta.py
+++ b/src/server/endpoints/covidcast_meta.py
@@ -9,7 +9,7 @@
 from .._printer import create_printer
 from .._query import filter_fields
 from .._security import current_user, sources_protected_by_roles
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 bp = Blueprint("covidcast_meta", __name__)
 
diff --git a/src/server/main.py b/src/server/main.py
index 2ec07e5a5..9d308c8ac 100644
--- a/src/server/main.py
+++ b/src/server/main.py
@@ -6,7 +6,7 @@
 
 from flask import request, send_file, Response, send_from_directory, jsonify
 
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 from ._config import URL_PREFIX, VERSION
 from ._common import app, set_compatibility_mode
diff --git a/src/server/utils/dates.py b/src/server/utils/dates.py
index 4d6c242c9..010a6d27f 100644
--- a/src/server/utils/dates.py
+++ b/src/server/utils/dates.py
@@ -10,7 +10,7 @@
 from epiweeks import Week, Year
 from typing_extensions import TypeAlias
 
-from delphi.epidata.common.logger import get_structured_logger
+from delphi_utils import get_structured_logger
 
 # Alias for a sequence of date ranges (int, int) or date integers
 IntRange: TypeAlias = Union[Tuple[int, int], int]

From 81179c5f144b8f25421e799e823e18cde43c84f9 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Wed, 17 Jul 2024 10:37:40 -0700
Subject: [PATCH 2/3] lint: trailing whitespace changes

---
 src/acquisition/covidcast/database.py         | 18 +++++++++---------
 src/maintenance/signal_dash_data_generator.py |  6 +++---
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index a8b146d30..5fd56923b 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -117,16 +117,16 @@ def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20,
       get_structured_logger("insert_or_update_batch").fatal(err_msg)
       raise DBLoadStateException(err_msg)
 
-    # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and 
+    # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and
     # `is_latest_issue` is hardcoded to 1 (which is temporary and addressed later in this method)
     insert_into_loader_sql = f'''
       INSERT INTO `{self.load_table}`
         (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`,
-        `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, 
+        `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`,
         `is_latest_issue`, `missing_value`, `missing_stderr`, `missing_sample_size`)
       VALUES
-        (%s, %s, %s, %s, %s, %s, 
-        UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, 
+        (%s, %s, %s, %s, %s, %s,
+        UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s,
         1, %s, %s, %s)
     '''
 
@@ -134,11 +134,11 @@ def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20,
     # if an entry in the load table is NOT in the latest table, it is clearly now the latest value for that key (so we do nothing (thanks to INNER join)).
     # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load.
     fix_is_latest_issue_sql = f'''
-      UPDATE 
-        `{self.load_table}` JOIN `{self.latest_view}` 
-          USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) 
-        SET `{self.load_table}`.`is_latest_issue`=0 
-        WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` 
+      UPDATE
+        `{self.load_table}` JOIN `{self.latest_view}`
+          USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`)
+        SET `{self.load_table}`.`is_latest_issue`=0
+        WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue`
     '''
 
     # TODO: consider handling cc_rows as a generator instead of a list
diff --git a/src/maintenance/signal_dash_data_generator.py b/src/maintenance/signal_dash_data_generator.py
index 56c69491f..5a7067f83 100644
--- a/src/maintenance/signal_dash_data_generator.py
+++ b/src/maintenance/signal_dash_data_generator.py
@@ -150,11 +150,11 @@ def write_coverage(
 
     def get_enabled_signals(self) -> List[DashboardSignal]:
        """Retrieve all enabled signals from the database"""
-        select_statement = f'''SELECT `id`, 
+        select_statement = f'''SELECT `id`,
             `name`,
             `source`,
             `covidcast_signal`,
-            `latest_coverage_update`, 
+            `latest_coverage_update`,
             `latest_status_update`
         FROM `{Database.SIGNAL_TABLE_NAME}`
         WHERE `enabled`
@@ -208,7 +208,7 @@ def get_coverage(dashboard_signal: DashboardSignal) -> List[DashboardSignalCover
         lambda x: pd.to_datetime(Week(x // 100, x % 100).startdate()))
 
     signal_coverage_list = []
-    
+
     for _, row in count_by_geo_type_df.iterrows():
         signal_coverage = DashboardSignalCoverage(
             signal_id=dashboard_signal.db_id,

From 8805f3c60886f5eea2dbe920fbcd1ca0ece4d2ba Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Wed, 17 Jul 2024 10:38:47 -0700
Subject: [PATCH 3/3] repo: ignore lint in blame

---
 .git-blame-ignore-revs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index bfaa3109e..23f95be33 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -20,3 +20,5 @@ b9ceb400d9248c8271e8342275664ac5524e335d
 07ed83e5768f717ab0f9a62a9209e4e2cffa058d
 # style(black): format wiki acquisition
 923852eafa86b8f8b182d499489249ba8f815843
+# lint: trailing whitespace changes
+1d5a71db152630698e8f525b007407a83e109a83
\ No newline at end of file
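
Reviewer note (not part of the patch series): the import swap in patch 1/3 is intended to be drop-in, since the deleted `src/common/logger.py` and `delphi_utils.logger` were kept ~identical per the sync comment at the top of the deleted file. A minimal sketch of the shared API, assuming `delphi_utils` is installed in the target environment:

```python
from delphi_utils import get_structured_logger

# Same signature as the deleted module: name, optional log file, and
# log_exceptions to install the sys/threading excepthooks.
logger = get_structured_logger(__name__, filename=None, log_exceptions=True)

# Events render as JSON; extra keyword arguments become fields in the event.
logger.info("csv import started", file_count=3)
logger.warning("Error", type="Signal too low")
```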
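The multiprocessing helpers being deleted (`LoggerThread`, `pool_and_threadedlogger`) should likewise remain available if the `delphi_utils` copy carries the same code, as the sync note claims. A sketch of the context-manager usage described in the deleted docstring — the `process_file` worker and file names are hypothetical, and the `delphi_utils.logger` import path assumes the synced copy exports this helper:

```python
from delphi_utils import get_structured_logger
from delphi_utils.logger import pool_and_threadedlogger  # assumption: synced copy exports this

def process_file(path, sublogger):
    # SubLogger is safe to call from pool workers; it also records the worker pid.
    sublogger.info("processing", path=path)
    return path

if __name__ == "__main__":
    logger = get_structured_logger(__name__)
    # Yields a Pool plus a logger proxy; waits for workers and stops the
    # listener thread on exit, per the pool_and_threadedlogger docstring.
    with pool_and_threadedlogger(logger, 4) as (pool, sublogger):
        results = [pool.apply_async(process_file, (p, sublogger)) for p in ["a.csv", "b.csv"]]
        for r in results:
            r.get()
```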
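Reviewer note on patch 3/3: `.git-blame-ignore-revs` only takes effect once configured, e.g. `git config blame.ignoreRevsFile .git-blame-ignore-revs` or per-invocation with `git blame --ignore-revs-file`. It may be worth double-checking that `1d5a71db…` is the id the whitespace commit will actually land as: it differs from this series' local id for patch 2/3 (`81179c5f…`), and ids change again under rebase or squash-merge. The new entry is also added without a trailing newline ("\ No newline at end of file"), which the next append to this file will need to account for.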