diff --git a/.github/workflows/create-release.yml b/.github/workflows/create-release.yml index 01b9154ea..a8cb6c3bb 100644 --- a/.github/workflows/create-release.yml +++ b/.github/workflows/create-release.yml @@ -61,7 +61,7 @@ jobs: bump2version --list ${{ github.event.inputs.versionName }} | grep ^new_version | sed -r s,"^.*=",, - name: Copy version to indicator directory run: | - indicator_list=("changehc" "claims_hosp" "doctor_visits" "google_symptoms" "hhs_hosp" "nchs_mortality" "nowcast" "quidel_covidtest" "sir_complainsalot") + indicator_list=("changehc" "claims_hosp" "doctor_visits" "google_symptoms" "hhs_hosp" "nchs_mortality" "quidel_covidtest" "sir_complainsalot") for path in ${indicator_list[@]}; do echo "current_version = ${{ steps.indicators.outputs.version }}" > $path/version.cfg done diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index 7e09009a9..d0d64cc97 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -16,7 +16,7 @@ jobs: if: github.event.pull_request.draft == false strategy: matrix: - packages: [_delphi_utils_python, changehc, claims_hosp, doctor_visits, google_symptoms, hhs_hosp, nchs_mortality, nowcast, quidel_covidtest, sir_complainsalot] + packages: [_delphi_utils_python, changehc, claims_hosp, doctor_visits, google_symptoms, hhs_hosp, nchs_mortality, quidel_covidtest, sir_complainsalot] defaults: run: working-directory: ${{ matrix.packages }} diff --git a/nowcast/.pylintrc b/nowcast/.pylintrc deleted file mode 100644 index 786c72eb8..000000000 --- a/nowcast/.pylintrc +++ /dev/null @@ -1,26 +0,0 @@ - -[MASTER] - -ignore=delphi_nowcast/nowcast_fusion - -[MESSAGES CONTROL] - -disable=logging-format-interpolation, - too-many-locals, - too-many-arguments, - # Allow pytest functions to be part of a class. - no-self-use, - # Allow pytest classes to have one test. - too-few-public-methods - -[BASIC] - -# Allow arbitrarily short-named variables. -variable-rgx=[A-Za-z_][a-z0-9_]* -argument-rgx=[A-Za-z_][a-z0-9_]* -attr-rgx=[A-Za-z_][a-z0-9_]* - -[DESIGN] - -# Don't complain about pytest "unused" arguments. -ignored-argument-names=(_.*|run_as_module) diff --git a/nowcast/Makefile b/nowcast/Makefile deleted file mode 100644 index 38c796e92..000000000 --- a/nowcast/Makefile +++ /dev/null @@ -1,29 +0,0 @@ -.PHONY = venv, lint, test, clean - -dir = $(shell find ./delphi_* -name __init__.py | grep -o 'delphi_[_[:alnum:]]*' | head -1) -venv: - python3.8 -m venv env - -install: venv - . env/bin/activate; \ - pip install wheel ; \ - pip install -e ../_delphi_utils_python ;\ - pip install -e . - -install-ci: venv - . env/bin/activate; \ - pip install wheel ; \ - pip install ../_delphi_utils_python ;\ - pip install . - -lint: - . env/bin/activate; pylint $(dir) - . env/bin/activate; pydocstyle $(dir) --match-dir '(?!nowcast_fusion)' - -test: - . env/bin/activate ;\ - (cd tests && ../env/bin/pytest --cov=$(dir) --cov-report=term-missing) - -clean: - rm -rf env - rm -f params.json diff --git a/nowcast/README.md b/nowcast/README.md deleted file mode 100644 index 8699300b7..000000000 --- a/nowcast/README.md +++ /dev/null @@ -1,61 +0,0 @@ -# Nowcast - - -## Running the Indicator - -The indicator is run by directly executing the Python module contained in this -directory. The safest way to do this is to create a virtual environment, -installed the common DELPHI tools, and then install the module and its -dependencies. To do this, run the following command from this directory: - -``` -make install -``` - -This command will install the package in editable mode, so you can make changes that -will automatically propagate to the installed package. - -All of the user-changable parameters are stored in `params.json`. To execute -the module and produce the output datasets (by default, in `receiving`), run -the following: - -``` -env/bin/python -m delphi_nowcast -``` - -If you want to enter the virtual environment in your shell, -you can run `source env/bin/activate`. Run `deactivate` to leave the virtual environment. - -Once you are finished, you can remove the virtual environment and -params file with the following: - -``` -make clean -``` - -## Testing the code - -To run static tests of the code style, run the following command: - -``` -make lint -``` - -Unit tests are also included in the module. To execute these, run the following -command from this directory: - -``` -make test -``` - -To run individual tests, run the following: - -``` -(cd tests && ../env/bin/pytest .py --cov=delphi_nowcast --cov-report=term-missing) -``` - -The output will show the number of unit tests that passed and failed, along -with the percentage of code covered by the tests. - -None of the linting or unit tests should fail, and the code lines that are not covered by unit tests should be small and -should not include critical sub-routines. diff --git a/nowcast/REVIEW.md b/nowcast/REVIEW.md deleted file mode 100644 index 93a5a6579..000000000 --- a/nowcast/REVIEW.md +++ /dev/null @@ -1,39 +0,0 @@ -## Code Review (Python) - -A code review of this module should include a careful look at the code and the -output. To assist in the process, but certainly not in replace of it, please -check the following items. - -**Documentation** - -- [ ] the README.md file template is filled out and currently accurate; it is -possible to load and test the code using only the instructions given -- [ ] minimal docstrings (one line describing what the function does) are -included for all functions; full docstrings describing the inputs and expected -outputs should be given for non-trivial functions - -**Structure** - -- [ ] code should use 4 spaces for indentation; other style decisions are -flexible, but be consistent within a module -- [ ] any required metadata files are checked into the repository and placed -within the directory `static` -- [ ] any intermediate files that are created and stored by the module should -be placed in the directory `cache` -- [ ] final expected output files to be uploaded to the API are placed in the -`receiving` directory; output files should not be committed to the respository -- [ ] all options and API keys are passed through the file `params.json` -- [ ] template parameter file (`params.json.template`) is checked into the -code; no personal (i.e., usernames) or private (i.e., API keys) information is -included in this template file - -**Testing** - -- [ ] module can be installed in a new virtual environment -- [ ] pylint with the default `.pylint` settings run over the module produces -minimal warnings; warnings that do exist have been confirmed as false positives -- [ ] reasonably high level of unit test coverage covering all of the main logic -of the code (e.g., missing coverage for raised errors that do not currently seem -possible to reach are okay; missing coverage for options that will be needed are -not) -- [ ] all unit tests run without errors diff --git a/nowcast/cache/.gitignore b/nowcast/cache/.gitignore deleted file mode 100644 index e69de29bb..000000000 diff --git a/nowcast/delphi_nowcast/__init__.py b/nowcast/delphi_nowcast/__init__.py deleted file mode 100644 index f171a0434..000000000 --- a/nowcast/delphi_nowcast/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- -"""Module to generate nowcasts. - -This file defines the functions that are made public by the module. As the -module is intended to be executed though the main method, these are primarily -for testing. -""" - -from __future__ import absolute_import - -from . import run - -__version__ = "0.1.0" diff --git a/nowcast/delphi_nowcast/__main__.py b/nowcast/delphi_nowcast/__main__.py deleted file mode 100644 index 015f6f919..000000000 --- a/nowcast/delphi_nowcast/__main__.py +++ /dev/null @@ -1,11 +0,0 @@ -# -*- coding: utf-8 -*- -"""Call the function run_module when executed. - -This file indicates that calling the module (`python -m delphi_nowcast`) will -call the function `run_module` found within the run.py file. There should be -no need to change this template. -""" - -from .run import run_module # pragma: no cover - -run_module() # pragma: no cover diff --git a/nowcast/delphi_nowcast/constants.py b/nowcast/delphi_nowcast/constants.py deleted file mode 100644 index f1533fa3a..000000000 --- a/nowcast/delphi_nowcast/constants.py +++ /dev/null @@ -1,20 +0,0 @@ -"""Registry for constants.""" -from .data_containers import SensorConfig - - -# Ground truth parameters -GROUND_TRUTH_INDICATOR = SensorConfig("placeholder", "placeholder", "placeholder", 0) - -# Delay distribution -DELAY_DISTRIBUTION = [] - -# Deconvolution parameters -FIT_FUNC = "placeholder" - -# AR Sensor parameters -AR_ORDER = 3 -AR_LAMBDA = 0.1 - -# Regression Sensor parameters -REG_SENSORS = [SensorConfig("placeholder", "placeholder", "placeholder", 0),] -REG_INTERCEPT = True diff --git a/nowcast/delphi_nowcast/data_containers.py b/nowcast/delphi_nowcast/data_containers.py deleted file mode 100644 index 2a04c606f..000000000 --- a/nowcast/delphi_nowcast/data_containers.py +++ /dev/null @@ -1,87 +0,0 @@ -"""Data container classes for holding sensor configurations and data needed for fusion.""" - -from dataclasses import dataclass -from typing import List, Dict -from datetime import date - -from numpy import nan, nanmean, isnan -from pandas import date_range - - -@dataclass(frozen=True) -class SensorConfig: - """Dataclass for specifying a sensor's name, number of lag days, and origin source/signal.""" - - source: str - signal: str - name: str - lag: int - - -@dataclass -class LocationSeries: - """Data class for holding time series data specific to a single location.""" - - geo_value: str = None - geo_type: str = None - data: Dict[date, float] = None - - def add_data(self, - day: date, - value: float, - overwrite: bool = False) -> None: - """Append a date and value to existing attributes. - - Safer than appending individually since the two lists shouldn't have different lengths. - """ - if day in self.dates and not overwrite: - raise ValueError("Date already exists in LocationSeries. " - "To overwrite, use overwrite=True") - self.data[day] = value - - @property - def dates(self) -> list: - """Check if there is no stored data in the class.""" - if not self.data: - raise ValueError("No data") - return list(self.data.keys()) - - @property - def values(self) -> list: - """Check if there is no stored data in the class.""" - if not self.data: - raise ValueError("No data") - return list(self.data.values()) - - def get_data_range(self, - start_date: date, - end_date: date, - imputation_method: str = None) -> List[float]: - """ - Return value of LocationSeries between two dates with optional imputation. - - Parameters - ---------- - start_date - First day to include in range. - end_date - Last day to include in range. - imputation_method - Optional type of imputation to conduct. Currently only "mean" is supported. - - Returns - ------- - List of values, one for each day in the range. - """ - if start_date < min(self.dates) or end_date > max(self.dates): - raise ValueError(f"Data range must be within existing dates " - f"{min(self.dates)} to {max(self.dates)}.") - all_dates = date_range(start_date, end_date) - out_values = [self.data.get(day.date(), nan) for day in all_dates] - if imputation_method is None or not out_values: - return out_values - if imputation_method == "mean": - mean = nanmean(out_values) - out_values = [i if not isnan(i) else mean for i in out_values] - return out_values - raise ValueError("Invalid imputation method. Must be None or 'mean'") diff --git a/nowcast/delphi_nowcast/deconvolution/__init__.py b/nowcast/delphi_nowcast/deconvolution/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/nowcast/delphi_nowcast/deconvolution/deconvolution.py b/nowcast/delphi_nowcast/deconvolution/deconvolution.py deleted file mode 100644 index 03e4a9c08..000000000 --- a/nowcast/delphi_nowcast/deconvolution/deconvolution.py +++ /dev/null @@ -1,296 +0,0 @@ -"""Deconvolution functions.""" - -from functools import partial -from typing import Callable - -import numpy as np -from scipy.linalg import toeplitz -from scipy.sparse import diags as band - - -def deconvolve_double_smooth_ntf( - y: np.ndarray, - x: np.ndarray, - kernel: np.ndarray, - lam: float, - gam: float, - n_iters: int = 200, - k: int = 3, - clip: bool = False) -> np.ndarray: - """ - Perform natural trend filtering regularized deconvolution. Only implemented for k=3. - - Parameters - ---------- - y - array of values to convolve - x - array of positions - kernel - array with convolution kernel values - lam - regularization parameter for trend filtering penalty smoothness - gam - regularization parameter for penalty on first differences of boundary points - n_iters - number of ADMM interations to perform. - k - order of the trend filtering penalty. - clip - Boolean to clip count values to [0, infty). - - Returns - ------- - array of the deconvolved signal values - """ - assert k == 3, "Natural TF only implemented for k=3" - n = y.shape[0] - m = kernel.shape[0] - rho = lam # set equal - C = _construct_convolution_matrix(y, kernel, False)[:n, ] - D = band([-1, 1], [0, 1], shape=(n - 1, n)).toarray() - D = np.diff(D, n=k, axis=0) - P = _construct_poly_interp_mat(x, k) - D_m = band([-1, 1], [0, 1], shape=(n - 1, n)).toarray() - D_m[:-m, :] = 0 - - # kernel weights for double smoothing - weights = np.ones((D_m.shape[0],)) - weights[-m:] = np.cumsum(kernel[::-1]) - weights /= np.max(weights) - D_m = np.sqrt(np.diag(2 * gam * weights)) @ D_m - C = C @ P - D = D @ P - D_m = D_m @ P - - # pre-calculations - DtD = D.T @ D - DmtDm = D_m.T @ D_m - CtC = C.T @ C / n - Cty = C.T @ y / n - x_update_1 = np.linalg.inv(DmtDm + CtC + rho * DtD) - - # begin admm loop - x_k = None - alpha_0 = np.zeros(n - k - 1) - u_0 = np.zeros(n - k - 1) - for _ in range(n_iters): - x_k = x_update_1 @ (Cty + rho * D.T @ (alpha_0 + u_0)) - Dx = D @ x_k - alpha_k = _soft_thresh(Dx - u_0, lam / rho) - u_k = u_0 + alpha_k - Dx - alpha_0 = alpha_k - u_0 = u_k - x_k = P @ x_k - if clip: - x_k = np.clip(x_k, 0, np.infty) - return x_k - - -def deconvolve_double_smooth_tf_cv( - y: np.ndarray, - x: np.ndarray, - kernel: np.ndarray, - fit_func: Callable = deconvolve_double_smooth_ntf, - lam_cv_grid: np.ndarray = np.logspace(1, 3.5, 10), - gam_cv_grid: np.ndarray = np.r_[np.logspace(0, 0.2, 6) - 1, [1, 5, 10, 50]], - gam_n_folds: int = 10, - n_iters: int = 200, - k: int = 3, - clip: bool = True, - verbose: bool = False) -> np.ndarray: - """ - Run cross-validation to tune smoothness over deconvolve_double_smooth_ntf. - First, leave-every-third-out CV is performed over lambda, fixing gamma=0. After - choosing the lambda with the smallest squared error, forward validation is done to - select gamma. - - Parameters - ---------- - y - array of values to convolve - x - array of positions - kernel - array with convolution kernel values - fit_func - deconvolution function to use - lam_cv_grid - grid of trend filtering penalty values to search over - gam_cv_grid - grid of second boundary smoothness penalty values to search over - gam_n_folds - number of splits for forward cv (see above documentation) - n_iters - number of ADMM interations to perform. - k - order of the trend filtering penalty. - clip - Boolean to clip count values to [0, infty) - verbose - Boolean whether to print debug statements - - - Returns - ------- - array of the deconvolved signal values - """ - - fit_func = partial(fit_func, kernel=kernel, n_iters=n_iters, k=k, clip=clip) - n = y.shape[0] - lam_cv_loss = np.zeros((lam_cv_grid.shape[0],)) - gam_cv_loss = np.zeros((gam_cv_grid.shape[0],)) - - # use le3o cv for finding lambda, this controls smoothness of entire curve - for i in range(3): - test_split = np.zeros((n,), dtype=bool) - test_split[i::3] = True - for j, reg_par in enumerate(lam_cv_grid): - x_hat = np.full((n,), np.nan) - x_hat[~test_split] = fit_func(y=y[~test_split], x=x[~test_split], - lam=reg_par, gam=0) - x_hat = _impute_with_neighbors(x_hat) - y_hat = _fft_convolve(x_hat, kernel) - lam_cv_loss[j] += np.sum((y[test_split] - y_hat[test_split]) ** 2) - - lam = lam_cv_grid[np.argmin(lam_cv_loss)] - - # use forward cv to find gamma, this controls smoothness of right-boundary curve - for i in range(1, gam_n_folds + 1): - for j, reg_par in enumerate(gam_cv_grid): - x_hat = np.full((n - i + 1,), np.nan) - x_hat[:(n - i)] = fit_func(y=y[:(n - i)], x=x[:(n - i)], gam=reg_par, lam=lam) - pos = x[:(n - i + 1)] - x_hat[-1] = _linear_extrapolate(pos[-3], x_hat[-3], - pos[-2], x_hat[-2], - pos[-1]) - y_hat = _fft_convolve(x_hat, kernel) - gam_cv_loss[j] += np.sum((y[:(n - i + 1)][-1:] - y_hat[-1:]) ** 2) - - gam = gam_cv_grid[np.argmin(gam_cv_loss)] - if verbose: - print(f"Chosen parameters: lam:{lam:.4}, gam:{gam:.4}") - x_hat = fit_func(y=y, x=x, lam=lam, gam=gam) - return x_hat - - -def _construct_convolution_matrix(signal: np.ndarray, - kernel: np.ndarray, - norm: bool) -> np.ndarray: - """ - Constructs full convolution matrix (n+m-1) x n, - where n is the signal length and m the kernel length. - - Parameters - ---------- - signal - array of values to convolve - kernel - array with convolution kernel values - norm - boolean whether to normalize rows to sum to sum(kernel) - - Returns - ------- - convolution matrix - """ - n = signal.shape[0] - padding = np.zeros(n - 1) - first_col = np.r_[kernel, padding] - first_row = np.r_[kernel[0], padding] - P = toeplitz(first_col, first_row) - if norm: - scale = P.sum(axis=1) / kernel.sum() - return P / scale[:, np.newaxis] - return P - - -def _soft_thresh(x: np.ndarray, lam: float) -> np.ndarray: - """Perform soft-thresholding of x with threshold lam.""" - return np.sign(x) * np.maximum(np.abs(x) - lam, 0) - - -def _fft_convolve(signal: np.ndarray, kernel: np.ndarray) -> np.ndarray: - """ - Perform 1D convolution in the frequency domain. - - Parameters - ---------- - signal - array of values to convolve - kernel - array with convolution kernel values - - Returns - ------- - array with convolved signal values - """ - n = signal.shape[0] - m = kernel.shape[0] - signal_freq = np.fft.fft(signal, n + m - 1) - kernel_freq = np.fft.fft(kernel, n + m - 1) - return np.fft.ifft(signal_freq * kernel_freq).real[:n] - - -def _impute_with_neighbors(x: np.ndarray) -> np.ndarray: - """ - Impute missing values with the average of the elements immediately - before and after. - - Parameters - ---------- - x - Signal with missing values. - - Returns - ------- - Imputed signal. - """ - # handle edges - if np.isnan(x[0]): - x[0] = x[1] - if np.isnan(x[-1]): - x[-1] = x[-2] - imputed_x = np.copy(x) - for i, (a, b, c) in enumerate(zip(x, x[1:], x[2:])): - if np.isnan(b): - imputed_x[i + 1] = (a + c) / 2 - assert np.isnan(imputed_x).sum() == 0 - return imputed_x - - -def _construct_poly_interp_mat(x: np.ndarray, k: int = 3): - """ - Generate polynomial interpolation matrix. - - Currently only implemented for 3rd order polynomials. - - Parameters - ---------- - x - Input signal. - k - Order of the polynomial interpolation. - - Returns - ------- - n x (n - k - 1) matrix. - """ - assert k == 3, "poly interpolation matrix only constructed for k=3" - n = x.shape[0] - S = np.zeros((n, n - k - 1)) - S[0, 0] = (x[3] - x[0]) / (x[3] - x[2]) - S[0, 1] = (x[0] - x[2]) / (x[3] - x[2]) - S[1, 0] = (x[3] - x[1]) / (x[3] - x[2]) - S[1, 1] = (x[1] - x[2]) / (x[3] - x[2]) - S[n - 2, n - 6] = (x[n - 3] - x[n - 2]) / (x[n - 3] - x[n - 4]) - S[n - 2, n - 5] = (x[n - 2] - x[n - 4]) / (x[n - 3] - x[n - 4]) - S[n - 1, n - 6] = (x[n - 3] - x[n - 1]) / (x[n - 3] - x[n - 4]) - S[n - 1, n - 5] = (x[n - 1] - x[n - 4]) / (x[n - 3] - x[n - 4]) - S[2:(n - 2), :] = np.eye(n - k - 1) - return S - - -def _linear_extrapolate(x0, y0, x1, y1, x_new): - """Linearly extrapolate the value at x_new from 2 given points (x0, y0) and (x1, y1).""" - return y0 + ((x_new - x0) / (x1 - x0)) * (y1 - y0) diff --git a/nowcast/delphi_nowcast/epidata.py b/nowcast/delphi_nowcast/epidata.py deleted file mode 100644 index 8f4ceaf19..000000000 --- a/nowcast/delphi_nowcast/epidata.py +++ /dev/null @@ -1,160 +0,0 @@ -"""Functions for interfacing with Epidata.""" -import os -from datetime import datetime, date -from itertools import product -from typing import Tuple, List, Dict - -from delphi_epidata import Epidata -from numpy import isnan -from pandas import date_range - -from .data_containers import LocationSeries, SensorConfig - -EPIDATA_START_DATE = 20200101 - - -def get_indicator_data(sensors: List[SensorConfig], - locations: List[LocationSeries], - as_of: date) -> Dict[Tuple, LocationSeries]: - """ - Given a list of sensors and locations, asynchronously gets covidcast data for all combinations. - - Parameters - ---------- - sensors - list of SensorConfigs for sensors to retrieve. - locations - list of LocationSeries, one for each location desired. This is only used for the list of - locations; none of the dates or values are used. - as_of - Date that the data should be retrieved as of. - Returns - ------- - Dictionary of {(source, signal, geo_type, geo_value): LocationSeries} containing indicator - data, - """ - # gets all available data up to as_of day for now, could be optimized to only get a window - output = {} - all_combos = product(sensors, locations) - as_of_str = as_of.strftime("%Y%m%d") - all_params = [ - {"source": "covidcast", - "data_source": sensor.source, - "signals": sensor.signal, - "time_type": "day", - "geo_type": location.geo_type, - "geo_value": location.geo_value, - "time_values": f"{EPIDATA_START_DATE}-{as_of_str}", - "as_of": as_of_str} - for sensor, location in all_combos - ] - responses = Epidata.async_epidata(all_params) - for response, params in responses: - # -2 = no results, 1 = success. Truncated data or server errors may lead to this Exception. - if response["result"] not in (-2, 1): - raise Exception(f"Bad result from Epidata: {response['message']}") - data = LocationSeries( - geo_value=params["geo_value"], - geo_type=params["geo_type"], - data={datetime.strptime(str(i["time_value"]), "%Y%m%d").date(): i["value"] - for i in response.get("epidata", []) if not isnan(i["value"])} - ) - if data.data: - output[(params["data_source"], - params["signals"], - params["geo_type"], - params["geo_value"])] = data - return output - - -def get_historical_sensor_data(sensor: SensorConfig, - location: LocationSeries, - start_date: date, - end_date: date) -> Tuple[LocationSeries, list]: - """ - Query Epidata API for historical sensorization data. - - Will only return values if they are not null. If any days are null or are not available, - they will be listed as missing. - - Parameters - ---------- - sensor - SensorConfig specifying which sensor to retrieve. - location - LocationSeries for the location to get. - start_date - First day to retrieve (inclusive). - end_date - Last day to retrieve (inclusive). - Returns - ------- - Tuple of (LocationSeries containing non-na data, list of dates without valid data). If no - data was found, an empty LocationSeries is returned. - """ - response = Epidata.covidcast_nowcast( - data_source=sensor.source, - signals=sensor.signal, - time_type="day", - geo_type=location.geo_type, - time_values=Epidata.range(start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")), - geo_value=location.geo_value, - sensor_names=sensor.name, - lag=sensor.lag) - all_dates = [i.date() for i in date_range(start_date, end_date)] - if response["result"] == 1: - location.data = {datetime.strptime(str(i["time_value"]), "%Y%m%d").date(): i["value"] - for i in response.get("epidata", []) if not isnan(i["value"])} - missing_dates = [i for i in all_dates if i not in location.dates] - return location, missing_dates - if response["result"] == -2: # no results - print("No historical results found") - return location, all_dates - raise Exception(f"Bad result from Epidata: {response['message']}") - - -def export_to_csv(value: LocationSeries, - sensor: SensorConfig, - as_of_date: date, - receiving_dir: str - ) -> List[str]: - """ - Save value to csv for upload to Epidata database. - - Parameters - ---------- - value - LocationSeries containing data. - sensor - SensorConfig corresponding to value. - as_of_date - As_of date for the indicator data used to train the sensor. - receiving_dir - Export directory for Epidata acquisition. - Returns - ------- - Filepath of exported files - """ - export_dir = os.path.join( - receiving_dir, - f"issue_{as_of_date.strftime('%Y%m%d')}", - sensor.source - ) - os.makedirs(export_dir, exist_ok=True) - exported_files = [] - for time_value in value.dates: - export_file = os.path.join( - export_dir, - f"{time_value.strftime('%Y%m%d')}_{value.geo_type}_{sensor.signal}.csv" - ) - if os.path.exists(export_file): - with open(export_file, "a") as f: - f.write( - f"{sensor.name},{value.geo_value},{value.data.get(time_value, '')}\n") - else: - with open(export_file, "a") as f: - f.write("sensor_name,geo_value,value\n") - f.write( - f"{sensor.name},{value.geo_value},{value.data.get(time_value, '')}\n") - exported_files.append(export_file) - return exported_files diff --git a/nowcast/delphi_nowcast/nowcast_fusion/__init__.py b/nowcast/delphi_nowcast/nowcast_fusion/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/nowcast/delphi_nowcast/nowcast_fusion/covariance.py b/nowcast/delphi_nowcast/nowcast_fusion/covariance.py deleted file mode 100644 index e7c9894c2..000000000 --- a/nowcast/delphi_nowcast/nowcast_fusion/covariance.py +++ /dev/null @@ -1,213 +0,0 @@ -""" -======================================================================== -THIS CODE IS COPIED FROM -https://github.com/cmu-delphi/nowcast/blob/main/src/fusion/covariance.py -======================================================================== - -Maximum likelihood covariance estimation that is robust to insufficient and -missing values. -""" - -# standard library -import abc - -# third party -import numpy as np -import scipy.linalg -import scipy.stats - -# first party -from .opt_1d import maximize - - -def nancov(X): - """ - Estimate the covariance matrix of partially observed data, ignoring nans. - The covariance matrix is the elementwise quotient of the returned numerator - and denominator matrices. Data columns are assumed to be unbiased. - - Denominator elements may be zero, leading to undefined covariance. Further, - the resulting matrix may have nonpositive eigenvalues. As a result, it may - not be invertable or positive definite. - - input: - X: data matrix (N x P) (N observations, P variables) - - output: - numerator (P x P), denominator (P x P) - """ - - # a helper function which computes the dot of a matrix with itself - tdot = lambda M: np.dot(M.T, M) - - # The numerator is the dot product of each column, where nans are replaced - # with zeros. The denominator is the dot product of each column, where nans - # are replaced with zeros and everything else is replaced with ones. - return tdot(np.nan_to_num(X)), tdot(np.isfinite(X).astype(np.float)) - - -def log_likelihood(cov, data): - """ - Return the log-likelihood of data, given parameters. The mean is assumed to - be zero, or a vector of zeros, as appropriate. - - input: - cov: covariance matrix (P x P) (P variables) - data: data matrix (N x P) (N observations) - - output: - log-likelihood in the range (-np.inf, 0) - """ - mean = np.zeros(cov.shape[0]) - try: - # Attempt to compute the log likelihood. This will fail with `ValueError` - # if the covariance matrix is not positive semidefinite. Otherwise, this - # will fail with `LinAlgError` if the covariance matrix is near-singular. - return np.sum(scipy.stats.multivariate_normal.logpdf(data, mean, cov=cov)) - except (ValueError, np.linalg.LinAlgError): - # Return log likelihood of negative infinity when the covariance matrix is - # not firmly positive definite. - return -np.inf - - -class ShrinkageMethod(metaclass=abc.ABCMeta): - """ - An abstract class representing a method for shrinking a covariance matrix. - This may be necessary, for example, when there are missing values or too few - observations. The goal is to find the positive definite matrix which - maximizes the multivariate normal likelihood of the available data. - """ - - @abc.abstractmethod - def get_alpha_bounds(self): - raise NotImplementedError() - - @abc.abstractmethod - def get_cov(self, alpha): - raise NotImplementedError() - - -class DenominatorModifier(ShrinkageMethod): - """ - An abstract subclass of ShrinkageMethod representing methods that operate by - modifying the offdiagonal entries of the denominator of the empirical - covariance matrix. - """ - - def __init__(self, cov_num, cov_den, num_obs): - self.offdiag = np.ones(cov_den.shape) - np.eye(cov_den.shape[0]) - self.cov_num = cov_num - self.cov_den = cov_den - self.cov_den_diag = cov_den * (1 - self.offdiag) - self.cov_den_offdiag = cov_den * self.offdiag - n = cov_num.shape[0] - self.num_obs = num_obs - self.needed_obs = max(num_obs, (n + 1) * n / 2) - - -class BlendDiagonal0(DenominatorModifier): - """Multiply the offdiagonal entries of the denominator by a constant.""" - - def __init__(self, cov_num, cov_den, num_obs): - super().__init__(cov_num, np.maximum(cov_den, 1), num_obs) - - def get_alpha_bounds(self): - return [1, self.needed_obs] - - def get_cov(self, alpha): - return self.cov_num / (self.cov_den_diag + self.cov_den_offdiag * alpha) - - -class BlendDiagonal1(DenominatorModifier): - """Add a constant to the offdiagonal entries of the denominator.""" - - def __init__(self, cov_num, cov_den, num_obs): - super().__init__(cov_num, cov_den, num_obs) - - def get_alpha_bounds(self): - low = 0 if np.min(self.cov_den) > 0 else 1 - return [low, self.needed_obs] - - def get_cov(self, alpha): - return self.cov_num / (self.cov_den + self.offdiag * alpha) - - -class BlendDiagonal2(DenominatorModifier): - """Blend offdiagonal entries of the denominator with N.""" - - def __init__(self, cov_num, cov_den, num_obs): - super().__init__(cov_num, cov_den, num_obs) - - def get_alpha_bounds(self): - low = 0 if np.min(self.cov_den) > 0 else 1 - return [low, self.needed_obs] - - def get_cov(self, alpha): - a = alpha / self.needed_obs - x, y = self.cov_den_offdiag, self.offdiag * self.needed_obs - return self.cov_num / (self.cov_den_diag + (1 - a) * x + a * y) - - -def posdef_max_likelihood_objective(X, shrinkage): - """ - Return an objective function with which to find an optimal shrinkage value. - Optimal is defined as the value which maximizes the likelihood of the - shrunk covariance, given the data. If the shrunk covariance matrix is not - positive definite, then the objective function returns negative infinity. - - input: - X: data matrix (N x P) (N observations, P variables) - shrinkage: an instance of absract class ShrinkageMethod - - output: - an objective function suitable the mle_cov function - """ - - # replace missing values (nans) with zeros - X0 = np.nan_to_num(X) - - # define an objective function, given the data - objective = lambda alpha: log_likelihood(shrinkage.get_cov(alpha), X0) - - # return the objective function - return objective - - -def mle_cov(X, shrinkage_class): - """ - Find the covariance matrix that maximizes the likelihood of a multivariate - normal disribution, given observed data. It is assumed that the data is - already unbiased. The data may have mising values and may not have a - sufficient number of observations to uniquely determine the covariance - matrix. The returned covariance matrix is guaranteed to be positive definite, - making it suitable for applications (for example, sensorization fusion) which - require a precision matrix. - - input: - X: data matrix (N x P) (N observations, P variables) - shrinkage_class: a concrete subclass of ShrinkageMethod - - output: - the shrunk covariance matrix with maximum likelihood (P x P) - """ - - # sanity check - if X.shape[0] < 2: - raise Exception('need at least two observations to estimate covariance') - - # get the numerator and denominator of the empirical covariance matrix - cov_num, cov_den = nancov(X) - - # instantiate the shrinkage method - shrinkage = shrinkage_class(cov_num, cov_den, X.shape[0]) - - # obtain an objective function - low, high = shrinkage.get_alpha_bounds() - objective = posdef_max_likelihood_objective(X, shrinkage) - stop = lambda n_obj, d_alpha, max_ll: d_alpha <= 1 - - # let the optimizer find a good shrinkage parameter - alpha, ll = maximize(low, high, objective, stop) - - # return the shrunk covariance matrix with maximum likelihood - return shrinkage.get_cov(alpha) diff --git a/nowcast/delphi_nowcast/nowcast_fusion/fusion.py b/nowcast/delphi_nowcast/nowcast_fusion/fusion.py deleted file mode 100644 index e4d667922..000000000 --- a/nowcast/delphi_nowcast/nowcast_fusion/fusion.py +++ /dev/null @@ -1,222 +0,0 @@ -""" -==================================================================== -THIS CODE IS COPIED FROM -https://github.com/cmu-delphi/nowcast/blob/main/src/fusion/fusion.py -==================================================================== - -An implementation of the sensorization nowcast_fusion kernel and supporting methods. All -inputs and outputs are assumed to be of type numpy.ndarray. - -See also: - Farrow DC. "Modeling the Past, Present, and Future of Influenza" (Doctoral - dissertation). 2016. -""" - -# standard library -from fractions import Fraction - -# third party -import numpy as np - - -def fuse(z, R, H): - """ - Fuse measurement distribution into state distribution, given a linear mapping - from state space to measurement space. - - input: - z: row vector of sensorization measurements (1 x I) - R: sensorization noise covariance matrix (I x I) - H: matrix mapping from state space to measurement space (I x S) - - output: - - the mean of the system state distribution (1 x S) - - the covariance of the system state distribution (S x S) - """ - - # precompute common product - RiH = np.dot(np.linalg.inv(R), H) - - # return the system state distribution - P = np.linalg.inv(np.dot(H.T, RiH)) - x = np.dot(np.dot(z, RiH), P) - return (x, P) - - -def extract(x, P, W): - """ - Extract output distribution from state distribution, given a linear mapping - from state space to output space. - - The diagonal elements of the output covariance matrix are the variance of - each output variable. - - input: - x: row vector of state mean (1 x S) - P: state covariance matrix (S x S) - W: matrix mapping from state space to output space (O x S) - - output: - - the mean of the output distribution (1 x O) - - the covariance of the output distribution (O x O) - """ - - # return the output distribution - S = np.dot(np.dot(W, P), W.T) - y = np.dot(x, W.T) - return (y, S) - - -def eliminate(X): - """ - Compute the canonical reduced row echelon form of the given matrix. The - Gauss-Jordan algorithm is used to compute the elimination. The matrix is - modified in-place. - - For numerical stability, it is strongly suggested that the elements of the - input matrix be Fractions. Although discouraged, matrices of floats are also - supported. - - input: - X: the input matrix - - output: - the matrix in reduced row echelon form - """ - - # dimensions - num_r, num_c = X.shape - - # forward elimination - r, c = 0, 0 - while r < num_r and c < num_c: - values = [float(x) for x in X[r:, c]] - i = r + np.argmax(np.abs(values)) - if X[i, c] != 0: - if i != r: - temp = X[i, :].copy() - X[i, :] = X[r, :] - X[r, :] = temp - X[r, c:] /= X[r, c] - for i in range(r + 1, num_r): - X[i, c:] -= X[i, c] * X[r, c:] - r += 1 - c += 1 - - # backward substitution - for r in range(num_r - 1, -1, -1): - for c in range(num_c): - if X[r, c] != 0: - for i in range(r - 1, -1, -1): - X[i, c:] -= X[i, c] * X[r, c:] - break - - # return the result - return X - - -def matmul(*matrices): - """ - Compute the product of the given matrices. The matrices must all have - elements of type Fraction or float. The type of the output will be the same - as the type of the input. - - This function is not particularly efficient -- O(n^3) -- and is intended only - for computing the product of matrices of fractions. The product of matrices - of floats can be computed more efficiently by numpy or scipy. - - input: - *matrices: the input matrices - - output: - the product of inputs matrices - """ - - if len(matrices) == 1: - return matrices[0] - elif len(matrices) == 2: - A, B = matrices - (rows, size), (temp, cols) = A.shape, B.shape - if size != temp: - raise Exception('matrix dimensions do not match') - dot = lambda U, V: sum(u * v for (u, v) in zip(U, V)) - vals = [[dot(A[r, :], B[:, c]) for c in range(cols)] for r in range(rows)] - return np.array(vals) - else: - return matmul(matrices[0], matmul(*matrices[1:])) - - -def determine_statespace(H0, W0): - """ - Return matrices mapping from latent statespace to input space and output - space. These are the matrices H and W, respectively, used in the sensorization - nowcast_fusion kernel. Since some outputs may be indeterminate, the indices of the - fully determined rows are returned. This may be used, for example, to find - the set of outputs which make up the rows of the returned W matrix. - - inputs: - H0: map from full statespace to inputs (I x S) - W0: map from full statespace to outputs (O x S) - - outputs: - - the matrix H, mapping subspace to inputs (I x S') - - the matrix W, mapping subspace to outputs (O' x S') - - list of row indices of W0 that make up W (O') - - notes: - - S' <= S and O' <= O - - for numerical stability, inputs should be matrices of Fractions - """ - - # helper function to convert a float matrix into a fraction matrix - fractions = lambda X: np.array([[Fraction(x) for x in row] for row in X]) - - # Find a set of basis vectors that span the same subspace (of the full - # statespace) that is spanned by the input vectors in H0. The result is a - # minimal set of elements from which all inputs can be unambiguously - # determined. - B = eliminate(H0.copy()) - - # the dimensions of full statespace (number of columns) - size = B.shape[1] - - # the dimensions of the subspace (number of non-empty rows) - rank = np.sum(np.sum(np.abs(B), axis=1) > 0) - - # B should be a square matrix with rows of zeros below rows of basis vectors - num_rows = B.shape[0] - if num_rows < size: - Z = fractions(np.zeros((size - num_rows, size))) - B = np.vstack((B, Z)) - elif num_rows > size: - B = B[:size, :] - - # Attempt to build each input and output vector as a linear combination of - # the subspace basis vectors. Since B may not be full rank, it may not be - # invertible. Instead, solve by eliminating the augmented matrix of B - # (transposed) with the identity matrix. After elimination, the (transposed) - # inverse of B is contained within the augmented matrix. - I = fractions(np.eye(size)) - BtI = np.hstack((B.T, I)) - IBit = eliminate(BtI) - Bi = IBit[:, size:].T - - # possible, or "actual", solutions are in the leftmost columns - # impossible, or "pseudo", solutions are in the rightmost columns - Bi_actual, Bi_pseudo = Bi[:, :rank], Bi[:, rank:] - - # compute H, the map from statespace B to inputs - # all inputs are within the span of statespace B - H = matmul(H0, Bi_actual) - - # compute W, the map from statespace B to outputs - # outputs not within the span of statespace B must be excluded - W_actual = matmul(W0, Bi_actual) - W_pseudo = matmul(W0, Bi_pseudo) - - # only keep rows where the coeficient of all pseudo basis vectors is zero - actual_rows = np.flatnonzero(np.sum(np.abs(W_pseudo), axis=1) == 0) - W = W_actual[actual_rows, :] - - # return H, W, and the indices of the rows of W0 that make up W - return H, W, actual_rows diff --git a/nowcast/delphi_nowcast/nowcast_fusion/opt_1d.py b/nowcast/delphi_nowcast/nowcast_fusion/opt_1d.py deleted file mode 100644 index 40b615a74..000000000 --- a/nowcast/delphi_nowcast/nowcast_fusion/opt_1d.py +++ /dev/null @@ -1,85 +0,0 @@ -""" -==================================================================== -THIS CODE IS COPIED FROM -https://github.com/cmu-delphi/nowcast/blob/main/src/fusion/opt_1d.py -==================================================================== - -Provides derivative-free optimization over a bounded, one-dimensional interval. - -The function to optimize doesn't have to be convex, but it is assumed that it -has a single maximum and is monotonically decreasing away from that maximum in -both directions. - -More general optimization problems can be solved using, for example, the -Nelder-Mead algorithm. - -See also: neldermead.py -""" - - -def maximize(low, high, objective, stop): - """ - Find the scalar argument which maximizes the objective function. The search - space is bounded to the closed interval [low, high]. - - input: - low: the lower bound of the search interval - high: the upper bound of the search interval - objective: an objective function, which takes and returns a scalar - stop: a function which returns whether the search should be stopped, given - the following parameters: - - number of times the objective function has been called - - width of the current search interval - - the maximum value of the objective function so far - - output: - a tuple consisting of: - - the argument which maximizes the objective function - - the maximum value of the objective function - """ - - # The algorithm below is inspired by the Nelder-Mead and bisection methods. - - # This method tracks a set of four points and their associated values, as - # returned by the objective function. One of the values must be less than or - # equal to the remaining values. Its point -- the argmin -- is iteratively - # updated. If the argmax is not on the boundary, then the argmin is updated - # to bisect the two argmax points. Otherwise, the two argmin points are - # updated to trisect the two argmax points. Iteration continues until the - # stop function returns truth. - - diff = high - low - a, b, c, d = low, low + 1 / 3 * diff, low + 2 / 3 * diff, high - w, x, y, z = [objective(i) for i in (a, b, c, d)] - argmax = lambda: max(enumerate([w, x, y, z]), key=lambda k: k[1])[0] - n = 4 - i = argmax() - while not stop(n, d - a, [w, x, y, z][i]): - if i == 0: - diff = b - a - b, c, d = a + 1 / 3 * diff, a + 2 / 3 * diff, b - x, y, z = objective(b), objective(c), x - n += 2 - elif i == 3: - diff = d - c - a, b, c = c, c + 1 / 3 * diff, c + 2 / 3 * diff - w, x, y = y, objective(b), objective(c) - n += 2 - elif i == 1: - if c - b > b - a: - c, d = (b + c) / 2, c - y, z = objective(c), y - else: - b, c, d = (a + b) / 2, b, c - x, y, z = objective(b), x, y - n += 1 - else: - if d - c > c - b: - a, b, c = b, c, (c + d) / 2 - w, x, y = x, y, objective(c) - else: - a, b = b, (b + c) / 2 - w, x = x, objective(b) - n += 1 - i = argmax() - return ([a, b, c, d][i], [w, x, y, z][i]) diff --git a/nowcast/delphi_nowcast/run.py b/nowcast/delphi_nowcast/run.py deleted file mode 100644 index c77b72a21..000000000 --- a/nowcast/delphi_nowcast/run.py +++ /dev/null @@ -1,18 +0,0 @@ -# -*- coding: utf-8 -*- -"""Functions to call when running the function. - -This module should contain a function called `run_module`, that is executed -when the module is run with `python -m delphi_nowcast`. -""" - - -def run_module(): - """ - Skeleton to run delphi_nowcast indicator. - - Returns - ------- - prints the updated signal names - """ - # params = read_params() - return diff --git a/nowcast/delphi_nowcast/sensorization/__init__.py b/nowcast/delphi_nowcast/sensorization/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/nowcast/delphi_nowcast/sensorization/ar_model.py b/nowcast/delphi_nowcast/sensorization/ar_model.py deleted file mode 100644 index 34feb9b74..000000000 --- a/nowcast/delphi_nowcast/sensorization/ar_model.py +++ /dev/null @@ -1,113 +0,0 @@ -"""Fit autoregression models.""" - -from datetime import timedelta, date -from typing import Tuple - -import numpy as np - -from ..data_containers import LocationSeries - - -def compute_ar_sensor(day: date, - values: LocationSeries, - ar_size: int, - lambda_: float) -> float: - """ - Fit AR model through least squares and get sensorization value for a given date. - - This takes in a LocationSeries objects for the quantity of interest as well as a date to - predict and some model parameters. The model is trained on all data before the specified date, - and then the predictor at the given date is fed into the model to get the returned sensor value - for that day. - - Missing values are imputed with mean imputation, though currently this function is called - on data that has no nan values. - - Parameters - ---------- - day - date to get sensor value for - values - LocationSeries containing covariate values. - ar_size - Order of autoregressive model. - lambda_ - l2 regularization coefficient. - - Returns - ------- - Float value of sensor on `date` - """ - previous_day = day - timedelta(1) - try: - window = values.get_data_range(min(values.dates), previous_day, "mean") - except ValueError: - return np.nan - B, means, stddevs = _ar_fit(np.array(window), ar_size, lambda_) - if B is None: - return np.nan - date_X = np.hstack((1, - (np.array(window[-ar_size:]) - means) / stddevs)) - Yhat = (date_X @ B)[0] - # Taken from https://github.com/dfarrow0/covidcast-nowcast/blob/dfarrow/sf/src/sf/ar_sensor.py: - # ground truth in some locations is a zero vector, which leads to perfect AR fit, zero - # variance, and a singular covariance matrix so as a small hack, add some small noise. - np.random.seed(int(day.strftime("%Y%m%d"))) - Yhat += np.random.normal(0, 0.1) - # as a huge hack, add more noise to prevent AR from unreasonably dominating - # the nowcast since AR3 can nearly exactly predict some trendfiltered curves. - np.random.seed(int(day.strftime("%Y%m%d"))) - Yhat += np.random.normal(0, 0.1 * np.maximum(0, np.mean(Yhat))) - return Yhat - - -def _ar_fit(values: np.array, - ar_size: int, - lambda_: float) -> Tuple[np.array, np.array, np.array]: - """ - Fit AR coefficients with OLS. Standardizes and fits an intercept. - - Adapted from - https://github.com/dfarrow0/covidcast-nowcast/blob/dfarrow/sf/src/sf/ar_sensor.py - - Parameters - ---------- - values - Array of values to train on. - ar_size - Order of autoregressive model. - lambda_ - l2 regularization coefficient. - - Returns - ------- - Tuple of (fitted coefficients, mean vector, stddev vector). - """ - num_observations = len(values) - ar_size - if num_observations < 2 * (ar_size + 1): # 1 for intercept - return None, None, None - X = np.hstack([values[j:-(ar_size - j), None] for j in range(ar_size)]) - X, means, stddevs = _standardize(X) - Y = values[ar_size:, None] - B = np.linalg.inv(X.T @ X + lambda_ * np.eye(ar_size)) @ X.T @ Y - B = np.concatenate(([[np.mean(Y)]], B)) - return B, means, stddevs - - -def _standardize(data: np.ndarray) -> Tuple[np.ndarray, np.array, np.array]: - """ - Standardize a matrix and return the mean and stddevs for each column - - Parameters - ---------- - data - Numpy matrix to standardize - - Returns - ------- - Standardize matrix, mean vector, stddev vector. - """ - means = np.mean(data, axis=0) - stddevs = np.std(data, axis=0, ddof=1) - data = (data - means) / stddevs - return data, means, stddevs diff --git a/nowcast/delphi_nowcast/sensorization/regression_model.py b/nowcast/delphi_nowcast/sensorization/regression_model.py deleted file mode 100644 index a22cc01c3..000000000 --- a/nowcast/delphi_nowcast/sensorization/regression_model.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Fit linear regression mdels.""" - -from datetime import timedelta, date - -import numpy as np - -from ..data_containers import LocationSeries - -MIN_SAMPLE_SIZE = 5 # arbitrarily chosen for now. - - -def compute_regression_sensor(day: date, - covariate: LocationSeries, - response: LocationSeries, - include_intercept: bool) -> float: - """ - Fit regression model and get sensorization value for a given date. - - This takes two LocationSeries objects for a covariate and response as well as a date to - predict and some model parameters. The model is trained on all data before the specified date, - and then the predictor at the given date is fed into the model to get the returned sensor value - for that day. - - For now, this function assumes there are no gaps in the data. - - It does not normalize the data yet. - - Parameters - ---------- - day - date to get sensor value for - covariate - LocationSeries containing covariate values. - response - LocationSeries containing response values. - include_intercept - Boolean on whether or not to include intercept. - - Returns - ------- - Float value of sensor on `date` - """ - previous_day = day - timedelta(1) - try: - first_day = max(min(covariate.dates), min(response.dates)) - train_Y = response.get_data_range(first_day, previous_day) - train_covariates = covariate.get_data_range(first_day, previous_day) - except ValueError: - return np.nan - if not train_Y: - return np.nan - non_nan_values = [(i, j) for i, j in zip(train_Y, train_covariates) - if not (np.isnan(i) or np.isnan(j))] - train_Y, train_covariates = zip(*non_nan_values) if non_nan_values else ([], []) - if len(train_Y) < MIN_SAMPLE_SIZE: - print("insufficient observations") - return np.nan - train_Y = np.array(train_Y) - train_covariates = np.array(train_covariates) - X = np.ones((len(train_covariates), 1 + include_intercept)) - X[:, -1] = train_covariates - B = np.linalg.inv(X.T @ X) @ X.T @ train_Y - date_val = covariate.data.get(day, np.nan) - date_X = np.array((1, date_val)) if include_intercept else np.array([date_val]) - return date_X @ B diff --git a/nowcast/delphi_nowcast/sensorization/sensor.py b/nowcast/delphi_nowcast/sensorization/sensor.py deleted file mode 100644 index 39fa07876..000000000 --- a/nowcast/delphi_nowcast/sensorization/sensor.py +++ /dev/null @@ -1,102 +0,0 @@ -"""Functions to run sensorization.""" -from collections import defaultdict -from typing import List, DefaultDict -from datetime import timedelta, date - -import numpy as np - -from .ar_model import compute_ar_sensor -from .regression_model import compute_regression_sensor -from ..data_containers import LocationSeries, SensorConfig -from ..constants import AR_ORDER, AR_LAMBDA, REG_INTERCEPT -from ..epidata import get_indicator_data, get_historical_sensor_data, export_to_csv - - -def compute_sensors(as_of_date: date, - regression_sensors: List[SensorConfig], - ground_truth_sensor: SensorConfig, - ground_truths: List[LocationSeries], - export_dir: str = "", - ) -> DefaultDict[SensorConfig, List[LocationSeries]]: - """ - Parameters - ---------- - as_of_date - Date that the data should be retrieved as of. - regression_sensors - list of SensorConfigs for regression sensors to compute. - ground_truth_sensor - SensorConfig of the ground truth signal which is used for the AR sensor. - ground_truths - list of LocationSeries, one for each location desired. - export_dir - string of directory to output data. If empty string, no output will be exported. - Returns - ------- - Dict where keys are sensor tuples and values are lists, where each list element is a - LocationSeries holding sensor data for a location. Each LocationSeries will only have a - single value for the date (as_of_date - lag), e.g. if as_of_date is 20210110 and lag=5, - the output will be values for 20200105. - """ - output = defaultdict(list) - indicator_data = get_indicator_data(regression_sensors, ground_truths, as_of_date) - for loc in ground_truths: - ground_truth_pred_date = as_of_date - timedelta(ground_truth_sensor.lag) - ar_sensor = compute_ar_sensor(ground_truth_pred_date, loc, AR_ORDER, AR_LAMBDA) - if not np.isnan(ar_sensor): - output[ground_truth_sensor].append( - LocationSeries(loc.geo_value, loc.geo_type, {ground_truth_pred_date: ar_sensor}) - ) - for sensor in regression_sensors: - sensor_pred_date = as_of_date - timedelta(sensor.lag) - covariates = indicator_data.get( - (sensor.source, sensor.signal, loc.geo_type, loc.geo_value) - ) - if not covariates: - # TODO convert to log statements #689 # pylint: disable=fixme - print(f"No data: {(sensor.source, sensor.signal, loc.geo_type, loc.geo_value)}") - continue - reg_sensor = compute_regression_sensor(sensor_pred_date, covariates, loc, REG_INTERCEPT) - if not np.isnan(reg_sensor): - output[sensor].append( - LocationSeries(loc.geo_value, loc.geo_type, {sensor_pred_date: reg_sensor}) - ) - if export_dir: - for sensor, locations in output.items(): - for loc in locations: - print(export_to_csv(loc, sensor, as_of_date, export_dir)) - return output - - -def historical_sensors(start_date: date, - end_date: date, - sensors: List[SensorConfig], - ground_truths: List[LocationSeries], - ) -> DefaultDict[SensorConfig, List[LocationSeries]]: - """ - Retrieve past sensorized values from start to end date at given locations for specified sensors. - Parameters - ---------- - start_date - first day to attempt to get sensor values for. - end_date - last day to attempt to get sensor values for. - sensors - list of SensorConfigs for sensors to retrieve. - ground_truths - list of LocationSeries, one for each location desired. This is only used for the list of - locations; none of the dates or values are used. - Returns - ------- - Dict where keys are sensor tuples and values are lists, where each list element is a - LocationSeries holding sensor data for a location. - """ - output = defaultdict(list) - for location in ground_truths: - for sensor in sensors: - sensor_vals, _ = get_historical_sensor_data( - sensor, location, start_date, end_date - ) - if sensor_vals.data: - output[sensor].append(sensor_vals) - return output diff --git a/nowcast/params.json.template b/nowcast/params.json.template deleted file mode 100644 index 0a771326a..000000000 --- a/nowcast/params.json.template +++ /dev/null @@ -1,6 +0,0 @@ -{ - "static_file_dir": "./static", - "export_dir": "./receiving", - "cache_dir": "./cache", - "wip_signal": "" -} diff --git a/nowcast/receiving/.gitignore b/nowcast/receiving/.gitignore deleted file mode 100644 index e69de29bb..000000000 diff --git a/nowcast/setup.py b/nowcast/setup.py deleted file mode 100644 index 54e88ee80..000000000 --- a/nowcast/setup.py +++ /dev/null @@ -1,31 +0,0 @@ -from setuptools import setup -from setuptools import find_packages - -required = [ - "aiohttp", - "covidcast", - "delphi-utils", - "numpy", - "pandas", - "pydocstyle", - "pytest", - "pytest-cov", - "pylint==2.8.3", - "scipy" -] - -setup( - name="delphi_nowcast", - version="0.1.0", - description="Nowcasts", - author="", - author_email="", - url="https://github.com/cmu-delphi/covidcast-indicators", - install_requires=required, - classifiers=[ - "Development Status :: 5 - Production/Stable", - "Intended Audience :: Developers", - "Programming Language :: Python :: 3.8", - ], - packages=find_packages(), -) diff --git a/nowcast/static/.gitignore b/nowcast/static/.gitignore deleted file mode 100644 index e69de29bb..000000000 diff --git a/nowcast/tests/deconvolution/test_deconvolution.py b/nowcast/tests/deconvolution/test_deconvolution.py deleted file mode 100644 index 605a03d96..000000000 --- a/nowcast/tests/deconvolution/test_deconvolution.py +++ /dev/null @@ -1,85 +0,0 @@ -import numpy as np -import pytest - - -from delphi_nowcast.deconvolution.deconvolution import (deconvolve_double_smooth_ntf, deconvolve_double_smooth_tf_cv, - _linear_extrapolate, _construct_poly_interp_mat, - _impute_with_neighbors, _fft_convolve, _soft_thresh) - -class TestDeconvolveDoubleSmoothTFCV: - - def test_deconvolve_double_smooth_tf_cv(self): - # trivial deconvolution case - np.testing.assert_allclose( - deconvolve_double_smooth_tf_cv(np.arange(20), np.arange(20), np.array([0,1])), - np.arange(1,21).astype(float) - ) - -class TestDeconvolveDoubleSmoothNTF: - - def test_deconvolve_double_smooth_ntf(self): - # trivial deconvolution case - np.testing.assert_allclose( - deconvolve_double_smooth_ntf(np.arange(20), np.arange(20), np.array([0,1]), lam=1, gam=0), - np.arange(1,21).astype(float) - ) - - def test_deconvolve_double_smooth_ntf_infgamma(self): - # check large gamma means last values are the same - deconv_vals = deconvolve_double_smooth_ntf(np.arange(20), np.arange(20), np.array([0,1]), lam=1, gam=1e10) - assert np.isclose(deconv_vals[-1], deconv_vals[-2]) - - -class Test_SoftThresh: - - def test__soft_thresh(self): - np.testing.assert_array_equal( - _soft_thresh(np.arange(-3,4), 1), - np.array([-2, -1, 0, 0, 0, 1, 2]) - ) - -class Test_FFTConvolve: - - def test__fft_convolve(self): - np.testing.assert_array_equal( - _fft_convolve(np.array([1, 0, 1]), np.array([2, 7])), - np.array([2, 7, 2]) - ) - -class Test_ImputeWithNeighbors: - - def test__impute_with_neighbors(self): - np.testing.assert_array_equal( - _impute_with_neighbors([np.nan, 1, np.nan, 3, np.nan]), - np.array([1, 1, 2, 3, 3]) - ) - - - def test__impute_with_neighbors_no_missing(self): - np.testing.assert_array_equal( - _impute_with_neighbors(np.arange(5)), - np.arange(5) - ) - -class Test_ConstructPolyInterpMat: - - def test__construct_poly_interp_mat(self): - np.testing.assert_array_equal( - _construct_poly_interp_mat(np.arange(6), 3), - np.array([[3., -2.], - [2., -1.], - [1., 0.], - [0., 1.], - [-1., 2.], - [-2., 3.]]) - ) - - def test__construct_poly_interp_mat_wrong_k(self): - with pytest.raises(AssertionError): - _construct_poly_interp_mat(np.arange(6), 2) - - -class Test_LinearExtrapolate: - - def test__linear_extrapolate(self): - assert _linear_extrapolate(0, 0, 1, 3, 4) == 12 diff --git a/nowcast/tests/params.json.template b/nowcast/tests/params.json.template deleted file mode 100644 index 3d55211cd..000000000 --- a/nowcast/tests/params.json.template +++ /dev/null @@ -1,6 +0,0 @@ -{ - "static_file_dir": "../static", - "export_dir": "./receiving", - "cache_dir": "./cache", - "wip_signal": "" -} diff --git a/nowcast/tests/sensorization/test_ar_model.py b/nowcast/tests/sensorization/test_ar_model.py deleted file mode 100644 index 95fc1335b..000000000 --- a/nowcast/tests/sensorization/test_ar_model.py +++ /dev/null @@ -1,83 +0,0 @@ -from unittest.mock import patch -from datetime import date - -import numpy as np - -from delphi_nowcast.sensorization.ar_model import compute_ar_sensor -from delphi_nowcast.data_containers import LocationSeries - - -class TestComputeARSensor: - - @patch("numpy.random.normal") - def test_compute_ar_sensor_no_regularize(self, random_normal): - """Verified with ar.ols(x, FALSE, ar_size, intercept=TRUE, demean=FALSE).""" - random_normal.return_value = 0 - values = LocationSeries( - data={date(2020, 1, 1): -4.27815483, date(2020, 1, 2): -4.83962077, - date(2020, 1, 3): -4.09548122, date(2020, 1, 4): -3.86647783, - date(2020, 1, 5): -2.64494168, date(2020, 1, 6): -3.99573135, - date(2020, 1, 7): -3.4824841, date(2020, 1, 8): -2.77490127, - date(2020, 1, 9): -3.64162355, date(2020, 1, 10): -2.5762891, - date(2020, 1, 11): -2.46793048, date(2020, 1, 12): -3.20454941, - date(2020, 1, 13): -1.77057154, date(2020, 1, 14): -0.02058535, - date(2020, 1, 15): 0.81182691, date(2020, 1, 16): 0.32741982} - ) - assert np.isclose( - compute_ar_sensor(date(2020, 1, 15), values, 1, 0), - -0.09105891 + 0.87530957 * -0.02058535 - ) - assert np.isclose( - compute_ar_sensor(date(2020, 1, 15), values, 2, 0), - 0.31865395 + 0.64751725 * -0.02058535 + 0.30760218 * -1.77057154 - ) - - @patch("numpy.random.normal") - def test_compute_ar_sensor_regularize(self, random_normal): - """coefficients verified with lm.ridge(y~x1+x2, lambda=1*12/11) - - x1 and x2 constructed by hand, lambda is scaled since lm.ridge does some scaling by n/(n-1) - """ - random_normal.return_value = 0 - values = LocationSeries( - data={date(2020, 1, 1): -4.27815483, date(2020, 1, 2): -4.83962077, - date(2020, 1, 3): -4.09548122, date(2020, 1, 4): -3.86647783, - date(2020, 1, 5): -2.64494168, date(2020, 1, 6): -3.99573135, - date(2020, 1, 7): -3.4824841, date(2020, 1, 8): -2.77490127, - date(2020, 1, 9): -3.64162355, date(2020, 1, 10): -2.5762891, - date(2020, 1, 11): -2.46793048, date(2020, 1, 12): -3.20454941, - date(2020, 1, 13): -1.77057154, date(2020, 1, 14): -0.02058535, - date(2020, 1, 15): 0.81182691, date(2020, 1, 16): 0.32741982} - ) - assert np.isclose(compute_ar_sensor(date(2020, 1, 15), values, 2, 1), - -2.8784639 + - 0.2315984 * (-1.77057154 - -3.48901547)/0.7637391 + - 0.5143709 * (-0.02058535 - -3.28005019)/0.8645852 - ) - - def test_compute_ar_sensor_seed(self): - """Test same result over 50 runs""" - values = LocationSeries( - data={date(2020, 1, 1): -4.27815483, date(2020, 1, 2): -4.83962077, - date(2020, 1, 3): -4.09548122, date(2020, 1, 4): -3.86647783, - date(2020, 1, 5): -2.64494168, date(2020, 1, 6): -3.99573135, - date(2020, 1, 7): -3.4824841, date(2020, 1, 8): -2.77490127, - date(2020, 1, 9): -3.64162355, date(2020, 1, 10): -2.5762891, - date(2020, 1, 11): -2.46793048, date(2020, 1, 12): -3.20454941, - date(2020, 1, 13): -1.77057154, date(2020, 1, 14): -0.02058535, - date(2020, 1, 15): 0.81182691, date(2020, 1, 16): 0.32741982} - ) - assert len(set(compute_ar_sensor(date(2020, 1, 15), values, 1, 0) for _ in range(50))) == 1 - - def test_compute_ar_sensor_insufficient_data(self): - values = LocationSeries( - data={date(2020, 1, 1): -4.27815483, date(2020, 1, 2): -4.83962077} - ) - assert np.isnan(compute_ar_sensor(date(2020, 1, 2), values, 1, 0)) - assert np.isnan(compute_ar_sensor(date(2020, 1, 7), values, 1, 0)) - - def test_compute_ar_sensor_out_of_range(self): - values = LocationSeries( - data={date(2020, 1, 1): -4.27815483, date(2020, 1, 2): -4.83962077} - ) - assert np.isnan(compute_ar_sensor(date(2020, 1, 7), values, 1, 0)) diff --git a/nowcast/tests/sensorization/test_regression_model.py b/nowcast/tests/sensorization/test_regression_model.py deleted file mode 100644 index 3233bb3fa..000000000 --- a/nowcast/tests/sensorization/test_regression_model.py +++ /dev/null @@ -1,75 +0,0 @@ -from datetime import date - -import numpy as np - -from delphi_nowcast.sensorization.regression_model import compute_regression_sensor -from delphi_nowcast.data_containers import LocationSeries - - -class TestComputeRegressionSensor: - - def test_compute_regression_sensor_intercept(self): - """Verified with lm(y~x).""" - test_covariate = LocationSeries( - data={date(2020, 1, 1): 1, date(2020, 1, 2): 3, date(2020, 1, 3): 5, - date(2020, 1, 4): 6, date(2020, 1, 5): 7, date(2020, 1, 6): 9, - date(2020, 1, 7): 12} - ) - test_response = LocationSeries( - data={date(2020, 1, 1): 10, date(2020, 1, 2): 16, date(2020, 1, 3): 22, - date(2020, 1, 4): 29, date(2020, 1, 5): 28, date(2020, 1, 6): 35, - date(2020, 1, 7): 42} - ) - assert np.isclose( - compute_regression_sensor(date(2020, 1, 6), test_covariate, test_response, True), - 6.586207 + 3.275862 * 9 - ) - - def test_compute_regression_sensor_no_intercept(self): - """Verified with lm(y~x-1).""" - test_covariate = LocationSeries( - data={date(2020, 1, 1): 1, date(2020, 1, 2): 3, date(2020, 1, 3): 5, - date(2020, 1, 4): 6, date(2020, 1, 5): 7, date(2020, 1, 6): 9, - date(2020, 1, 7): 12} - ) - test_response = LocationSeries( - data={date(2020, 1, 1): 10, date(2020, 1, 2): 16, date(2020, 1, 3): 22, - date(2020, 1, 4): 29, date(2020, 1, 5): 28, date(2020, 1, 6): 35, - date(2020, 1, 7): 42} - ) - assert np.isclose( - compute_regression_sensor(date(2020, 1, 6), test_covariate, test_response, False), - 4.483333 * 9 - ) - - def test_compute_regression_sensor_insufficient_data(self): - test_covariate = LocationSeries( - data={date(2020, 1, 1): 1, date(2020, 1, 2): 3, date(2020, 1, 3): np.nan, - date(2020, 1, 4): 6, date(2020, 1, 5): 7, date(2020, 1, 6): 9, - date(2020, 1, 7): 12} - ) - test_response = LocationSeries( - data={date(2020, 1, 1): 10, date(2020, 1, 2): 16, date(2020, 1, 3): 22, - date(2020, 1, 4): 29, date(2020, 1, 5): 28, date(2020, 1, 6): 35, - date(2020, 1, 7): 42} - ) - assert np.isnan(compute_regression_sensor(date(2020, 1, 1), test_covariate, test_response, False)) - assert np.isnan(compute_regression_sensor(date(2020, 1, 6), test_covariate, test_response, False)) - - def test_compute_regression_sensor_out_of_range(self): - test_covariate = LocationSeries( - data={date(2020, 1, 1): 1, date(2020, 1, 2): 3, date(2020, 1, 3): 5, - date(2020, 1, 4): 6, date(2020, 1, 5): 7, date(2020, 1, 6): 9, - date(2020, 1, 7): 12} - ) - test_response = LocationSeries( - data={date(2020, 1, 1): 10, date(2020, 1, 2): 16, date(2020, 1, 3): 22, - date(2020, 1, 4): 29, date(2020, 1, 5): 28, date(2020, 1, 6): 35, - date(2020, 1, 7): 42} - ) - assert np.isnan(compute_regression_sensor(date(2020, 1, 16), test_covariate, test_response, False)) - - def test_compute_regression_sensor_no_data(self): - test_covariate = LocationSeries() - test_response = LocationSeries() - assert np.isnan(compute_regression_sensor(date(2020, 1, 16), test_covariate, test_response, False)) diff --git a/nowcast/tests/sensorization/test_sensor.py b/nowcast/tests/sensorization/test_sensor.py deleted file mode 100644 index c5dc79478..000000000 --- a/nowcast/tests/sensorization/test_sensor.py +++ /dev/null @@ -1,74 +0,0 @@ -import csv -from datetime import date -import os -import tempfile -from unittest.mock import patch - -import numpy as np -import pandas as pd - -from delphi_nowcast.data_containers import LocationSeries, SensorConfig -from delphi_nowcast.sensorization.sensor import compute_sensors, historical_sensors - - -class TestComputeSensors: - - @patch("delphi_nowcast.sensorization.sensor.compute_ar_sensor") - @patch("delphi_nowcast.sensorization.sensor.get_indicator_data") - def test_compute_sensors_no_covariates(self, mock_get_indicator_data, mock_compute_ar_sensor): - """Test only ground truth sensor is returned if no data is available to compute the rest.""" - mock_get_indicator_data.return_value = {} - mock_compute_ar_sensor.return_value = 1.5 - test_sensors = [SensorConfig("a", "b", "c", 1), SensorConfig("x", "y", "z", 2)] - test_ground_truth_sensor = SensorConfig("i", "j", "k", 3) - test_ground_truth = [LocationSeries("ca", "state")] - assert compute_sensors( - date(2020, 5, 5), test_sensors, test_ground_truth_sensor, test_ground_truth, False - ) == { - SensorConfig("i", "j", "k", 3): [LocationSeries("ca", "state", {date(2020, 5, 2): 1.5})], - } - - @patch("delphi_nowcast.sensorization.sensor.compute_regression_sensor") - @patch("delphi_nowcast.sensorization.sensor.compute_ar_sensor") - @patch("delphi_nowcast.sensorization.sensor.get_indicator_data") - def test_compute_sensors_covariates(self, mock_get_indicator_data, mock_compute_ar_sensor, mock_compute_regression_sensor): - """Test ground truth sensor and non-na regression sensor ar returned""" - mock_get_indicator_data.return_value = {("a", "b", "state", "ca"): ["placeholder"], - ("x", "y", "state", "ca"): ["placeholder"]} - mock_compute_ar_sensor.return_value = 1.5 - mock_compute_regression_sensor.side_effect = [2.5, np.nan] # nan means 2nd sensor is skipped - test_sensors = [SensorConfig("a", "b", "c", 1), SensorConfig("x", "y", "z", 2)] - test_ground_truth_sensor = SensorConfig("i", "j", "k", 3) - test_ground_truth = [LocationSeries("ca", "state")] - assert compute_sensors( - date(2020, 5, 5), test_sensors, test_ground_truth_sensor, test_ground_truth, False - ) == { - SensorConfig("i", "j", "k", 3): [LocationSeries("ca", "state", {date(2020, 5, 2): 1.5})], - SensorConfig("a", "b", "c", 1): [LocationSeries("ca", "state", {date(2020, 5, 4): 2.5})], - } - - -class TestHisoricalSensors: - - @patch("delphi_nowcast.sensorization.sensor.get_historical_sensor_data") - def test_historical_sensors_some_data(self, mock_historical): - """Test non empty data is returned for first two sensors.""" - mock_historical.side_effect = [(LocationSeries(data={date(2020, 1, 1): 2}), []), - (LocationSeries(data={date(2020, 1, 3): 4}), []), - (LocationSeries(), [])] - test_sensors = [SensorConfig("i", "j", "k", 3), SensorConfig("a", "b", "c", 1), SensorConfig("x", "y", "z", 2)] - test_ground_truth = [LocationSeries("ca", "state")] - assert historical_sensors( - None, None, test_sensors, test_ground_truth) == { - SensorConfig("i", "j", "k", 3): [LocationSeries(data={date(2020, 1, 1): 2})], - SensorConfig("a", "b", "c", 1): [LocationSeries(data={date(2020, 1, 3): 4})] - } - - @patch("delphi_nowcast.sensorization.sensor.get_historical_sensor_data") - def test_historical_sensors_no_data(self, mock_historical): - """Test nothing returned for any sensor.""" - mock_historical.return_value = (LocationSeries(), []) - test_sensors = [SensorConfig("i", "j", "k", 3), SensorConfig("a", "b", "c", 1), SensorConfig("x", "y", "z", 2)] - test_ground_truth = [LocationSeries("ca", "state")] - assert historical_sensors( - None, None, test_sensors, test_ground_truth) == {} diff --git a/nowcast/tests/test_constants.py b/nowcast/tests/test_constants.py deleted file mode 100644 index 1daa49993..000000000 --- a/nowcast/tests/test_constants.py +++ /dev/null @@ -1,21 +0,0 @@ -"""This test is to ensure the constants are not accidentally altered.""" - -from delphi_nowcast import constants -from delphi_nowcast.data_containers import SensorConfig - - -def test_constants(): - """If any of these tests fail, please verify that the constant changes are intended. - - If any sensorization constants are changed, verify that you have updated the sensor name in - constants.py so you do not mix the newly configured sensor values with values from previous - configurations. - """ - assert len(dir(constants)) == 16 - assert constants.GROUND_TRUTH_INDICATOR == SensorConfig("placeholder", "placeholder", "placeholder", 0) - assert constants.DELAY_DISTRIBUTION == [] - assert constants.FIT_FUNC == "placeholder" - assert constants.AR_ORDER == 3 - assert constants.AR_LAMBDA == 0.1 - assert constants.REG_SENSORS == [SensorConfig("placeholder", "placeholder", "placeholder", 0),] - assert constants.REG_INTERCEPT is True diff --git a/nowcast/tests/test_data_containers.py b/nowcast/tests/test_data_containers.py deleted file mode 100644 index 6234f0744..000000000 --- a/nowcast/tests/test_data_containers.py +++ /dev/null @@ -1,46 +0,0 @@ -import pytest -import numpy as np -from datetime import date - -from delphi_nowcast.data_containers import LocationSeries - - -class TestLocationSeries: - - def test_add_data(self): - test_ls = LocationSeries(data={date(2020, 1, 1): 2}) - test_ls.add_data(date(2020, 1, 3), 4) - assert test_ls == LocationSeries(data={date(2020, 1, 1): 2, date(2020, 1, 3): 4}) - - def test_get_data_range_out_of_bounds(self): - test_ls = LocationSeries(data={date(2020, 1, 1): 7, date(2020, 1, 2): 8, date(2020, 1, 3): 9}) - with pytest.raises(ValueError, - match="Data range must be within existing dates " - "2020-01-01 to 2020-01-03"): - test_ls.get_data_range(date(2019, 12, 31), date(2020, 1, 3)) - with pytest.raises(ValueError, - match="Data range must be within existing dates " - "2020-01-01 to 2020-01-03"): - test_ls.get_data_range(date(2020, 1, 1), date(2020, 1, 4)) - - def test_get_data_range_no_impute(self): - test_ls = LocationSeries(data={date(2020, 1, 1): 7, date(2020, 1, 2): np.nan, date(2020, 1, 3): 9}) - assert test_ls.get_data_range(date(2020, 1, 1), date(2020, 1, 3), None) == [7, np.nan, 9] - assert test_ls.get_data_range(date(2020, 1, 1), date(2020, 1, 2), None) == [7, np.nan] - - def test_get_data_range_mean_impute(self): - test_ls = LocationSeries(data={date(2020, 1, 1): 7, date(2020, 1, 2): np.nan, date(2020, 1, 3): 9}) - assert test_ls.get_data_range(date(2020, 1, 1), date(2020, 1, 3), "mean") == [7, 8.0, 9] - assert test_ls.get_data_range(date(2020, 1, 1), date(2020, 1, 2), "mean") == [7, 7] - - def test_get_data_range_invalid_impute(self): - test_ls = LocationSeries(data={date(2020, 1, 1): 7, date(2020, 1, 2): np.nan, date(2020, 1, 3): 9}) - with pytest.raises(ValueError, match="Invalid imputation method. Must be None or 'mean'"): - test_ls.get_data_range(date(2020, 1, 1), date(2020, 1, 3), "fakeimpute") - - def test_no_data(self): - test_ls = LocationSeries() - with pytest.raises(ValueError, match="No data"): - test_ls.dates - with pytest.raises(ValueError, match="No data"): - test_ls.values diff --git a/nowcast/tests/test_epidata.py b/nowcast/tests/test_epidata.py deleted file mode 100644 index 517a1cbdc..000000000 --- a/nowcast/tests/test_epidata.py +++ /dev/null @@ -1,183 +0,0 @@ -import os -import tempfile -from datetime import date -from unittest.mock import patch - -import numpy as np -import pandas as pd -import pytest -from delphi_nowcast.data_containers import LocationSeries, SensorConfig -from delphi_nowcast.epidata import export_to_csv, get_indicator_data, get_historical_sensor_data, EPIDATA_START_DATE - - -class TestGetIndicatorData: - - @patch("delphi_epidata.Epidata.async_epidata") - def test_results(self, mock_epidata): - mock_epidata.return_value = [ - ({"result": 1, "epidata": [{"time_value": 20200101, "value": 1}, - {"time_value": 20200102, "value": np.nan}]}, - {"data_source": "src1", "signals": "sig1", "geo_type": "state", "geo_value": "ca"}), - ({"result": 1, "epidata": [{"time_value": 20200101, "value": 2.5}]}, - {"data_source": "src1", "signals": "sig1", "geo_type": "county", "geo_value": "01001"}), - ({"result": -2}, - {"data_source": "src2", "signals": "sig2", "geo_type": "state", "geo_value": "ca"}), - ({"result": -2}, - {"data_source": "src2", "signals": "sig2", "geo_type": "county", "geo_value": "01001"}), - ] - test_output = get_indicator_data( - [SensorConfig("src1", "sig1", None, None), SensorConfig("src2", "sig2", None, None)], - [LocationSeries("ca", "state"), LocationSeries("01001", "county")], - date(2020, 1, 1) - ) - assert test_output == { - ("src1", "sig1", "state", "ca"): LocationSeries("ca", "state", {date(2020, 1, 1): 1}), - ("src1", "sig1", "county", "01001"): LocationSeries("01001", "county", {date(2020, 1, 1): 2.5}) - } - mock_epidata.assert_called_once_with([ - {"source": "covidcast", - "data_source": "src1", - "signals": "sig1", - "time_type": "day", - "geo_type": "state", - "geo_value": "ca", - "time_values": f"{EPIDATA_START_DATE}-20200101", - "as_of": "20200101"}, - {"source": "covidcast", - "data_source": "src1", - "signals": "sig1", - "time_type": "day", - "geo_type": "county", - "geo_value": "01001", - "time_values": f"{EPIDATA_START_DATE}-20200101", - "as_of": "20200101"}, - {"source": "covidcast", - "data_source": "src2", - "signals": "sig2", - "time_type": "day", - "geo_type": "state", - "geo_value": "ca", - "time_values": f"{EPIDATA_START_DATE}-20200101", - "as_of": "20200101"}, - {"source": "covidcast", - "data_source": "src2", - "signals": "sig2", - "time_type": "day", - "geo_type": "county", - "geo_value": "01001", - "time_values": f"{EPIDATA_START_DATE}-20200101", - "as_of": "20200101"} - ]) - - @patch("delphi_epidata.Epidata.async_epidata") - def test_no_results(self, mock_epidata): - mock_epidata.return_value = [ - ({"result": -2}, - {"data_source": "src1", "signals": "sig1", "geo_type": "state", "geo_value": "ca"}), - ({"result": -2}, - {"data_source": "src1", "signals": "sig1", "geo_type": "county", "geo_value": "01001"}), - ] - test_output = get_indicator_data( - [SensorConfig("src1", "sig1", None, None), SensorConfig("src2", "sig2", None, None)], - [LocationSeries("ca", "state")], - date(2020, 1, 1) - ) - assert test_output == {} - mock_epidata.assert_called_once_with([ - {"source": "covidcast", - "data_source": "src1", - "signals": "sig1", - "time_type": "day", - "geo_type": "state", - "geo_value": "ca", - "time_values": f"{EPIDATA_START_DATE}-20200101", - "as_of": "20200101"}, - {"source": "covidcast", - "data_source": "src2", - "signals": "sig2", - "time_type": "day", - "geo_type": "state", - "geo_value": "ca", - "time_values": f"{EPIDATA_START_DATE}-20200101", - "as_of": "20200101"}, - ]) - - @patch("delphi_epidata.Epidata.async_epidata") - def test_error(self, mock_epidata): - mock_epidata.return_value = [({"result": -3, "message": "test failure"}, {})] - with pytest.raises(Exception, match="Bad result from Epidata: test failure"): - get_indicator_data([SensorConfig(None, None, None, None)], - [LocationSeries(None, None)], - date(2020, 1, 1)) - mock_epidata.assert_called_once_with([ - {"source": "covidcast", - "data_source": None, - "signals": None, - "time_type": "day", - "geo_type": None, - "geo_value": None, - "time_values": f"{EPIDATA_START_DATE}-20200101", - "as_of": "20200101"} - ]) - - -class TestGetHistoricalSensorData: - - @patch("delphi_epidata.Epidata.covidcast_nowcast") - def test_results(self, mock_epidata): - mock_epidata.return_value = { - "result": 1, - "epidata": [{"time_value": 20200101, "value": 1}, - {"time_value": 20200102, "value": np.nan}] - } - test_output = get_historical_sensor_data(SensorConfig(None, None, None, None), - LocationSeries(None, None), - date(2020, 1, 1), - date(2020, 1, 4)) - - assert test_output == (LocationSeries(None, None, {date(2020, 1, 1): 1}), - [date(2020, 1, 2), - date(2020, 1, 3), - date(2020, 1, 4)]) - - @patch("delphi_epidata.Epidata.covidcast_nowcast") - def test_no_results(self, mock_epidata): - mock_epidata.return_value = {"result": -2} - test_output = get_historical_sensor_data(SensorConfig(None, None, None, None), - LocationSeries(None, None), - date(2020, 1, 1), - date(2020, 1, 4)) - - assert test_output == (LocationSeries(None, None), [date(2020, 1, 1), date(2020, 1, 2), - date(2020, 1, 3), date(2020, 1, 4)]) - - @patch("delphi_epidata.Epidata.covidcast_nowcast") - def test_error(self, mock_epidata): - mock_epidata.return_value = {"result": -3, "message": "test failure"} - with pytest.raises(Exception, match="Bad result from Epidata: test failure"): - get_historical_sensor_data(SensorConfig(None, None, None, None), - LocationSeries(None, None), - date(2020, 1, 1), - date(2020, 1, 4)) - - -class TestExportToCSV: - - def test_export_to_csv(self): - """Test export creates the right file and right contents.""" - test_sensor = SensorConfig(source="src", - signal="sig", - name="test", - lag=4) - test_value = LocationSeries("ca", "state", {date(2020, 1, 1): 1.5}) - with tempfile.TemporaryDirectory() as tmpdir: - out_files = export_to_csv(test_value, test_sensor, date(2020, 1, 5), receiving_dir=tmpdir) - assert len(out_files) == 1 - out_file = out_files[0] - assert os.path.isfile(out_file) - assert out_file.endswith("issue_20200105/src/20200101_state_sig.csv") - out_file_df = pd.read_csv(out_file) - pd.testing.assert_frame_equal(out_file_df, - pd.DataFrame({"sensor_name": ["test"], - "geo_value": ["ca"], - "value": [1.5]})) diff --git a/nowcast/tests/test_run.py b/nowcast/tests/test_run.py deleted file mode 100644 index 0e7075056..000000000 --- a/nowcast/tests/test_run.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Test placeholder.""" - -def test_run(): - pass diff --git a/nowcast/version.cfg b/nowcast/version.cfg deleted file mode 100644 index 047910754..000000000 --- a/nowcast/version.cfg +++ /dev/null @@ -1 +0,0 @@ -current_version = 0.3.47