diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py index 6ecc66a83..8141949e8 100644 --- a/api/app/jobs/common_model_fetchers.py +++ b/api/app/jobs/common_model_fetchers.py @@ -23,7 +23,7 @@ from app import config, configure_logging import app.utils.time as time_utils from app.utils.redis import create_redis -from app.stations import get_stations_synchronously, StationSourceEnum +from app.stations import get_stations_synchronously from app.db.models.weather_models import (ProcessedModelRunUrl, PredictionModelRunTimestamp, WeatherStationModelPrediction, ModelRunPrediction) import app.db.database @@ -187,10 +187,10 @@ class ModelValueProcessor: """ Iterate through model runs that have completed, and calculate the interpolated weather predictions. """ - def __init__(self, session, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): + def __init__(self, session): """ Prepare variables we're going to use throughout """ self.session = session - self.stations = get_stations_synchronously(station_source) + self.stations = get_stations_synchronously() self.station_count = len(self.stations) def _process_model_run(self, model_run: PredictionModelRunTimestamp, model_type: ModelEnum): diff --git a/api/app/jobs/env_canada.py b/api/app/jobs/env_canada.py index 301c8da78..3167580b5 100644 --- a/api/app/jobs/env_canada.py +++ b/api/app/jobs/env_canada.py @@ -22,7 +22,6 @@ import app.utils.time as time_utils from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo import app.db.database -from app.stations import StationSourceEnum from app.rocketchat_notifications import send_rocketchat_notification from app.jobs.env_canada_utils import adjust_model_day, get_model_run_urls @@ -160,14 +159,14 @@ class EnvCanada(): Canada. """ - def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): + def __init__(self, session: Session, model_type: ModelEnum): """ Prep variables """ self.files_downloaded = 0 self.files_processed = 0 self.exception_count = 0 # We always work in UTC: self.now = time_utils.get_utc_now() - self.grib_processor = GribFileProcessor(station_source) + self.grib_processor = GribFileProcessor() self.model_type: ModelEnum = model_type self.session = session # set projection based on model_type @@ -246,7 +245,7 @@ def process(self): self.model_type, hour, exc_info=exception) -def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): +def process_models(): """ downloading and processing models """ # set the model type requested based on arg passed via command line @@ -257,11 +256,11 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI start_time = datetime.datetime.now() with app.db.database.get_write_session_scope() as session: - env_canada = EnvCanada(session, model_type, station_source) + env_canada = EnvCanada(session, model_type) env_canada.process() # interpolate and machine learn everything that needs interpolating. - model_value_processor = ModelValueProcessor(session, station_source) + model_value_processor = ModelValueProcessor(session) model_value_processor.process(model_type) # calculate the execution time. diff --git a/api/app/jobs/noaa.py b/api/app/jobs/noaa.py index bef780ba9..c9ad06741 100644 --- a/api/app/jobs/noaa.py +++ b/api/app/jobs/noaa.py @@ -23,7 +23,6 @@ from app.weather_models import ModelEnum, ProjectionEnum from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo import app.db.database -from app.stations import StationSourceEnum from app.rocketchat_notifications import send_rocketchat_notification # If running as its own process, configure logging appropriately. @@ -261,14 +260,14 @@ class NOAA(): """ Class that orchestrates downloading and processing of GFS weather model grib files from NOAA. """ - def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): + def __init__(self, session: Session, model_type: ModelEnum): """ Prep variables """ self.files_downloaded = 0 self.files_processed = 0 self.exception_count = 0 # We always work in UTC: self.now = time_utils.get_utc_now() - self.grib_processor = GribFileProcessor(station_source) + self.grib_processor = GribFileProcessor() self.model_type: ModelEnum = model_type self.session = session # projection depends on model type @@ -346,7 +345,7 @@ def process(self): self.model_type, hour, exc_info=exception) -def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): +def process_models(): """ downloading and processing models """ # set the model type requested based on arg passed via command line model_type = ModelEnum(sys.argv[1]) @@ -356,11 +355,11 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI start_time = datetime.datetime.now() with app.db.database.get_write_session_scope() as session: - noaa = NOAA(session, model_type, station_source) + noaa = NOAA(session, model_type) noaa.process() # interpolate and machine learn everything that needs interpolating. - model_value_processor = ModelValueProcessor(session, station_source) + model_value_processor = ModelValueProcessor(session) model_value_processor.process(model_type) # calculate the execution time. diff --git a/api/app/routers/stations.py b/api/app/routers/stations.py index ecdba3a49..da75199a7 100644 --- a/api/app/routers/stations.py +++ b/api/app/routers/stations.py @@ -6,7 +6,7 @@ from app.utils.time import get_utc_now, get_hour_20 from app.schemas.stations import (WeatherStationGroupsMemberRequest, WeatherStationsResponse, DetailedWeatherStationsResponse, WeatherStationGroupsResponse, WeatherStationGroupMembersResponse) -from app.stations import StationSourceEnum, get_stations_as_geojson, fetch_detailed_stations_as_geojson +from app.stations import get_stations_as_geojson, fetch_detailed_stations_as_geojson from app.wildfire_one import wfwx_api @@ -20,11 +20,7 @@ @router.get('/details/', response_model=DetailedWeatherStationsResponse) -async def get_detailed_stations(response: Response, - toi: datetime = None, - source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE, - __=Depends(audit), - _=Depends(authentication_required)): +async def get_detailed_stations(response: Response, toi: datetime = None, __=Depends(audit), _=Depends(authentication_required)): """ Returns a list of fire weather stations with detailed information. -) Unspecified: Use configuration to establish source. -) LocalStorage: Read from json file (ignore configuration). @@ -40,7 +36,7 @@ async def get_detailed_stations(response: Response, toi = get_utc_now() else: toi = get_hour_20(toi) - weather_stations = await fetch_detailed_stations_as_geojson(toi, source) + weather_stations = await fetch_detailed_stations_as_geojson(toi) return DetailedWeatherStationsResponse(features=weather_stations) except Exception as exception: @@ -49,8 +45,7 @@ async def get_detailed_stations(response: Response, @router.get('/', response_model=WeatherStationsResponse) -async def get_stations(response: Response, - source: StationSourceEnum = StationSourceEnum.UNSPECIFIED): +async def get_stations(response: Response): """ Return a list of fire weather stations. Stations source can be: -) Unspecified: Use configuration to establish source. @@ -60,7 +55,7 @@ async def get_stations(response: Response, try: logger.info('/stations/') - weather_stations = await get_stations_as_geojson(source) + weather_stations = await get_stations_as_geojson() response.headers["Cache-Control"] = no_cache return WeatherStationsResponse(features=weather_stations) diff --git a/api/app/stations.py b/api/app/stations.py index 25d029b8a..8474755d9 100644 --- a/api/app/stations.py +++ b/api/app/stations.py @@ -29,19 +29,6 @@ dirname, 'data/weather_stations.json') -class StationSourceEnum(enum.Enum): - """ Station list sources. - We currently have two sources for station listing, local json file, or wildfire one api. - If the source is unspecified, configuration will govern which is used. - """ - # Configuration wins: - UNSPECIFIED = 'unspecified' - # Use wildfire one as source, filtering on active stations: - WILDFIRE_ONE = 'wildfire_one' - # Use static file as source for testing purposes: - TEST = 'test' - - def _get_stations_local() -> List[WeatherStation]: """ Get list of stations from local json files. """ @@ -106,36 +93,24 @@ async def get_stations_by_codes(station_codes: List[int]) -> List[WeatherStation return await wfwx_api.get_stations_by_codes(station_codes) -async def get_stations_from_source( - station_source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE) -> List[WeatherStation]: - """ Get list of stations from some source (ideally WFWX Fireweather API) - """ - if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE: - return await get_stations_asynchronously() - # Get from local: - return _get_stations_local() - - -async def fetch_detailed_stations_as_geojson( - time_of_interest: datetime, - station_source: StationSourceEnum) \ - -> List[GeoJsonDetailedWeatherStation]: - """ Fetch a detailed list of stations. i.e. more than just the fire station name and code, - throw some observations and forecast in the mix. """ - if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE: - # Get from wildfire one: - logger.info('requesting detailed stations...') - result = await get_detailed_stations(time_of_interest) - logger.info('detailed stations loaded.') - return result - return await _get_detailed_stations(time_of_interest) - - -async def get_stations_as_geojson( - station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED) -> List[GeoJsonWeatherStation]: +async def get_stations_from_source() -> List[WeatherStation]: + """Get list of stations from some source (ideally WFWX Fireweather API)""" + return await get_stations_asynchronously() + + +async def fetch_detailed_stations_as_geojson(time_of_interest: datetime) -> List[GeoJsonDetailedWeatherStation]: + """Fetch a detailed list of stations. i.e. more than just the fire station name and code, + throw some observations and forecast in the mix.""" + logger.info("requesting detailed stations...") + result = await get_detailed_stations(time_of_interest) + logger.info("detailed stations loaded.") + return result + + +async def get_stations_as_geojson() -> List[GeoJsonWeatherStation]: """ Format stations to conform to GeoJson spec """ geojson_stations = [] - stations = await get_stations_from_source(station_source) + stations = await get_stations_from_source() for station in stations: geojson_stations.append( GeoJsonWeatherStation(properties=WeatherStationProperties( @@ -154,9 +129,9 @@ async def get_stations_asynchronously(): return await get_station_data(session, header) -def get_stations_synchronously(station_source: StationSourceEnum) -> List[WeatherStation]: +def get_stations_synchronously() -> List[WeatherStation]: """ Get list of stations - in a synchronous/blocking call. """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - return loop.run_until_complete(get_stations_from_source(station_source)) + return loop.run_until_complete(get_stations_from_source()) diff --git a/api/app/tests/weather_models/test_env_canada_gdps.py b/api/app/tests/weather_models/test_env_canada_gdps.py index dbdee46fb..2e635437c 100644 --- a/api/app/tests/weather_models/test_env_canada_gdps.py +++ b/api/app/tests/weather_models/test_env_canada_gdps.py @@ -7,6 +7,7 @@ from datetime import datetime import pytest import requests +from aiohttp import ClientSession from sqlalchemy.orm import Session from app.jobs import env_canada from app.jobs.env_canada_utils import GRIB_LAYERS, get_global_model_run_download_urls @@ -14,9 +15,9 @@ import app.utils.time as time_utils from app.weather_models import machine_learning import app.db.crud.weather_models -from app.stations import StationSourceEnum from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, PredictionModelRunTimestamp) +from app.tests.common import default_mock_client_get from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions from app.tests.weather_models.test_models_common import (MockResponse, mock_get_processed_file_count, mock_get_stations) @@ -118,16 +119,15 @@ def mock_get_stations_synchronously(monkeypatch): @pytest.mark.usefixtures('mock_get_processed_file_record') -def test_process_gdps(mock_download, - mock_database, - mock_get_actuals_left_outer_join_with_predictions, - mock_get_stations_synchronously, - mock_get_processed_file_count): +def test_process_gdps( + mock_download, mock_database, mock_get_actuals_left_outer_join_with_predictions, mock_get_stations_synchronously, mock_get_processed_file_count, monkeypatch: pytest.MonkeyPatch +): """ run main method to see if it runs successfully. """ # All files, except one, are marked as already having been downloaded, so we expect one file to # be processed. + monkeypatch.setattr(ClientSession, "get", default_mock_client_get) sys.argv = ["argv", "GDPS"] - assert env_canada.process_models(StationSourceEnum.TEST) == 1 + assert env_canada.process_models() == 1 def test_for_zero_day_bug(monkeypatch): diff --git a/api/app/tests/weather_models/test_env_canada_hrdps.py b/api/app/tests/weather_models/test_env_canada_hrdps.py index 9b1300bb0..0313ad538 100644 --- a/api/app/tests/weather_models/test_env_canada_hrdps.py +++ b/api/app/tests/weather_models/test_env_canada_hrdps.py @@ -7,6 +7,7 @@ import pytest import requests import datetime +from aiohttp import ClientSession from sqlalchemy.orm import Session from pytest_mock import MockerFixture from app.jobs.env_canada_utils import HRDPS_GRIB_LAYERS, get_high_res_model_run_download_urls @@ -17,9 +18,9 @@ import app.jobs.common_model_fetchers import app.weather_models.process_grib from app.weather_models import ProjectionEnum -from app.stations import StationSourceEnum from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, PredictionModelRunTimestamp) +from app.tests.common import default_mock_client_get from app.tests.weather_models.test_env_canada_gdps import MockResponse @@ -97,12 +98,13 @@ def test_get_hrdps_download_urls(): @pytest.mark.usefixtures('mock_get_processed_file_record') -def test_process_hrdps(mock_download, mock_database): +def test_process_hrdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch): """ run process method to see if it runs successfully. """ # All files, except one, are marked as already having been downloaded, so we expect one file to # be processed. + monkeypatch.setattr(ClientSession, "get", default_mock_client_get) sys.argv = ["argv", "HRDPS"] - assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1 + assert app.jobs.env_canada.process_models() == 1 def test_main_fail(mocker: MockerFixture, monkeypatch): diff --git a/api/app/tests/weather_models/test_env_canada_rdps.py b/api/app/tests/weather_models/test_env_canada_rdps.py index c44fff18a..b986a793d 100644 --- a/api/app/tests/weather_models/test_env_canada_rdps.py +++ b/api/app/tests/weather_models/test_env_canada_rdps.py @@ -5,6 +5,7 @@ import logging import pytest import requests +from aiohttp import ClientSession from typing import Optional from sqlalchemy.orm import Session from app.jobs.env_canada_utils import GRIB_LAYERS, get_regional_model_run_download_urls @@ -13,9 +14,9 @@ import app.jobs.env_canada import app.jobs.common_model_fetchers import app.db.crud.weather_models -from app.stations import StationSourceEnum from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, PredictionModelRunTimestamp) +from app.tests.common import default_mock_client_get from app.tests.weather_models.test_env_canada_gdps import (MockResponse) logger = logging.getLogger(__name__) @@ -103,10 +104,10 @@ def test_get_rdps_download_urls(): @pytest.mark.usefixtures('mock_get_processed_file_record') -def test_process_rdps(mock_download, - mock_database): +def test_process_rdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch): """ run main method to see if it runs successfully. """ # All files, except one, are marked as already having been downloaded, so we expect one file to # be processed. + monkeypatch.setattr(ClientSession, "get", default_mock_client_get) sys.argv = ["argv", "RDPS"] - assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1 + assert app.jobs.env_canada.process_models() == 1 diff --git a/api/app/tests/weather_models/test_process_grib.py b/api/app/tests/weather_models/test_process_grib.py index c7aa878e5..830a06d0d 100644 --- a/api/app/tests/weather_models/test_process_grib.py +++ b/api/app/tests/weather_models/test_process_grib.py @@ -5,7 +5,6 @@ import math import pytest from app.geospatial import NAD83_CRS -from app.stations import StationSourceEnum from app.tests.common import default_mock_client_get from app.weather_models import process_grib @@ -37,10 +36,7 @@ def test_read_single_raster_value(monkeypatch: pytest.MonkeyPatch): geo_to_raster_transformer = process_grib.get_transformer(NAD83_CRS, crs) padf_transform = process_grib.get_dataset_geometry(filename) - processor = process_grib.GribFileProcessor(StationSourceEnum.UNSPECIFIED, - padf_transform, - raster_to_geo_transformer, - geo_to_raster_transformer) + processor = process_grib.GribFileProcessor(padf_transform, raster_to_geo_transformer, geo_to_raster_transformer) raster_band = dataset.GetRasterBand(1) station, value = next(processor.yield_value_for_stations(raster_band)) diff --git a/api/app/weather_models/process_grib.py b/api/app/weather_models/process_grib.py index f9f1077ae..30423c67f 100644 --- a/api/app/weather_models/process_grib.py +++ b/api/app/weather_models/process_grib.py @@ -14,7 +14,7 @@ import rasterio from rasterio.io import DatasetReader from app.geospatial import NAD83_CRS -from app.stations import get_stations_synchronously, StationSourceEnum +from app.stations import get_stations_synchronously from app.db.models.weather_models import ( ModelRunPrediction, PredictionModel, PredictionModelRunTimestamp) from app.db.crud.weather_models import ( @@ -137,13 +137,9 @@ class GribFileProcessor(): """ Instances of this object can be used to process and ingest a grib file. """ - def __init__(self, - station_source: StationSourceEnum, - padf_transform=None, - raster_to_geo_transformer=None, - geo_to_raster_transformer=None): + def __init__(self, padf_transform=None, raster_to_geo_transformer=None, geo_to_raster_transformer=None): # Get list of stations we're interested in, and store it so that we only call it once. - self.stations = get_stations_synchronously(station_source) + self.stations = get_stations_synchronously() self.padf_transform = padf_transform self.raster_to_geo_transformer = raster_to_geo_transformer self.geo_to_raster_transformer = geo_to_raster_transformer