Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove StationSourceEnum #4040

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from app import config, configure_logging
import app.utils.time as time_utils
from app.utils.redis import create_redis
from app.stations import get_stations_synchronously, StationSourceEnum
from app.stations import get_stations_synchronously
from app.db.models.weather_models import (ProcessedModelRunUrl, PredictionModelRunTimestamp,
WeatherStationModelPrediction, ModelRunPrediction)
import app.db.database
Expand Down Expand Up @@ -187,10 +187,10 @@ class ModelValueProcessor:
""" Iterate through model runs that have completed, and calculate the interpolated weather predictions.
"""

def __init__(self, session, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def __init__(self, session):
""" Prepare variables we're going to use throughout """
self.session = session
self.stations = get_stations_synchronously(station_source)
self.stations = get_stations_synchronously()
self.station_count = len(self.stations)

def _process_model_run(self, model_run: PredictionModelRunTimestamp, model_type: ModelEnum):
Expand Down
11 changes: 5 additions & 6 deletions api/app/jobs/env_canada.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import app.utils.time as time_utils
from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo
import app.db.database
from app.stations import StationSourceEnum
from app.rocketchat_notifications import send_rocketchat_notification
from app.jobs.env_canada_utils import adjust_model_day, get_model_run_urls

Expand Down Expand Up @@ -160,14 +159,14 @@ class EnvCanada():
Canada.
"""

def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def __init__(self, session: Session, model_type: ModelEnum):
""" Prep variables """
self.files_downloaded = 0
self.files_processed = 0
self.exception_count = 0
# We always work in UTC:
self.now = time_utils.get_utc_now()
self.grib_processor = GribFileProcessor(station_source)
self.grib_processor = GribFileProcessor()
self.model_type: ModelEnum = model_type
self.session = session
# set projection based on model_type
Expand Down Expand Up @@ -246,7 +245,7 @@ def process(self):
self.model_type, hour, exc_info=exception)


def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def process_models():
""" downloading and processing models """

# set the model type requested based on arg passed via command line
Expand All @@ -257,11 +256,11 @@ def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECI
start_time = datetime.datetime.now()

with app.db.database.get_write_session_scope() as session:
env_canada = EnvCanada(session, model_type, station_source)
env_canada = EnvCanada(session, model_type)
env_canada.process()

# interpolate and machine learn everything that needs interpolating.
model_value_processor = ModelValueProcessor(session, station_source)
model_value_processor = ModelValueProcessor(session)
model_value_processor.process(model_type)

# calculate the execution time.
Expand Down
11 changes: 5 additions & 6 deletions api/app/jobs/noaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from app.weather_models import ModelEnum, ProjectionEnum
from app.weather_models.process_grib import GribFileProcessor, ModelRunInfo
import app.db.database
from app.stations import StationSourceEnum
from app.rocketchat_notifications import send_rocketchat_notification

# If running as its own process, configure logging appropriately.
Expand Down Expand Up @@ -261,14 +260,14 @@
""" Class that orchestrates downloading and processing of GFS weather model grib files from NOAA.
"""

def __init__(self, session: Session, model_type: ModelEnum, station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def __init__(self, session: Session, model_type: ModelEnum):
""" Prep variables """
self.files_downloaded = 0
self.files_processed = 0
self.exception_count = 0
# We always work in UTC:
self.now = time_utils.get_utc_now()
self.grib_processor = GribFileProcessor(station_source)
self.grib_processor = GribFileProcessor()

Check warning on line 270 in api/app/jobs/noaa.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/noaa.py#L270

Added line #L270 was not covered by tests
self.model_type: ModelEnum = model_type
self.session = session
# projection depends on model type
Expand Down Expand Up @@ -346,7 +345,7 @@
self.model_type, hour, exc_info=exception)


def process_models(station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
def process_models():
""" downloading and processing models """
# set the model type requested based on arg passed via command line
model_type = ModelEnum(sys.argv[1])
Expand All @@ -356,11 +355,11 @@
start_time = datetime.datetime.now()

with app.db.database.get_write_session_scope() as session:
noaa = NOAA(session, model_type, station_source)
noaa = NOAA(session, model_type)

Check warning on line 358 in api/app/jobs/noaa.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/noaa.py#L358

Added line #L358 was not covered by tests
noaa.process()

# interpolate and machine learn everything that needs interpolating.
model_value_processor = ModelValueProcessor(session, station_source)
model_value_processor = ModelValueProcessor(session)

Check warning on line 362 in api/app/jobs/noaa.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/noaa.py#L362

Added line #L362 was not covered by tests
model_value_processor.process(model_type)

# calculate the execution time.
Expand Down
15 changes: 5 additions & 10 deletions api/app/routers/stations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from app.utils.time import get_utc_now, get_hour_20
from app.schemas.stations import (WeatherStationGroupsMemberRequest, WeatherStationsResponse, DetailedWeatherStationsResponse, WeatherStationGroupsResponse,
WeatherStationGroupMembersResponse)
from app.stations import StationSourceEnum, get_stations_as_geojson, fetch_detailed_stations_as_geojson
from app.stations import get_stations_as_geojson, fetch_detailed_stations_as_geojson
from app.wildfire_one import wfwx_api


Expand All @@ -20,11 +20,7 @@


@router.get('/details/', response_model=DetailedWeatherStationsResponse)
async def get_detailed_stations(response: Response,
toi: datetime = None,
source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE,
__=Depends(audit),
_=Depends(authentication_required)):
async def get_detailed_stations(response: Response, toi: datetime = None, __=Depends(audit), _=Depends(authentication_required)):
""" Returns a list of fire weather stations with detailed information.
-) Unspecified: Use configuration to establish source.
-) LocalStorage: Read from json file (ignore configuration).
Expand All @@ -40,7 +36,7 @@ async def get_detailed_stations(response: Response,
toi = get_utc_now()
else:
toi = get_hour_20(toi)
weather_stations = await fetch_detailed_stations_as_geojson(toi, source)
weather_stations = await fetch_detailed_stations_as_geojson(toi)
return DetailedWeatherStationsResponse(features=weather_stations)

except Exception as exception:
Expand All @@ -49,8 +45,7 @@ async def get_detailed_stations(response: Response,


@router.get('/', response_model=WeatherStationsResponse)
async def get_stations(response: Response,
source: StationSourceEnum = StationSourceEnum.UNSPECIFIED):
async def get_stations(response: Response):
""" Return a list of fire weather stations.
Stations source can be:
-) Unspecified: Use configuration to establish source.
Expand All @@ -60,7 +55,7 @@ async def get_stations(response: Response,
try:
logger.info('/stations/')

weather_stations = await get_stations_as_geojson(source)
weather_stations = await get_stations_as_geojson()
response.headers["Cache-Control"] = no_cache

return WeatherStationsResponse(features=weather_stations)
Expand Down
61 changes: 18 additions & 43 deletions api/app/stations.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,6 @@
dirname, 'data/weather_stations.json')


class StationSourceEnum(enum.Enum):
""" Station list sources.
We currently have two sources for station listing, local json file, or wildfire one api.
If the source is unspecified, configuration will govern which is used.
"""
# Configuration wins:
UNSPECIFIED = 'unspecified'
# Use wildfire one as source, filtering on active stations:
WILDFIRE_ONE = 'wildfire_one'
# Use static file as source for testing purposes:
TEST = 'test'


def _get_stations_local() -> List[WeatherStation]:
""" Get list of stations from local json files.
"""
Expand Down Expand Up @@ -106,36 +93,24 @@ async def get_stations_by_codes(station_codes: List[int]) -> List[WeatherStation
return await wfwx_api.get_stations_by_codes(station_codes)


async def get_stations_from_source(
station_source: StationSourceEnum = StationSourceEnum.WILDFIRE_ONE) -> List[WeatherStation]:
""" Get list of stations from some source (ideally WFWX Fireweather API)
"""
if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE:
return await get_stations_asynchronously()
# Get from local:
return _get_stations_local()


async def fetch_detailed_stations_as_geojson(
time_of_interest: datetime,
station_source: StationSourceEnum) \
-> List[GeoJsonDetailedWeatherStation]:
""" Fetch a detailed list of stations. i.e. more than just the fire station name and code,
throw some observations and forecast in the mix. """
if station_source == StationSourceEnum.UNSPECIFIED or station_source == StationSourceEnum.WILDFIRE_ONE:
# Get from wildfire one:
logger.info('requesting detailed stations...')
result = await get_detailed_stations(time_of_interest)
logger.info('detailed stations loaded.')
return result
return await _get_detailed_stations(time_of_interest)


async def get_stations_as_geojson(
station_source: StationSourceEnum = StationSourceEnum.UNSPECIFIED) -> List[GeoJsonWeatherStation]:
async def get_stations_from_source() -> List[WeatherStation]:
"""Get list of stations from some source (ideally WFWX Fireweather API)"""
return await get_stations_asynchronously()


async def fetch_detailed_stations_as_geojson(time_of_interest: datetime) -> List[GeoJsonDetailedWeatherStation]:
"""Fetch a detailed list of stations. i.e. more than just the fire station name and code,
throw some observations and forecast in the mix."""
logger.info("requesting detailed stations...")
result = await get_detailed_stations(time_of_interest)
logger.info("detailed stations loaded.")
return result


async def get_stations_as_geojson() -> List[GeoJsonWeatherStation]:
""" Format stations to conform to GeoJson spec """
geojson_stations = []
stations = await get_stations_from_source(station_source)
stations = await get_stations_from_source()
for station in stations:
geojson_stations.append(
GeoJsonWeatherStation(properties=WeatherStationProperties(
Expand All @@ -154,9 +129,9 @@ async def get_stations_asynchronously():
return await get_station_data(session, header)


def get_stations_synchronously(station_source: StationSourceEnum) -> List[WeatherStation]:
def get_stations_synchronously() -> List[WeatherStation]:
""" Get list of stations - in a synchronous/blocking call.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(get_stations_from_source(station_source))
return loop.run_until_complete(get_stations_from_source())
14 changes: 7 additions & 7 deletions api/app/tests/weather_models/test_env_canada_gdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
from datetime import datetime
import pytest
import requests
from aiohttp import ClientSession
from sqlalchemy.orm import Session
from app.jobs import env_canada
from app.jobs.env_canada_utils import GRIB_LAYERS, get_global_model_run_download_urls
from app.jobs import common_model_fetchers
import app.utils.time as time_utils
from app.weather_models import machine_learning
import app.db.crud.weather_models
from app.stations import StationSourceEnum
from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
PredictionModelRunTimestamp)
from app.tests.common import default_mock_client_get
from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions
from app.tests.weather_models.test_models_common import (MockResponse, mock_get_processed_file_count, mock_get_stations)

Expand Down Expand Up @@ -118,16 +119,15 @@ def mock_get_stations_synchronously(monkeypatch):


@pytest.mark.usefixtures('mock_get_processed_file_record')
def test_process_gdps(mock_download,
mock_database,
mock_get_actuals_left_outer_join_with_predictions,
mock_get_stations_synchronously,
mock_get_processed_file_count):
def test_process_gdps(
mock_download, mock_database, mock_get_actuals_left_outer_join_with_predictions, mock_get_stations_synchronously, mock_get_processed_file_count, monkeypatch: pytest.MonkeyPatch
):
""" run main method to see if it runs successfully. """
# All files, except one, are marked as already having been downloaded, so we expect one file to
# be processed.
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
sys.argv = ["argv", "GDPS"]
assert env_canada.process_models(StationSourceEnum.TEST) == 1
assert env_canada.process_models() == 1


def test_for_zero_day_bug(monkeypatch):
Expand Down
8 changes: 5 additions & 3 deletions api/app/tests/weather_models/test_env_canada_hrdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest
import requests
import datetime
from aiohttp import ClientSession
from sqlalchemy.orm import Session
from pytest_mock import MockerFixture
from app.jobs.env_canada_utils import HRDPS_GRIB_LAYERS, get_high_res_model_run_download_urls
Expand All @@ -17,9 +18,9 @@
import app.jobs.common_model_fetchers
import app.weather_models.process_grib
from app.weather_models import ProjectionEnum
from app.stations import StationSourceEnum
from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
PredictionModelRunTimestamp)
from app.tests.common import default_mock_client_get
from app.tests.weather_models.test_env_canada_gdps import MockResponse


Expand Down Expand Up @@ -97,12 +98,13 @@ def test_get_hrdps_download_urls():


@pytest.mark.usefixtures('mock_get_processed_file_record')
def test_process_hrdps(mock_download, mock_database):
def test_process_hrdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch):
""" run process method to see if it runs successfully. """
# All files, except one, are marked as already having been downloaded, so we expect one file to
# be processed.
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
sys.argv = ["argv", "HRDPS"]
assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1
assert app.jobs.env_canada.process_models() == 1


def test_main_fail(mocker: MockerFixture, monkeypatch):
Expand Down
9 changes: 5 additions & 4 deletions api/app/tests/weather_models/test_env_canada_rdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import pytest
import requests
from aiohttp import ClientSession
from typing import Optional
from sqlalchemy.orm import Session
from app.jobs.env_canada_utils import GRIB_LAYERS, get_regional_model_run_download_urls
Expand All @@ -13,9 +14,9 @@
import app.jobs.env_canada
import app.jobs.common_model_fetchers
import app.db.crud.weather_models
from app.stations import StationSourceEnum
from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl,
PredictionModelRunTimestamp)
from app.tests.common import default_mock_client_get
from app.tests.weather_models.test_env_canada_gdps import (MockResponse)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -103,10 +104,10 @@ def test_get_rdps_download_urls():


@pytest.mark.usefixtures('mock_get_processed_file_record')
def test_process_rdps(mock_download,
mock_database):
def test_process_rdps(mock_download, mock_database, monkeypatch: pytest.MonkeyPatch):
""" run main method to see if it runs successfully. """
# All files, except one, are marked as already having been downloaded, so we expect one file to
# be processed.
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
sys.argv = ["argv", "RDPS"]
assert app.jobs.env_canada.process_models(StationSourceEnum.TEST) == 1
assert app.jobs.env_canada.process_models() == 1
6 changes: 1 addition & 5 deletions api/app/tests/weather_models/test_process_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import math
import pytest
from app.geospatial import NAD83_CRS
from app.stations import StationSourceEnum
from app.tests.common import default_mock_client_get
from app.weather_models import process_grib

Expand Down Expand Up @@ -37,10 +36,7 @@ def test_read_single_raster_value(monkeypatch: pytest.MonkeyPatch):
geo_to_raster_transformer = process_grib.get_transformer(NAD83_CRS, crs)
padf_transform = process_grib.get_dataset_geometry(filename)

processor = process_grib.GribFileProcessor(StationSourceEnum.UNSPECIFIED,
padf_transform,
raster_to_geo_transformer,
geo_to_raster_transformer)
processor = process_grib.GribFileProcessor(padf_transform, raster_to_geo_transformer, geo_to_raster_transformer)

raster_band = dataset.GetRasterBand(1)
station, value = next(processor.yield_value_for_stations(raster_band))
Expand Down
Loading
Loading