From cef800ef6b84140439886459854d8f11127cc92e Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Fri, 22 Sep 2023 15:52:37 -0700 Subject: [PATCH 01/10] Test that verifies lookup of single value from raster in GribFileProcessor --- .../tests/weather_models/test_process_grib.py | 40 +++++++++++++++---- api/app/weather_models/process_grib.py | 17 ++++---- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/api/app/tests/weather_models/test_process_grib.py b/api/app/tests/weather_models/test_process_grib.py index 9da6bbfe5..7506bfa52 100644 --- a/api/app/tests/weather_models/test_process_grib.py +++ b/api/app/tests/weather_models/test_process_grib.py @@ -1,13 +1,10 @@ -import pytest import os from osgeo import gdal -from datetime import datetime, timezone -import requests - +from pyproj import CRS +import math +from app.geospatial import NAD83_CRS +from app.stations import StationSourceEnum from app.weather_models import process_grib -from app.weather_models import ModelEnum, ProjectionEnum -from app.jobs import noaa -from app.tests.weather_models.test_models_common import MockResponse sample_values_json = [{ @@ -60,3 +57,32 @@ def test_convert_mps_to_kph_zero_wind_speed(): metres_per_second_speed = 0 kilometres_per_hour_speed = process_grib.convert_mps_to_kph(metres_per_second_speed) assert kilometres_per_hour_speed == 0 + + +def test_read_single_raster_value(): + """ + Verified with gdallocationinfo CMC_reg_RH_TGL_2_ps10km_2020110500_P034.grib2 -wgs84 -120.4816667 50.6733333 + """ + filename = os.path.join(os.path.dirname(__file__), 'CMC_reg_RH_TGL_2_ps10km_2020110500_P034.grib2') + dataset = gdal.Open(filename, gdal.GA_ReadOnly) + + # Ensure that grib file uses EPSG:4269 (NAD83) coordinate system + # (this step is included because HRDPS grib files are in another coordinate system) + wkt = dataset.GetProjection() + crs = CRS.from_string(wkt) + raster_to_geo_transformer = process_grib.get_transformer(crs, NAD83_CRS) + geo_to_raster_transformer = process_grib.get_transformer(NAD83_CRS, crs) + padf_transform = process_grib.get_dataset_geometry(filename) + + processor = process_grib.GribFileProcessor(StationSourceEnum.UNSPECIFIED, + padf_transform, + raster_to_geo_transformer, + geo_to_raster_transformer) + + raster_band = dataset.GetRasterBand(1) + value = next(processor.yield_data_for_stations(raster_band)) + + assert math.isclose(value, 55.976, abs_tol=0.001) + + del dataset + dataset = None diff --git a/api/app/weather_models/process_grib.py b/api/app/weather_models/process_grib.py index 66444b397..ce2eab1fe 100644 --- a/api/app/weather_models/process_grib.py +++ b/api/app/weather_models/process_grib.py @@ -169,12 +169,16 @@ class GribFileProcessor(): """ Instances of this object can be used to process and ingest a grib file. """ - def __init__(self, station_source: StationSourceEnum): + def __init__(self, + station_source: StationSourceEnum, + padf_transform=None, + raster_to_geo_transformer=None, + geo_to_raster_transformer=None): # Get list of stations we're interested in, and store it so that we only call it once. self.stations = get_stations_synchronously(station_source) - self.padf_transform = None - self.raster_to_geo_transformer = None - self.geo_to_raster_transformer = None + self.padf_transform = padf_transform + self.raster_to_geo_transformer = raster_to_geo_transformer + self.geo_to_raster_transformer = geo_to_raster_transformer self.prediction_model: PredictionModel = None def yield_data_for_stations(self, raster_band: gdal.Dataset): @@ -188,13 +192,12 @@ def yield_data_for_stations(self, raster_band: gdal.Dataset): longitude, latitude, self.padf_transform, self.geo_to_raster_transformer) if 0 <= x_coordinate < raster_band.XSize and 0 <= y_coordinate < raster_band.YSize: - points, values = get_surrounding_grid( - raster_band, x_coordinate, y_coordinate) + value = raster_band.ReadAsArray(x_coordinate, y_coordinate, 1, 1)[0, 0] else: logger.warning('coordinate not in raster - %s', station) continue - yield (points, values) + yield value def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_band: gdal.Dataset, variable: str): """ Given a list of stations and 2 gdal datasets (one for u-component of wind, one for v-component From 53ba4ac523f7d8612f06f8bc95c229041cef1667 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 26 Sep 2023 16:41:45 -0700 Subject: [PATCH 02/10] Add singular weather model table --- ...ingular_value_weather_model_prediction_.py | 52 +++++++++++++++++++ api/app/db/models/weather_models.py | 30 +++++++++++ 2 files changed, 82 insertions(+) create mode 100644 api/alembic/versions/2442f07d975c_singular_value_weather_model_prediction_.py diff --git a/api/alembic/versions/2442f07d975c_singular_value_weather_model_prediction_.py b/api/alembic/versions/2442f07d975c_singular_value_weather_model_prediction_.py new file mode 100644 index 000000000..51ff04e3b --- /dev/null +++ b/api/alembic/versions/2442f07d975c_singular_value_weather_model_prediction_.py @@ -0,0 +1,52 @@ +"""singular value weather model prediction table + +Revision ID: 2442f07d975c +Revises: f49418d95584 +Create Date: 2023-09-26 16:14:01.658515 + +""" +from alembic import op +import sqlalchemy as sa +from app.db.models.common import TZTimeStamp + + +# revision identifiers, used by Alembic. +revision = '2442f07d975c' +down_revision = 'f49418d95584' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('model_run_predictions', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('prediction_model_run_timestamp_id', sa.Integer(), nullable=False), + sa.Column('prediction_timestamp', TZTimeStamp, nullable=False), + sa.Column('tmp_tgl_2', sa.Float(), nullable=True), + sa.Column('rh_tgl_2', sa.Float(), nullable=True), + sa.Column('apcp_sfc_0', sa.Float(), nullable=True), + sa.Column('wdir_tgl_10', sa.Float(), nullable=True), + sa.Column('wind_tgl_10', sa.Float(), nullable=True), + sa.ForeignKeyConstraint(['prediction_model_run_timestamp_id'], [ + 'prediction_model_run_timestamps.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'), + comment='The prediction values of a particular model run.' + ) + op.create_index(op.f('ix_model_run_predictions_id'), 'model_run_predictions', ['id'], unique=False) + op.create_index(op.f('ix_model_run_predictions_prediction_model_run_timestamp_id'), + 'model_run_predictions', ['prediction_model_run_timestamp_id'], unique=False) + op.create_index(op.f('ix_model_run_predictions_prediction_timestamp'), + 'model_run_predictions', ['prediction_timestamp'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_model_run_predictions_prediction_timestamp'), table_name='model_run_predictions') + op.drop_index(op.f('ix_model_run_predictions_prediction_model_run_timestamp_id'), + table_name='model_run_predictions') + op.drop_index(op.f('ix_model_run_predictions_id'), table_name='model_run_predictions') + op.drop_table('model_run_predictions') + # ### end Alembic commands ### diff --git a/api/app/db/models/weather_models.py b/api/app/db/models/weather_models.py index 2c0298418..c63f377b5 100644 --- a/api/app/db/models/weather_models.py +++ b/api/app/db/models/weather_models.py @@ -165,6 +165,36 @@ def __str__(self): 'wind_tgl_10={self.wind_tgl_10}').format(self=self) +class ModelRunPrediction(Base): + """ The prediction for a particular model. + Each value is a numeric value that corresponds to the lat lon from the model raster """ + __tablename__ = 'model_run_predictions' + __table_args__ = ( + UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'), + {'comment': 'The prediction values of a particular model run.'} + ) + + id = Column(Integer, Sequence('model_run_predictions_id_seq'), + primary_key=True, nullable=False, index=True) + # Which model run does this forecacst apply to? E.g. The GDPS 15x.15 run from 2020 07 07 12h00. + prediction_model_run_timestamp_id = Column(Integer, ForeignKey( + 'prediction_model_run_timestamps.id'), nullable=False, index=True) + prediction_model_run_timestamp = relationship( + "PredictionModelRunTimestamp", foreign_keys=[prediction_model_run_timestamp_id]) + # The date and time to which the prediction applies. + prediction_timestamp = Column(TZTimeStamp, nullable=False, index=True) + # Temperature 2m above model layer. + tmp_tgl_2 = Column(Float, nullable=True) + # Relative humidity 2m above model layer. + rh_tgl_2 = Column(Float, nullable=True) + # Accumulated precipitation (units kg.m^-2) + apcp_sfc_0 = Column(Float, nullable=True) + # Wind direction 10m above ground. + wdir_tgl_10 = Column(Float, nullable=True) + # Wind speed 10m above ground. + wind_tgl_10 = Column(Float, nullable=True) + + class WeatherStationModelPrediction(Base): """ The model prediction for a particular weather station. Based on values from ModelRunGridSubsetPrediction, but captures linear interpolations based on weather From c45d4b7f953272ced53970c12df491030db9796f Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 27 Sep 2023 11:13:41 -0700 Subject: [PATCH 03/10] single values for noaa and env canada model processing --- .../tests/weather_models/test_process_grib.py | 2 +- api/app/weather_models/process_grib.py | 83 ++++++++----------- 2 files changed, 35 insertions(+), 50 deletions(-) diff --git a/api/app/tests/weather_models/test_process_grib.py b/api/app/tests/weather_models/test_process_grib.py index 7506bfa52..9c8299767 100644 --- a/api/app/tests/weather_models/test_process_grib.py +++ b/api/app/tests/weather_models/test_process_grib.py @@ -80,7 +80,7 @@ def test_read_single_raster_value(): geo_to_raster_transformer) raster_band = dataset.GetRasterBand(1) - value = next(processor.yield_data_for_stations(raster_band)) + value = next(processor.yield_value_for_stations(raster_band)) assert math.isclose(value, 55.976, abs_tol=0.001) diff --git a/api/app/weather_models/process_grib.py b/api/app/weather_models/process_grib.py index ce2eab1fe..a132f8e27 100644 --- a/api/app/weather_models/process_grib.py +++ b/api/app/weather_models/process_grib.py @@ -7,7 +7,6 @@ import logging import logging.config from typing import List, Tuple, Optional -from sqlalchemy.dialects.postgresql import array from sqlalchemy.orm import Session from osgeo import gdal from pyproj import CRS, Transformer @@ -17,9 +16,9 @@ from app.geospatial import NAD83_CRS from app.stations import get_stations_synchronously, StationSourceEnum from app.db.models.weather_models import ( - PredictionModel, PredictionModelRunTimestamp, ModelRunGridSubsetPrediction) + ModelRunPrediction, PredictionModel, PredictionModelRunTimestamp) from app.db.crud.weather_models import ( - get_prediction_model, get_or_create_prediction_run, get_or_create_grid_subset) + get_prediction_model, get_or_create_prediction_run) from app.weather_models import ModelEnum, ProjectionEnum @@ -181,8 +180,8 @@ def __init__(self, self.geo_to_raster_transformer = geo_to_raster_transformer self.prediction_model: PredictionModel = None - def yield_data_for_stations(self, raster_band: gdal.Dataset): - """ Given a list of stations, and a gdal dataset, yield relevant data + def yield_value_for_stations(self, raster_band: gdal.Dataset): + """ Given a list of stations, and a gdal dataset, yield relevant data value """ for station in self.stations: longitude = station.long @@ -212,17 +211,16 @@ def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_ if 0 <= x_coordinate < u_raster_band.XSize and 0 <= x_coordinate < v_raster_band.XSize and\ 0 <= y_coordinate < u_raster_band.YSize and 0 <= y_coordinate < v_raster_band.YSize: - u_points, u_values = get_surrounding_grid(u_raster_band, x_coordinate, y_coordinate) - v_points, v_values = get_surrounding_grid(v_raster_band, x_coordinate, y_coordinate) - assert u_points == v_points - - zipped_uv_values = list(zip(u_values, v_values)) + u_value = u_raster_band.ReadAsArray(x_coordinate, y_coordinate, 1, 1)[0, 0] + v_value = v_raster_band.ReadAsArray(x_coordinate, y_coordinate, 1, 1)[0, 0] if variable == 'wdir_tgl_10': - yield self.get_wind_dir_values(u_points, zipped_uv_values) + yield calculate_wind_dir_from_u_v(u_value, v_value) elif variable == 'wind_tgl_10': - yield self.get_wind_speed_values(u_points, zipped_uv_values) + metres_per_second_speed = calculate_wind_speed_from_u_v(u_value, v_value) + kilometres_per_hour_speed = convert_mps_to_kph(metres_per_second_speed) + yield kilometres_per_hour_speed else: logger.warning('coordinate not in u/v wind rasters - %s', station) @@ -266,39 +264,26 @@ def get_variable_name(self, grib_info: ModelRunInfo) -> str: return variable_name - def store_bounding_values(self, - points, - values, - preduction_model_run: PredictionModelRunTimestamp, - grib_info: ModelRunInfo, - session: Session): + def store_prediction_value(self, + value: float, + preduction_model_run: PredictionModelRunTimestamp, + grib_info: ModelRunInfo, + session: Session): """ Store the values around the area of interest. """ - # Convert points to geographic coordinates: - geographic_points = [] - for point in points: - geographic_points.append( - calculate_geographic_coordinate(point, self.padf_transform, self.raster_to_geo_transformer)) - # Get the grid subset, i.e. the relevant bounding area for this particular model. - grid_subset = get_or_create_grid_subset( - session, self.prediction_model, geographic_points) - # Load the record if it exists. - prediction = session.query(ModelRunGridSubsetPrediction).\ + prediction = session.query(ModelRunPrediction).\ filter( - ModelRunGridSubsetPrediction.prediction_model_run_timestamp_id == preduction_model_run.id).\ - filter(ModelRunGridSubsetPrediction.prediction_timestamp == grib_info.prediction_timestamp).\ - filter(ModelRunGridSubsetPrediction.prediction_model_grid_subset_id == - grid_subset.id).first() + ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id).\ + filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp).first() if not prediction: # Record doesn't exist, so we create it. - prediction = ModelRunGridSubsetPrediction() + prediction = ModelRunPrediction() prediction.prediction_model_run_timestamp_id = preduction_model_run.id prediction.prediction_timestamp = grib_info.prediction_timestamp - prediction.prediction_model_grid_subset_id = grid_subset.id variable_name = self.get_variable_name(grib_info) - setattr(prediction, variable_name, array(values)) + setattr(prediction, variable_name, value) session.add(prediction) session.commit() @@ -307,13 +292,13 @@ def process_env_can_grib_file(self, session: Session, dataset, grib_info: ModelR # for GDPS, RDPS, HRDPS models, always only ever 1 raster band in the dataset raster_band = dataset.GetRasterBand(1) # Iterate through stations: - for (points, values) in self.yield_data_for_stations(raster_band): + for value in self.yield_value_for_stations(raster_band): # Convert wind speed from metres per second to kilometres per hour for Environment Canada # models (NOAA models handled elswhere) if grib_info.variable_name.lower().startswith("wind_agl") or grib_info.variable_name.lower().startswith('wind_tgl'): - values = [convert_mps_to_kph(value) for value in values] - self.store_bounding_values( - points, values, prediction_run, grib_info, session) + value = convert_mps_to_kph(value) + + self.store_prediction_value(value, prediction_run, grib_info, session) def get_raster_bands(self, dataset, grib_info: ModelRunInfo): """ Returns raster bands of dataset for temperature, RH, U/V wind components, and @@ -348,22 +333,22 @@ def process_noaa_grib_file(self, session: Session, dataset, grib_info: ModelRunI tmp_raster_band, rh_raster_band, u_wind_raster_band, v_wind_raster_band, precip_raster_band = self.get_raster_bands( dataset, grib_info) - for (tmp_points, tmp_values) in self.yield_data_for_stations(tmp_raster_band): + for tmp_value in self.yield_value_for_stations(tmp_raster_band): grib_info.variable_name = 'tmp_tgl_2' - self.store_bounding_values(tmp_points, tmp_values, prediction_run, grib_info, session) - for (rh_points, rh_values) in self.yield_data_for_stations(rh_raster_band): + self.store_prediction_value(tmp_value, prediction_run, grib_info, session) + for rh_value in self.yield_value_for_stations(rh_raster_band): grib_info.variable_name = 'rh_tgl_2' - self.store_bounding_values(rh_points, rh_values, prediction_run, grib_info, session) + self.store_prediction_value(rh_value, prediction_run, grib_info, session) if precip_raster_band: - for (apcp_points, apcp_values) in self.yield_data_for_stations(precip_raster_band): + for apcp_value in self.yield_value_for_stations(precip_raster_band): grib_info.variable_name = 'apcp_sfc_0' - self.store_bounding_values(apcp_points, apcp_values, prediction_run, grib_info, session) - for wdir_points, wdir_values in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'): + self.store_prediction_value(apcp_value, prediction_run, grib_info, session) + for wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'): grib_info.variable_name = 'wdir_tgl_10' - self.store_bounding_values(wdir_points, wdir_values, prediction_run, grib_info, session) - for (wind_points, wind_values) in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'): + self.store_prediction_value(wdir_value, prediction_run, grib_info, session) + for wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'): grib_info.variable_name = 'wind_tgl_10' - self.store_bounding_values(wind_points, wind_values, prediction_run, grib_info, session) + self.store_prediction_value(wind_value, prediction_run, grib_info, session) def process_grib_file(self, filename, grib_info: ModelRunInfo, session: Session): """ Process a grib file, extracting and storing relevant information. """ From 29dfb5dd546479503b460f69ec2affda7fb4069e Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 27 Sep 2023 15:55:09 -0700 Subject: [PATCH 04/10] Adjust machine learning to use single values --- api/app/db/crud/observations.py | 13 ++++--- .../tests/weather_models/test_process_grib.py | 1 - api/app/weather_models/__init__.py | 34 +++++++------------ api/app/weather_models/machine_learning.py | 6 ++-- 4 files changed, 21 insertions(+), 33 deletions(-) diff --git a/api/app/db/crud/observations.py b/api/app/db/crud/observations.py index cf9c4dd59..50c04444f 100644 --- a/api/app/db/crud/observations.py +++ b/api/app/db/crud/observations.py @@ -5,7 +5,7 @@ from sqlalchemy import and_, select from sqlalchemy.sql import func from sqlalchemy.orm import Session -from app.db.models.weather_models import ModelRunGridSubsetPrediction, PredictionModelRunTimestamp +from app.db.models.weather_models import ModelRunPrediction, PredictionModelRunTimestamp from app.db.models.observations import HourlyActual @@ -31,19 +31,18 @@ def get_hourly_actuals( def get_actuals_left_outer_join_with_predictions( - session: Session, model_id: int, grid_id: int, station_code: int, + session: Session, model_id: int, station_code: int, start_date: datetime, end_date: datetime): """ NOTE: Can improve this query by only returning the most recent prediction, maybe using nested queries. It works for now - but things could be faster. """ - return session.query(HourlyActual, ModelRunGridSubsetPrediction)\ - .outerjoin(ModelRunGridSubsetPrediction, - and_(ModelRunGridSubsetPrediction.prediction_timestamp == HourlyActual.weather_date, - ModelRunGridSubsetPrediction.prediction_model_grid_subset_id == grid_id))\ + return session.query(HourlyActual, ModelRunPrediction)\ + .outerjoin(ModelRunPrediction, + and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date))\ .outerjoin(PredictionModelRunTimestamp, and_(PredictionModelRunTimestamp.id == - ModelRunGridSubsetPrediction.prediction_model_run_timestamp_id, + ModelRunPrediction.prediction_model_run_timestamp_id, PredictionModelRunTimestamp.prediction_model_id == model_id))\ .filter(HourlyActual.station_code == station_code)\ .filter(HourlyActual.weather_date >= start_date)\ diff --git a/api/app/tests/weather_models/test_process_grib.py b/api/app/tests/weather_models/test_process_grib.py index 9c8299767..705c701ce 100644 --- a/api/app/tests/weather_models/test_process_grib.py +++ b/api/app/tests/weather_models/test_process_grib.py @@ -85,4 +85,3 @@ def test_read_single_raster_value(): assert math.isclose(value, 55.976, abs_tol=0.001) del dataset - dataset = None diff --git a/api/app/weather_models/__init__.py b/api/app/weather_models/__init__.py index 3548cce7c..5713bf951 100644 --- a/api/app/weather_models/__init__.py +++ b/api/app/weather_models/__init__.py @@ -1,11 +1,10 @@ """ Code common to app.weather_models.fetch """ from datetime import datetime from enum import Enum -from typing import List import logging from scipy.interpolate import interp1d -from app.db.models.weather_models import ModelRunGridSubsetPrediction +from app.db.models.weather_models import ModelRunPrediction logger = logging.getLogger(__name__) @@ -38,7 +37,7 @@ class ProjectionEnum(str, Enum): def interpolate_between_two_points( - x1: int, x2: int, y1: List[float], y2: List[float], xn: int): + x1: int, x2: int, y1: float, y2: float, xn: int): """ Interpolate values between two points in time. :param x1: X coordinate of the 1st value. :param x2: X coordinate of the 2nd value. @@ -51,12 +50,7 @@ def interpolate_between_two_points( # Prepare x-axis (time). x_axis = [x1, x2] # Prepare y-axis (values). - y_axis = [ - [y1[0], y2[0]], - [y1[1], y2[1]], - [y1[2], y2[2]], - [y1[3], y2[3]] - ] + y_axis = [y1, y2] # Create interpolation function. function = interp1d(x_axis, y_axis, kind='linear') @@ -99,31 +93,27 @@ def interpolate_bearing(time_a: datetime, time_b: datetime, target_time: datetim return interpolated_value -def interpolate_wind_direction(prediction_a: ModelRunGridSubsetPrediction, - prediction_b: ModelRunGridSubsetPrediction, +def interpolate_wind_direction(prediction_a: ModelRunPrediction, + prediction_b: ModelRunPrediction, target_timestamp: datetime): """ Interpolate wind direction """ if prediction_a.wdir_tgl_10 is None or prediction_b.wdir_tgl_10 is None: # There's nothing to interpolate! return None - result = [] - for wdir_tgl_10_a, wdir_tgl_10_b in zip(prediction_a.wdir_tgl_10, prediction_b.wdir_tgl_10): - interpolated_wdir = interpolate_bearing(prediction_a.prediction_timestamp, - prediction_b.prediction_timestamp, target_timestamp, - wdir_tgl_10_a, wdir_tgl_10_b) + interpolated_wdir = interpolate_bearing(prediction_a.prediction_timestamp, + prediction_b.prediction_timestamp, target_timestamp, + prediction_a.wdir_tgl_10, prediction_b.wdir_tgl_10) - result.append(interpolated_wdir) + return interpolated_wdir - return result - -def construct_interpolated_noon_prediction(prediction_a: ModelRunGridSubsetPrediction, - prediction_b: ModelRunGridSubsetPrediction): +def construct_interpolated_noon_prediction(prediction_a: ModelRunPrediction, + prediction_b: ModelRunPrediction): """ Construct a noon prediction by interpolating. """ # create a noon prediction. (using utc hour 20, as that is solar noon in B.C.) - noon_prediction = ModelRunGridSubsetPrediction() + noon_prediction = ModelRunPrediction() noon_prediction.prediction_timestamp = prediction_a.prediction_timestamp.replace( hour=20) # throw timestamps into their own variables. diff --git a/api/app/weather_models/machine_learning.py b/api/app/weather_models/machine_learning.py index 4ab83f3f4..1c51bf7af 100644 --- a/api/app/weather_models/machine_learning.py +++ b/api/app/weather_models/machine_learning.py @@ -11,7 +11,7 @@ from sqlalchemy.orm import Session from app.weather_models import SCALAR_MODEL_VALUE_KEYS, construct_interpolated_noon_prediction from app.db.models.weather_models import ( - PredictionModel, PredictionModelGridSubset, ModelRunGridSubsetPrediction) + PredictionModel, PredictionModelGridSubset, ModelRunPrediction) from app.db.models.observations import HourlyActual from app.db.crud.observations import get_actuals_left_outer_join_with_predictions @@ -147,7 +147,7 @@ def __init__(self, self.max_days_to_learn = MAX_DAYS_TO_LEARN def _add_sample_to_collection(self, - prediction: ModelRunGridSubsetPrediction, + prediction: ModelRunPrediction, actual: HourlyActual, sample_collection: SampleCollection): """ Take the provided prediction and observed value, adding them to the collection of samples """ @@ -179,7 +179,7 @@ def _collect_data(self): # Query actuals, with prediction left outer joined (so if there isn't a prediction, you'll # get an actual, but prediction will be None) query = get_actuals_left_outer_join_with_predictions( - self.session, self.model.id, self.grid.id, self.station_code, start_date, self.max_learn_date) + self.session, self.model.id, self.station_code, start_date, self.max_learn_date) # We need to keep track of previous so that we can do interpolation for the global model. prev_actual = None prev_prediction = None From 12163a4a706d6e0b428c0cbdaa8c12d1a2ceeb70 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 27 Sep 2023 17:15:27 -0700 Subject: [PATCH 05/10] Fix tests --- api/app/db/crud/weather_models.py | 77 ++------------- api/app/jobs/common_model_fetchers.py | 96 ++++++++----------- api/app/tests/weather_models/crud.py | 34 +++---- .../weather_models/test_env_canada_gdps.py | 37 +------ .../test_machine_learning.feature | 18 ---- .../weather_models/test_machine_learning.py | 81 ++++++---------- api/app/weather_models/machine_learning.py | 29 ++---- 7 files changed, 104 insertions(+), 268 deletions(-) delete mode 100644 api/app/tests/weather_models/test_machine_learning.feature diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py index 440448243..2b613d59e 100644 --- a/api/app/db/crud/weather_models.py +++ b/api/app/db/crud/weather_models.py @@ -9,7 +9,7 @@ from app.weather_models import ModelEnum, ProjectionEnum from app.db.models.weather_models import ( ProcessedModelRunUrl, PredictionModel, PredictionModelRunTimestamp, PredictionModelGridSubset, - ModelRunGridSubsetPrediction, WeatherStationModelPrediction, MoreCast2MaterializedView) + ModelRunPrediction, WeatherStationModelPrediction, MoreCast2MaterializedView) import app.utils.time as time_utils logger = logging.getLogger(__name__) @@ -118,25 +118,14 @@ def get_grids_for_coordinate(session: Session, return query -def get_model_run_predictions_for_grid(session: Session, - prediction_run: PredictionModelRunTimestamp, - grid: PredictionModelGridSubset) -> List: - """ Get all the predictions for a provided model run and grid. """ - logger.info("Getting model predictions for grid %s", grid) - return session.query(ModelRunGridSubsetPrediction).\ - filter(ModelRunGridSubsetPrediction.prediction_model_grid_subset_id == grid.id).\ - filter(ModelRunGridSubsetPrediction.prediction_model_run_timestamp_id == +def get_model_run_predictions(session: Session, + prediction_run: PredictionModelRunTimestamp) -> List: + """ Get all the predictions for a provided model run """ + logger.info("Getting model predictions for grid %s", prediction_run) + return session.query(ModelRunPrediction).\ + filter(ModelRunPrediction.prediction_model_run_timestamp_id == prediction_run.id).\ - order_by(ModelRunGridSubsetPrediction.prediction_timestamp) - - -def delete_model_run_grid_subset_predictions(session: Session, older_than: datetime): - """ Delete any grid subset prediction older than a certain date. - """ - logger.info('Deleting grid subset data older than %s...', older_than) - session.query(ModelRunGridSubsetPrediction)\ - .filter(ModelRunGridSubsetPrediction.prediction_timestamp < older_than)\ - .delete() + order_by(ModelRunPrediction.prediction_timestamp) def delete_weather_station_model_predictions(session: Session, older_than: datetime): @@ -148,56 +137,6 @@ def delete_weather_station_model_predictions(session: Session, older_than: datet .delete() -def get_model_run_predictions( - session: Session, - prediction_run: PredictionModelRunTimestamp, - coordinates) -> List: - """ - Get the predictions for a particular model run, for a specified geographical coordinate. - - Returns a PredictionModelGridSubset with joined Prediction and PredictionValueType.""" - # condition for query: are coordinates within the saved grids - geom_or = _construct_grid_filter(coordinates) - - # We are only interested in predictions from now onwards - now = time_utils.get_utc_now() - - # Build up the query: - query = session.query(PredictionModelGridSubset, ModelRunGridSubsetPrediction).\ - filter(geom_or).\ - filter(ModelRunGridSubsetPrediction.prediction_model_run_timestamp_id == prediction_run.id).\ - filter(ModelRunGridSubsetPrediction.prediction_model_grid_subset_id == PredictionModelGridSubset.id).\ - filter(ModelRunGridSubsetPrediction.prediction_timestamp >= now).\ - order_by(PredictionModelGridSubset.id, - ModelRunGridSubsetPrediction.prediction_timestamp.asc()) - return query - - -def get_predictions_from_coordinates(session: Session, coordinates: List, model: str) -> List: - """ Get the predictions for a particular model, at a specified geographical coordinate. """ - # condition for query: are coordinates within the saved grids - geom_or = _construct_grid_filter(coordinates) - - # We are only interested in the last 5 days. - now = time_utils.get_utc_now() - back_5_days = now - datetime.timedelta(days=5) - - # Build the query: - query = session.query(PredictionModelGridSubset, - ModelRunGridSubsetPrediction, - PredictionModel).\ - filter(geom_or).\ - filter(ModelRunGridSubsetPrediction.prediction_timestamp >= back_5_days, - ModelRunGridSubsetPrediction.prediction_timestamp <= now).\ - filter(PredictionModelGridSubset.id == - ModelRunGridSubsetPrediction.prediction_model_grid_subset_id).\ - filter(PredictionModelGridSubset.prediction_model_id == PredictionModel.id, - PredictionModel.abbreviation == model).\ - order_by(PredictionModelGridSubset.id, - ModelRunGridSubsetPrediction.prediction_timestamp.asc()) - return query - - def get_station_model_predictions_order_by_prediction_timestamp( session: Session, station_codes: List, diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py index b98b45363..2df4cce6b 100644 --- a/api/app/jobs/common_model_fetchers.py +++ b/api/app/jobs/common_model_fetchers.py @@ -10,10 +10,9 @@ from app.db.crud.weather_models import (get_processed_file_record, get_processed_file_count, get_prediction_model_run_timestamp_records, - get_model_run_predictions_for_grid, + get_model_run_predictions, get_grids_for_coordinate, get_weather_station_model_prediction, - delete_model_run_grid_subset_predictions, delete_weather_station_model_predictions, refresh_morecast2_materialized_view) from app.weather_models.machine_learning import StationMachineLearning @@ -24,7 +23,7 @@ from app.utils.redis import create_redis from app.stations import get_stations_synchronously, StationSourceEnum from app.db.models.weather_models import (ProcessedModelRunUrl, PredictionModelRunTimestamp, - WeatherStationModelPrediction, ModelRunGridSubsetPrediction) + WeatherStationModelPrediction, ModelRunPrediction) import app.db.database from app.db.crud.observations import get_accumulated_precipitation @@ -164,17 +163,16 @@ def apply_data_retention_policy(): # Currently we're using 19 days of data for machine learning, so # keeping 21 days (3 weeks) of historic data is sufficient. oldest_to_keep = time_utils.get_utc_now() - time_utils.data_retention_threshold - delete_model_run_grid_subset_predictions(session, oldest_to_keep) delete_weather_station_model_predictions(session, oldest_to_keep) -def accumulate_nam_precipitation(nam_cumulative_precip: list[float], prediction: ModelRunGridSubsetPrediction, model_run_hour: int): +def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: ModelRunPrediction, model_run_hour: int): """ Calculate overall cumulative precip and cumulative precip for the current prediction. """ # 00 and 12 hour model runs accumulate precipitation in 12 hour intervals, 06 and 18 hour accumulate in # 3 hour intervals nam_accumulation_interval = 3 if model_run_hour == 6 or model_run_hour == 18 else 12 cumulative_precip = nam_cumulative_precip - prediction_precip = prediction.apcp_sfc_0 or [0.0, 0.0, 0.0, 0.0] + prediction_precip = prediction.apcp_sfc_0 or 0.0 current_precip = numpy.add(nam_cumulative_precip, prediction_precip) if prediction.prediction_timestamp.hour % nam_accumulation_interval == 0: # If we're on an 'accumulation interval', update the cumulative precip @@ -210,7 +208,7 @@ def _process_model_run(self, model_run: PredictionModelRunTimestamp, model_type: logger.info('done commit.') def _process_prediction(self, - prediction: ModelRunGridSubsetPrediction, + prediction: ModelRunPrediction, station: WeatherStation, model_run: PredictionModelRunTimestamp, machine: StationMachineLearning): @@ -229,26 +227,26 @@ def _process_prediction(self, # 2020 Dec 15, Sybrand: Encountered situation where tmp_tgl_2 was None, add this workaround for it. # NOTE: Not sure why this value would ever be None. This could happen if for whatever reason, the # tmp_tgl_2 layer failed to download and process, while other layers did. - if prediction.tmp_tgl_2 is None or len(prediction.tmp_tgl_2) == 0: + if prediction.tmp_tgl_2 is None: logger.warning('tmp_tgl_2 is None or empty for ModelRunGridSubsetPrediction.id == %s', prediction.id) else: - station_prediction.tmp_tgl_2 = prediction.tmp_tgl_2[0] + station_prediction.tmp_tgl_2 = prediction.tmp_tgl_2 # 2020 Dec 10, Sybrand: Encountered situation where rh_tgl_2 was None, add this workaround for it. # NOTE: Not sure why this value would ever be None. This could happen if for whatever reason, the # rh_tgl_2 layer failed to download and process, while other layers did. - if prediction.rh_tgl_2 is None or len(prediction.rh_tgl_2) == 0: + if prediction.rh_tgl_2 is None: # This is unexpected, so we log it. - logger.warning('rh_tgl_2 is None or empty for ModelRunGridSubsetPrediction.id == %s', prediction.id) + logger.warning('rh_tgl_2 is None for ModelRunGridSubsetPrediction.id == %s', prediction.id) station_prediction.rh_tgl_2 = None else: - station_prediction.rh_tgl_2 = prediction.rh_tgl_2[0] + station_prediction.rh_tgl_2 = prediction.rh_tgl_2 # Check that apcp_sfc_0 is None, since accumulated precipitation # does not exist for 00 hour. if prediction.apcp_sfc_0 is None: station_prediction.apcp_sfc_0 = 0.0 else: - station_prediction.apcp_sfc_0 = prediction.apcp_sfc_0[0] + station_prediction.apcp_sfc_0 = prediction.apcp_sfc_0 # Calculate the delta_precipitation based on station's previous prediction_timestamp # for the same model run self.session.flush() @@ -288,7 +286,7 @@ def _process_prediction(self, self.session.add(station_prediction) def _calculate_past_24_hour_precip(self, station: WeatherStation, model_run: PredictionModelRunTimestamp, - prediction: ModelRunGridSubsetPrediction, station_prediction: WeatherStationModelPrediction): + prediction: ModelRunPrediction, station_prediction: WeatherStationModelPrediction): """ Calculate the predicted precipitation over the previous 24 hours within the specified model run. If the model run does not contain a prediction timestamp for 24 hours prior to the current prediction, return the predicted precipitation from the previous run of the same model for the same time frame. """ @@ -343,49 +341,35 @@ def _process_model_run_for_station(self, # Lookup the grid our weather station is in. logger.info("Getting grid for coordinate %s and model %s", coordinate, model_run.prediction_model) - # There should never be more than one grid per model - but it can happen. - # TODO: Re-factor away the need for the grid table entirely. - grid_query = get_grids_for_coordinate( - self.session, model_run.prediction_model, coordinate) - - for grid in grid_query: - # Convert the grid database object to a polygon object. - poly = to_shape(grid.geom) - # Extract the vertices of the polygon. - points = list(poly.exterior.coords)[:-1] - - machine = StationMachineLearning( - session=self.session, - model=model_run.prediction_model, - grid=grid, - points=points, - target_coordinate=coordinate, - station_code=station.code, - max_learn_date=model_run.prediction_run_timestamp) - machine.learn() - - # Get all the predictions associated to this particular model run, in the grid. - query = get_model_run_predictions_for_grid( - self.session, model_run, grid) - - nam_cumulative_precip = [0.0, 0.0, 0.0, 0.0] - # Iterate through all the predictions. - prev_prediction = None - - for prediction in query: - # NAM model requires manual calculation of cumulative precip - if model_type == ModelEnum.NAM: - nam_cumulative_precip, prediction.apcp_sfc_0 = accumulate_nam_precipitation( - nam_cumulative_precip, prediction, model_run.prediction_run_timestamp.hour) - if (prev_prediction is not None - and prev_prediction.prediction_timestamp.hour == 18 - and prediction.prediction_timestamp.hour == 21): - noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction) - self._process_prediction( - noon_prediction, station, model_run, machine) + machine = StationMachineLearning( + session=self.session, + model=model_run.prediction_model, + target_coordinate=coordinate, + station_code=station.code, + max_learn_date=model_run.prediction_run_timestamp) + machine.learn() + + # Get all the predictions associated to this particular model run. + query = get_model_run_predictions(self.session, model_run) + + nam_cumulative_precip = 0.0 + # Iterate through all the predictions. + prev_prediction = None + + for prediction in query: + # NAM model requires manual calculation of cumulative precip + if model_type == ModelEnum.NAM: + nam_cumulative_precip, prediction.apcp_sfc_0 = accumulate_nam_precipitation( + nam_cumulative_precip, prediction, model_run.prediction_run_timestamp.hour) + if (prev_prediction is not None + and prev_prediction.prediction_timestamp.hour == 18 + and prediction.prediction_timestamp.hour == 21): + noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction) self._process_prediction( - prediction, station, model_run, machine) - prev_prediction = prediction + noon_prediction, station, model_run, machine) + self._process_prediction( + prediction, station, model_run, machine) + prev_prediction = prediction def _mark_model_run_interpolated(self, model_run: PredictionModelRunTimestamp): """ Having completely processed a model run, we can mark it has having been interpolated. diff --git a/api/app/tests/weather_models/crud.py b/api/app/tests/weather_models/crud.py index d78165ccc..3966ea984 100644 --- a/api/app/tests/weather_models/crud.py +++ b/api/app/tests/weather_models/crud.py @@ -1,7 +1,7 @@ """ Some crud responses used to mock our calls to app.db.crud """ from datetime import datetime -from app.db.models.weather_models import ModelRunGridSubsetPrediction +from app.db.models.weather_models import ModelRunPrediction from app.db.models.observations import HourlyActual @@ -16,10 +16,10 @@ def get_actuals_left_outer_join_with_predictions(*args): temp_valid=True, relative_humidity=50, rh_valid=True), - ModelRunGridSubsetPrediction( - tmp_tgl_2=[2, 3, 4, 5], - rh_tgl_2=[10, 20, 30, 40], - apcp_sfc_0=[2, 4, 3, 6], + ModelRunPrediction( + tmp_tgl_2=2, + rh_tgl_2=10, + apcp_sfc_0=2, prediction_timestamp=datetime(2020, 10, 10, 18))], [HourlyActual(weather_date=datetime(2020, 10, 10, 19)), None], [HourlyActual(weather_date=datetime(2020, 10, 10, 20), @@ -33,10 +33,10 @@ def get_actuals_left_outer_join_with_predictions(*args): temp_valid=True, relative_humidity=100, rh_valid=True), - ModelRunGridSubsetPrediction( - tmp_tgl_2=[1, 2, 3, 4], - rh_tgl_2=[20, 30, 40, 50], - apcp_sfc_0=[3, 6, 3, 4], + ModelRunPrediction( + tmp_tgl_2=1, + rh_tgl_2=20, + apcp_sfc_0=3, prediction_timestamp=datetime(2020, 10, 10, 21))], # day 2 [HourlyActual( @@ -45,10 +45,10 @@ def get_actuals_left_outer_join_with_predictions(*args): temp_valid=True, relative_humidity=50, rh_valid=True), - ModelRunGridSubsetPrediction( - tmp_tgl_2=[2, 3, 4, 5], - rh_tgl_2=[10, 20, 30, 40], - apcp_sfc_0=[2, 4, 3, 6], + ModelRunPrediction( + tmp_tgl_2=2, + rh_tgl_2=10, + apcp_sfc_0=2, prediction_timestamp=datetime(2020, 10, 11, 18))], [HourlyActual(weather_date=datetime(2020, 10, 11, 19)), None], [HourlyActual(weather_date=datetime(2020, 10, 11, 20), @@ -62,10 +62,10 @@ def get_actuals_left_outer_join_with_predictions(*args): temp_valid=True, relative_humidity=100, rh_valid=True), - ModelRunGridSubsetPrediction( - tmp_tgl_2=[1, 2, 3, 4], - rh_tgl_2=[20, 30, 40, 50], - apcp_sfc_0=[3, 6, 3, 4], + ModelRunPrediction( + tmp_tgl_2=1, + rh_tgl_2=20, + apcp_sfc_0=3, prediction_timestamp=datetime(2020, 10, 11, 21))] ] return result diff --git a/api/app/tests/weather_models/test_env_canada_gdps.py b/api/app/tests/weather_models/test_env_canada_gdps.py index 161810c6e..e19a17c2b 100644 --- a/api/app/tests/weather_models/test_env_canada_gdps.py +++ b/api/app/tests/weather_models/test_env_canada_gdps.py @@ -16,8 +16,8 @@ from app.weather_models import machine_learning import app.db.crud.weather_models from app.stations import StationSourceEnum -from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, PredictionModelRunTimestamp, - ModelRunGridSubsetPrediction, PredictionModelGridSubset) +from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, + PredictionModelRunTimestamp, PredictionModelGridSubset) from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions from app.tests.weather_models.test_models_common import (MockResponse, mock_get_processed_file_count, mock_get_stations) @@ -39,38 +39,6 @@ def get_processed_file_record(session: Session, url: str): monkeypatch.setattr(env_canada, 'get_processed_file_record', get_processed_file_record) -@pytest.fixture() -def mock_get_model_run_predictions_for_grid(monkeypatch): - """ Mock out call to DB returning predictions """ - def mock_get(*args): - result = [ - ModelRunGridSubsetPrediction( - tmp_tgl_2=[2, 3, 4, 5], - rh_tgl_2=[10, 20, 30, 40], - apcp_sfc_0=[2, 4, 3, 6], - wdir_tgl_10=[10, 20, 30, 40], - wind_tgl_10=[1, 2, 3, 4], - prediction_timestamp=datetime(2020, 10, 10, 18)), - ModelRunGridSubsetPrediction( - tmp_tgl_2=[1, 2, 3, 4], - rh_tgl_2=[20, 30, 40, 50], - apcp_sfc_0=[3, 6, 3, 4], - wdir_tgl_10=[280, 290, 300, 310], - wind_tgl_10=[5, 6, 7, 8], - prediction_timestamp=datetime(2020, 10, 10, 21)), - ModelRunGridSubsetPrediction( - tmp_tgl_2=[1, 2, 3, 4], - rh_tgl_2=None, - apcp_sfc_0=[3, 6, 3, 4], - wdir_tgl_10=[20, 30, 40, 50], - wind_tgl_10=[4, 3, 2, 1], - prediction_timestamp=datetime(2020, 10, 10, 21)) - ] - return result - monkeypatch.setattr( - common_model_fetchers, 'get_model_run_predictions_for_grid', mock_get) - - @pytest.fixture() def mock_get_actuals_left_outer_join_with_predictions(monkeypatch): """ Mock out call to DB returning actuals macthed with predictions """ @@ -162,7 +130,6 @@ def mock_get_stations_synchronously(monkeypatch): @pytest.mark.usefixtures('mock_get_processed_file_record') def test_process_gdps(mock_download, mock_database, - mock_get_model_run_predictions_for_grid, mock_get_actuals_left_outer_join_with_predictions, mock_get_stations_synchronously, mock_get_processed_file_count): diff --git a/api/app/tests/weather_models/test_machine_learning.feature b/api/app/tests/weather_models/test_machine_learning.feature deleted file mode 100644 index e89a929db..000000000 --- a/api/app/tests/weather_models/test_machine_learning.feature +++ /dev/null @@ -1,18 +0,0 @@ -Feature: Linear regression for weather - - Scenario: Learn weather - Given An instance of StationMachineLearning - When The machine learns - Then The model_temp: for results in - And The model_rh: for results in - Examples: - | model_temp | model_rh | timestamp | bias_adjusted_temp | bias_adjusted_rh | - # using a timestamp with sample data, we should get some bias adjusted values: - | 20 | 50 | 2020-09-03T21:14:51.939836+00:00 | 30 | 100 | - # using a timestamp without any samples, we should get None for the bias adjusted values: - | 20 | 50 | 2020-09-03T01:14:51.939836+00:00 | None | None | -# | model_temp | model_rh | frog | bias_adjusted_temp | bias_adjusted_rh | coordinate | points | -# using a timestamp with sample data, we should get some bias adjusted values: -# | 20 | 50 | 2020-09-03T21:14:51.939836+00:00 | 30 | 100 | [-120.4816667, 50.6733333] | [[-120.52499999999998, 50.775000000000006], [-120.37499999999997, 50.775000000000006], [-120.37499999999997, 50.62500000000001], [-120.52499999999998, 50.62500000000001]] | -# using a timestamp without any samples, we should get None for the bias adjusted values: -# | 20 | 50 | 2020-09-03T01:14:51.939836+00:00 | None | None | [-120.4816667, 50.6733333] | [[-120.52499999999998, 50.775000000000006], [-120.37499999999997, 50.775000000000006], [-120.37499999999997, 50.62500000000001], [-120.52499999999998, 50.62500000000001]] | \ No newline at end of file diff --git a/api/app/tests/weather_models/test_machine_learning.py b/api/app/tests/weather_models/test_machine_learning.py index ca8c00517..28b80f6d8 100644 --- a/api/app/tests/weather_models/test_machine_learning.py +++ b/api/app/tests/weather_models/test_machine_learning.py @@ -1,14 +1,9 @@ -""" Test machine learning code - collecting data, learning from data, and predicting a bias adjusted -result. -""" from datetime import datetime import pytest -from pytest_bdd import scenario, given, parsers, then, when -from app.tests import _load_json_file -from app.db.models.weather_models import PredictionModel, PredictionModelGridSubset from app.weather_models import machine_learning from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions -from app.tests.common import str2float +from app.db.models.weather_models import PredictionModel +from app.weather_models.machine_learning import StationMachineLearning @pytest.fixture() @@ -18,57 +13,41 @@ def mock_get_actuals_left_outer_join_with_predictions(monkeypatch): get_actuals_left_outer_join_with_predictions) -@pytest.mark.usefixtures('mock_get_actuals_left_outer_join_with_predictions') -@scenario("test_machine_learning.feature", "Learn weather") -def test_machine_learning(): - """ BDD Scenario for predictions """ +def test_bias_adjustment_with_samples(mock_get_actuals_left_outer_join_with_predictions): + predict_date_with_samples = datetime.fromisoformat("2020-09-03T21:14:51.939836+00:00") - -@given(parsers.parse("An instance of StationMachineLearning"), - target_fixture='instance') -def given_an_instance() -> machine_learning.StationMachineLearning: - """ Bind the data variable """ - # super weird bug? with pytest_bdd that hooked into isoformat on the coordinate and points fields. - # tried forever to figure out why - and gave up in the end. removed coordinate and point from - # feature file and just loading it in here. - coordinate = _load_json_file(__file__, 'coordinate.json') - points = _load_json_file(__file__, 'points.json') - return machine_learning.StationMachineLearning( + machine_learner = StationMachineLearning( session=None, model=PredictionModel(id=1), - grid=PredictionModelGridSubset(id=1), - points=points, - target_coordinate=coordinate, + target_coordinate=[ + -120.4816667, + 50.6733333 + ], station_code=None, max_learn_date=datetime.now()) + machine_learner.learn() - -@when('The machine learns') -def learn(instance: machine_learning.StationMachineLearning): - """ Train the machine learning model """ - instance.learn() + temp_result = machine_learner.predict_temperature(20, predict_date_with_samples) + rh_result = machine_learner.predict_rh(50, predict_date_with_samples) + assert temp_result == 30 + assert rh_result == 100 -@then(parsers.parse('The model_temp: {model_temp} for {timestamp} results in {bias_adjusted_temp}'), - converters=dict( - model_temp=float, - timestamp=datetime.fromisoformat, - bias_adjusted_temp=str2float)) -def assert_temperature( - instance: machine_learning.StationMachineLearning, - model_temp: float, timestamp: datetime, bias_adjusted_temp: float): - """ Assert that the ML algorithm predicts the temperature correctly """ - result = instance.predict_temperature(model_temp, timestamp) - assert result == bias_adjusted_temp +def test_bias_adjustment_without_samples(mock_get_actuals_left_outer_join_with_predictions): + predict_date_without_samples = datetime.fromisoformat("2020-09-03T01:14:51.939836+00:00") + machine_learner = StationMachineLearning( + session=None, + model=PredictionModel(id=1), + target_coordinate=[ + -120.4816667, + 50.6733333 + ], + station_code=None, + max_learn_date=datetime.now()) + machine_learner.learn() -@then(parsers.parse('The model_rh: {model_rh} for {timestamp} results in {bias_adjusted_rh}'), - converters=dict( - model_rh=float, - timestamp=datetime.fromisoformat, - bias_adjusted_rh=str2float)) -def assert_rh(instance: machine_learning.StationMachineLearning, - model_rh: float, timestamp: datetime, bias_adjusted_rh: float): - """ Assert that the ML algorithm predicts the relative humidity correctly """ - result = instance.predict_rh(model_rh, timestamp) - assert result == bias_adjusted_rh + temp_result = machine_learner.predict_temperature(20, predict_date_without_samples) + rh_result = machine_learner.predict_rh(50, predict_date_without_samples) + assert temp_result is None + assert rh_result is None diff --git a/api/app/weather_models/machine_learning.py b/api/app/weather_models/machine_learning.py index 1c51bf7af..202de5776 100644 --- a/api/app/weather_models/machine_learning.py +++ b/api/app/weather_models/machine_learning.py @@ -6,12 +6,10 @@ from typing import List from logging import getLogger from sklearn.linear_model import LinearRegression -from scipy.interpolate import griddata import numpy as np from sqlalchemy.orm import Session from app.weather_models import SCALAR_MODEL_VALUE_KEYS, construct_interpolated_noon_prediction -from app.db.models.weather_models import ( - PredictionModel, PredictionModelGridSubset, ModelRunPrediction) +from app.db.models.weather_models import (PredictionModel, ModelRunPrediction) from app.db.models.observations import HourlyActual from app.db.crud.observations import get_actuals_left_outer_join_with_predictions @@ -80,26 +78,18 @@ def np_y(self, hour): return np.array(self._y[hour]) def add_sample(self, - points: List, - target_point: List, - model_values: List, + model_value: float, actual_value: float, timestamp: datetime, model_key: str, sample_key: str): """ Add a sample, interpolating the model values spatially """ - # Interpolate spatially, to get close to our actual position: - try: - interpolated_value = griddata( - points, model_values, target_point, method='linear') - except: - # Additional logging to assist with finding errors: - logger.error('for %s->%s griddata failed with points: %s, model_values %s, target_point: %s', - model_key, sample_key, points, model_values, target_point) - raise + # Additional logging to assist with finding errors: + logger.info('adding sample for %s->%s with: model_values %s, actual_value: %s', + model_key, sample_key, model_value, actual_value) # Add to the data we're going to learn from: # Using two variables, the interpolated temperature value, and the hour of the day. - self.append_x(interpolated_value[0], timestamp) + self.append_x(model_value, timestamp) self.append_y(actual_value, timestamp) @@ -119,8 +109,6 @@ class StationMachineLearning: def __init__(self, session: Session, model: PredictionModel, - grid: PredictionModelGridSubset, - points: List, target_coordinate: List[float], station_code: int, max_learn_date: datetime): @@ -135,8 +123,6 @@ def __init__(self, """ self.session = session self.model = model - self.grid = grid - self.points = points self.target_coordinate = target_coordinate self.station_code = station_code self.regression_models = defaultdict(RegressionModels) @@ -160,8 +146,7 @@ def _add_sample_to_collection(self, logger.warning('no actual value for %s', sample_key) continue sample_value = getattr(sample_collection, sample_key) - sample_value.add_sample(self.points, self.target_coordinate, model_value, - actual_value, actual.weather_date, model_key, sample_key) + sample_value.add_sample(model_value, actual_value, actual.weather_date, model_key, sample_key) else: # Sometimes, for reasons that probably need investigation, model values # are None. From 4bc8ddc9b145c42b99d5101f036eb322f39cafe9 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 27 Sep 2023 17:53:49 -0700 Subject: [PATCH 06/10] Lint --- api/app/jobs/common_model_fetchers.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py index 2df4cce6b..110bdddc5 100644 --- a/api/app/jobs/common_model_fetchers.py +++ b/api/app/jobs/common_model_fetchers.py @@ -5,13 +5,11 @@ import numpy from datetime import datetime, timedelta, timezone from pyproj import Geod -from geoalchemy2.shape import to_shape from sqlalchemy.orm import Session from app.db.crud.weather_models import (get_processed_file_record, get_processed_file_count, get_prediction_model_run_timestamp_records, get_model_run_predictions, - get_grids_for_coordinate, get_weather_station_model_prediction, delete_weather_station_model_predictions, refresh_morecast2_materialized_view) From ba4693b2b8e4c9040081859a35c1ea1f076ee491 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 27 Sep 2023 18:05:41 -0700 Subject: [PATCH 07/10] Fix test --- api/app/tests/weather_models/test_env_canada_gdps.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/api/app/tests/weather_models/test_env_canada_gdps.py b/api/app/tests/weather_models/test_env_canada_gdps.py index e19a17c2b..d3e2a9eca 100644 --- a/api/app/tests/weather_models/test_env_canada_gdps.py +++ b/api/app/tests/weather_models/test_env_canada_gdps.py @@ -7,7 +7,6 @@ from datetime import datetime import pytest import requests -from geoalchemy2.shape import from_shape from sqlalchemy.orm import Session from shapely import wkt from app.jobs import env_canada @@ -17,7 +16,7 @@ import app.db.crud.weather_models from app.stations import StationSourceEnum from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, - PredictionModelRunTimestamp, PredictionModelGridSubset) + PredictionModelRunTimestamp) from app.tests.weather_models.crud import get_actuals_left_outer_join_with_predictions from app.tests.weather_models.test_models_common import (MockResponse, mock_get_processed_file_count, mock_get_stations) @@ -72,17 +71,12 @@ def mock_get_processed_file_record(session, url: str): return gdps_processed_model_run return None - def mock_get_grids_for_coordinate(session, prediction_model, coordinate): - return [PredictionModelGridSubset( - id=1, prediction_model_id=gdps_prediction_model.id, geom=from_shape(shape)), ] - def mock_get_prediction_run(*args, **kwargs): return gdps_prediction_model_run monkeypatch.setattr(common_model_fetchers, 'get_prediction_model_run_timestamp_records', mock_get_gdps_prediction_model_run_timestamp_records) monkeypatch.setattr(common_model_fetchers, 'get_processed_file_record', mock_get_processed_file_record) - monkeypatch.setattr(common_model_fetchers, 'get_grids_for_coordinate', mock_get_grids_for_coordinate) monkeypatch.setattr(app.db.crud.weather_models, 'get_prediction_run', mock_get_prediction_run) From 7489a5e9464c206a65b932848f4e96d336340904 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 27 Sep 2023 18:47:35 -0700 Subject: [PATCH 08/10] Fix tests --- api/app/tests/weather_models/test_env_canada_hrdps.py | 8 +------- api/app/tests/weather_models/test_env_canada_rdps.py | 8 +------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/api/app/tests/weather_models/test_env_canada_hrdps.py b/api/app/tests/weather_models/test_env_canada_hrdps.py index 5cb17c0ef..ce2a572b6 100644 --- a/api/app/tests/weather_models/test_env_canada_hrdps.py +++ b/api/app/tests/weather_models/test_env_canada_hrdps.py @@ -9,7 +9,6 @@ import shapely import datetime from sqlalchemy.orm import Session -from geoalchemy2.shape import from_shape from pytest_mock import MockerFixture import app.utils.time as time_utils import app.db.database @@ -20,7 +19,7 @@ from app.weather_models import ProjectionEnum from app.stations import StationSourceEnum from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, - PredictionModelRunTimestamp, PredictionModelGridSubset) + PredictionModelRunTimestamp) from app.tests.weather_models.test_env_canada_gdps import MockResponse @@ -55,10 +54,6 @@ def mock_get_processed_file_record(session, url: str): return hrdps_processed_model_run return None - def mock_get_grids_for_coordinate(session, prediction_model, coordinate): - return [PredictionModelGridSubset( - id=1, prediction_model_id=hrdps_prediction_model.id, geom=from_shape(shape)), ] - def mock_get_prediction_run(*args, **kwargs): return hrdps_prediction_model_run @@ -66,7 +61,6 @@ def mock_get_prediction_run(*args, **kwargs): monkeypatch.setattr(app.jobs.common_model_fetchers, 'get_prediction_model_run_timestamp_records', mock_get_hrdps_prediction_model_run_timestamp_records) monkeypatch.setattr(app.jobs.env_canada, 'get_processed_file_record', mock_get_processed_file_record) - monkeypatch.setattr(app.jobs.common_model_fetchers, 'get_grids_for_coordinate', mock_get_grids_for_coordinate) monkeypatch.setattr(app.db.crud.weather_models, 'get_prediction_run', mock_get_prediction_run) diff --git a/api/app/tests/weather_models/test_env_canada_rdps.py b/api/app/tests/weather_models/test_env_canada_rdps.py index 8160328e8..7cf2f389c 100644 --- a/api/app/tests/weather_models/test_env_canada_rdps.py +++ b/api/app/tests/weather_models/test_env_canada_rdps.py @@ -8,7 +8,6 @@ from typing import Optional from shapely import wkt from sqlalchemy.orm import Session -from geoalchemy2.shape import from_shape import app.utils.time as time_utils import app.weather_models.process_grib import app.jobs.env_canada @@ -16,7 +15,7 @@ import app.db.crud.weather_models from app.stations import StationSourceEnum from app.db.models.weather_models import (PredictionModel, ProcessedModelRunUrl, - PredictionModelRunTimestamp, PredictionModelGridSubset) + PredictionModelRunTimestamp) from app.tests.weather_models.test_env_canada_gdps import (MockResponse) logger = logging.getLogger(__name__) @@ -52,10 +51,6 @@ def mock_get_processed_file_record(session, url: str): return rdps_processed_model_run return None - def mock_get_grids_for_coordinate(session, prediction_model, coordinate): - return [PredictionModelGridSubset( - id=1, prediction_model_id=rdps_prediction_model.id, geom=from_shape(shape)), ] - def mock_get_prediction_run(*args, **kwargs): return rdps_prediction_model_run @@ -63,7 +58,6 @@ def mock_get_prediction_run(*args, **kwargs): monkeypatch.setattr(app.jobs.common_model_fetchers, 'get_prediction_model_run_timestamp_records', mock_get_rdps_prediction_model_run_timestamp_records) monkeypatch.setattr(app.jobs.env_canada, 'get_processed_file_record', mock_get_processed_file_record) - monkeypatch.setattr(app.jobs.common_model_fetchers, 'get_grids_for_coordinate', mock_get_grids_for_coordinate) monkeypatch.setattr(app.db.crud.weather_models, 'get_prediction_run', mock_get_prediction_run) From 2e225a0f8ecfb09358dfb651199ccbe487df6610 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 28 Sep 2023 10:54:14 -0700 Subject: [PATCH 09/10] Delete dead code, fix references --- api/app/db/crud/weather_models.py | 60 +-------------------------- api/app/jobs/common_model_fetchers.py | 10 ++--- 2 files changed, 7 insertions(+), 63 deletions(-) diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py index 2b613d59e..d5e355ac7 100644 --- a/api/app/db/crud/weather_models.py +++ b/api/app/db/crud/weather_models.py @@ -3,57 +3,16 @@ import logging import datetime from typing import List, Union -from sqlalchemy import or_, and_, func +from sqlalchemy import and_, func from sqlalchemy.orm import Session from sqlalchemy.sql import text from app.weather_models import ModelEnum, ProjectionEnum from app.db.models.weather_models import ( - ProcessedModelRunUrl, PredictionModel, PredictionModelRunTimestamp, PredictionModelGridSubset, + ProcessedModelRunUrl, PredictionModel, PredictionModelRunTimestamp, ModelRunPrediction, WeatherStationModelPrediction, MoreCast2MaterializedView) -import app.utils.time as time_utils logger = logging.getLogger(__name__) -# -------------- COMMON UTILITY FUNCTIONS --------------------------- - - -def _construct_grid_filter(coordinates): - # Run through each coordinate, adding it to the "or" construct. - geom_or = None - for coordinate in coordinates: - condition = PredictionModelGridSubset.geom.ST_Contains( - 'POINT({longitude} {latitude})'.format(longitude=coordinate[0], latitude=coordinate[1])) - if geom_or is None: - geom_or = or_(condition) - else: - geom_or = or_(condition, geom_or) - return geom_or - - -# ----------- end of UTILITY FUNCTIONS ------------------------ - - -def get_or_create_grid_subset(session: Session, - prediction_model: PredictionModel, - geographic_points) -> PredictionModelGridSubset: - """ Get the subset of grid points of interest. """ - geom = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} {}))'.format( - geographic_points[0][0], geographic_points[0][1], - geographic_points[1][0], geographic_points[1][1], - geographic_points[2][0], geographic_points[2][1], - geographic_points[3][0], geographic_points[3][1], - geographic_points[0][0], geographic_points[0][1]) - grid_subset = session.query(PredictionModelGridSubset).\ - filter(PredictionModelGridSubset.prediction_model_id == prediction_model.id).\ - filter(PredictionModelGridSubset.geom == geom).first() - if not grid_subset: - logger.info('creating grid subset %s', geographic_points) - grid_subset = PredictionModelGridSubset( - prediction_model_id=prediction_model.id, geom=geom) - session.add(grid_subset) - session.commit() - return grid_subset - def get_prediction_run(session: Session, prediction_model_id: int, prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp: @@ -103,21 +62,6 @@ def get_or_create_prediction_run(session, prediction_model: PredictionModel, return prediction_run -def get_grids_for_coordinate(session: Session, - prediction_model: PredictionModel, - coordinate) -> PredictionModelGridSubset: - """ Given a specified coordinate and model, return the appropriate grids. - There should only every be one grid per coordinate - but it's conceivable that there are more than one. - """ - logger.info("Model %s, coords %s,%s", prediction_model.id, - coordinate[1], coordinate[0]) - query = session.query(PredictionModelGridSubset).\ - filter(PredictionModelGridSubset.geom.ST_Contains( - 'POINT({longitude} {latitude})'.format(longitude=coordinate[0], latitude=coordinate[1]))).\ - filter(PredictionModelGridSubset.prediction_model_id == prediction_model.id) - return query - - def get_model_run_predictions(session: Session, prediction_run: PredictionModelRunTimestamp) -> List: """ Get all the predictions for a provided model run """ diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py index 110bdddc5..112a09ecd 100644 --- a/api/app/jobs/common_model_fetchers.py +++ b/api/app/jobs/common_model_fetchers.py @@ -210,7 +210,7 @@ def _process_prediction(self, station: WeatherStation, model_run: PredictionModelRunTimestamp, machine: StationMachineLearning): - """ Create a WeatherStationModelPrediction from the ModelRunGridSubsetPrediction data. + """ Create a WeatherStationModelPrediction from the ModelRunPrediction data. """ # If there's already a prediction, we want to update it station_prediction = get_weather_station_model_prediction( @@ -226,7 +226,7 @@ def _process_prediction(self, # NOTE: Not sure why this value would ever be None. This could happen if for whatever reason, the # tmp_tgl_2 layer failed to download and process, while other layers did. if prediction.tmp_tgl_2 is None: - logger.warning('tmp_tgl_2 is None or empty for ModelRunGridSubsetPrediction.id == %s', prediction.id) + logger.warning('tmp_tgl_2 is None for ModelRunPrediction.id == %s', prediction.id) else: station_prediction.tmp_tgl_2 = prediction.tmp_tgl_2 @@ -235,7 +235,7 @@ def _process_prediction(self, # rh_tgl_2 layer failed to download and process, while other layers did. if prediction.rh_tgl_2 is None: # This is unexpected, so we log it. - logger.warning('rh_tgl_2 is None for ModelRunGridSubsetPrediction.id == %s', prediction.id) + logger.warning('rh_tgl_2 is None for ModelRunPrediction.id == %s', prediction.id) station_prediction.rh_tgl_2 = None else: station_prediction.rh_tgl_2 = prediction.rh_tgl_2 @@ -255,10 +255,10 @@ def _process_prediction(self, # Get the closest wind speed if prediction.wind_tgl_10 is not None: - station_prediction.wind_tgl_10 = prediction.wind_tgl_10[0] + station_prediction.wind_tgl_10 = prediction.wind_tgl_10 # Get the closest wind direcion if prediction.wdir_tgl_10 is not None: - station_prediction.wdir_tgl_10 = prediction.wdir_tgl_10[0] + station_prediction.wdir_tgl_10 = prediction.wdir_tgl_10 # Predict the temperature station_prediction.bias_adjusted_temperature = machine.predict_temperature( From 4e71d230a74cd8943d9075f8d2c1e9e87ef8fcc3 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 28 Sep 2023 15:02:43 -0700 Subject: [PATCH 10/10] Fix interpolate return value --- api/app/weather_models/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/api/app/weather_models/__init__.py b/api/app/weather_models/__init__.py index 5713bf951..20cbeb1aa 100644 --- a/api/app/weather_models/__init__.py +++ b/api/app/weather_models/__init__.py @@ -37,14 +37,14 @@ class ProjectionEnum(str, Enum): def interpolate_between_two_points( - x1: int, x2: int, y1: float, y2: float, xn: int): + x1: int, x2: int, y1: float, y2: float, xn: int) -> float: """ Interpolate values between two points in time. :param x1: X coordinate of the 1st value. :param x2: X coordinate of the 2nd value. - :param y1: List of values at the 1st timestamp. - :param y2: List of values at the 2nd timestamp. + :param y1: value at the 1st timestamp. + :param y2: value at the 2nd timestamp. :param xn: The c coordinate we want values for. - :return: Interpolated values. + :return: Interpolated value. """ # Prepare x-axis (time). @@ -55,7 +55,7 @@ def interpolate_between_two_points( # Create interpolation function. function = interp1d(x_axis, y_axis, kind='linear') # Use iterpolation function to derive values at the time of interest. - return function(xn) + return function(xn).item() def interpolate_bearing(time_a: datetime, time_b: datetime, target_time: datetime, @@ -129,6 +129,7 @@ def construct_interpolated_noon_prediction(prediction_a: ModelRunPrediction, continue value = interpolate_between_two_points(timestamp_a, timestamp_b, value_a, value_b, noon_timestamp) + assert isinstance(value, float) setattr(noon_prediction, key, value) if noon_prediction.apcp_sfc_0 is None or prediction_a.apcp_sfc_0 is None: noon_prediction.delta_precip = None