diff --git a/api/alembic/versions/5b745fe0bd7a_add_station_code_to_modelrunprediction.py b/api/alembic/versions/5b745fe0bd7a_add_station_code_to_modelrunprediction.py
new file mode 100644
index 000000000..5d379d91a
--- /dev/null
+++ b/api/alembic/versions/5b745fe0bd7a_add_station_code_to_modelrunprediction.py
@@ -0,0 +1,37 @@
+"""Add station_code to ModelRunPrediction
+
+Revision ID: 5b745fe0bd7a
+Revises: 505a3f03ba75
+Create Date: 2023-10-25 11:34:45.429269
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '5b745fe0bd7a'
+down_revision = '505a3f03ba75'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # Delete unusable data from the model_run_predictions table
+    op.execute(sa.text('DELETE FROM model_run_predictions'))
+    # ### commands auto generated by Alembic ###
+    op.add_column('model_run_predictions', sa.Column('station_code', sa.Integer(), nullable=False))
+    op.drop_constraint('model_run_predictions_prediction_model_run_timestamp_id_pre_key', 'model_run_predictions', type_='unique')
+    op.create_unique_constraint('model_run_predictions_unique_constraint', 'model_run_predictions', ['prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'])
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # Rows are stored per station, so several rows share a model run and prediction timestamp.
+    # Delete them first, otherwise re-creating the two-column unique constraint below fails.
+    op.execute(sa.text('DELETE FROM model_run_predictions'))
+    # ### commands auto generated by Alembic ###
+    op.drop_constraint('model_run_predictions_unique_constraint', 'model_run_predictions', type_='unique')
+    op.create_unique_constraint('model_run_predictions_prediction_model_run_timestamp_id_pre_key', 'model_run_predictions', ['prediction_model_run_timestamp_id', 'prediction_timestamp'])
+    op.drop_column('model_run_predictions', 'station_code')
+    # ### end Alembic commands ###
diff --git a/api/app/db/crud/observations.py b/api/app/db/crud/observations.py
index 50c04444f..5f91f95d2 100644
--- a/api/app/db/crud/observations.py
+++ b/api/app/db/crud/observations.py
@@ -39,7 +39,8 @@ def get_actuals_left_outer_join_with_predictions(
     """
     return session.query(HourlyActual, ModelRunPrediction)\
         .outerjoin(ModelRunPrediction,
-                   and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date))\
+                   and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date,
+                        ModelRunPrediction.station_code == station_code))\
         .outerjoin(PredictionModelRunTimestamp,
                    and_(PredictionModelRunTimestamp.id ==
                         ModelRunPrediction.prediction_model_run_timestamp_id,
diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py
index d5e355ac7..ddf6920bd 100644
--- a/api/app/db/crud/weather_models.py
+++ b/api/app/db/crud/weather_models.py
@@ -62,14 +62,15 @@ def get_or_create_prediction_run(session, prediction_model: PredictionModel,
     return prediction_run
 
 
-def get_model_run_predictions(session: Session,
+def get_model_run_predictions_for_station(session: Session, station_code: int,
                               prediction_run: PredictionModelRunTimestamp) -> List:
     """ Get all the predictions for a provided model run """
     logger.info("Getting model predictions for grid %s", prediction_run)
-    return session.query(ModelRunPrediction).\
-        filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
-               prediction_run.id).\
-        order_by(ModelRunPrediction.prediction_timestamp)
+    return session.query(ModelRunPrediction)\
+        .filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
+                prediction_run.id)\
+        .filter(ModelRunPrediction.station_code == station_code)\
+        .order_by(ModelRunPrediction.prediction_timestamp)
 
 
 def delete_weather_station_model_predictions(session: Session, older_than: datetime):
@@ -79,6 +80,15 @@ def delete_weather_station_model_predictions(session: Session, older_than: datet
     session.query(WeatherStationModelPrediction)\
         .filter(WeatherStationModelPrediction.prediction_timestamp < older_than)\
         .delete()
+
+
+def delete_model_run_predictions(session: Session, older_than: datetime):
+    """ Delete any model run prediction older than a certain date.
+    """
+    logger.info('Deleting model_run_prediction data older than %s...', older_than)
+    session.query(ModelRunPrediction)\
+        .filter(ModelRunPrediction.prediction_timestamp < older_than)\
+        .delete()
 
 
 def get_station_model_predictions_order_by_prediction_timestamp(
diff --git a/api/app/db/models/weather_models.py b/api/app/db/models/weather_models.py
index c63f377b5..9226cb032 100644
--- a/api/app/db/models/weather_models.py
+++ b/api/app/db/models/weather_models.py
@@ -170,7 +170,7 @@ class ModelRunPrediction(Base):
     Each value is a numeric value that corresponds to the lat lon from the model raster """
     __tablename__ = 'model_run_predictions'
     __table_args__ = (
-        UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'),
+        UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'),
         {'comment': 'The prediction values of a particular model run.'}
     )
 
@@ -183,6 +183,8 @@ class ModelRunPrediction(Base):
         "PredictionModelRunTimestamp", foreign_keys=[prediction_model_run_timestamp_id])
     # The date and time to which the prediction applies.
     prediction_timestamp = Column(TZTimeStamp, nullable=False, index=True)
+    # The station code representing the location (aka weather station).
+    station_code = Column(Integer, nullable=False)
     # Temperature 2m above model layer.
     tmp_tgl_2 = Column(Float, nullable=True)
     # Relative humidity 2m above model layer.
diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py
index 66153aac3..8ac07d163 100644
--- a/api/app/jobs/common_model_fetchers.py
+++ b/api/app/jobs/common_model_fetchers.py
@@ -9,10 +9,11 @@
 from app.db.crud.weather_models import (get_processed_file_record,
                                         get_processed_file_count,
                                         get_prediction_model_run_timestamp_records,
-                                        get_model_run_predictions,
+                                        get_model_run_predictions_for_station,
                                         get_weather_station_model_prediction,
                                         delete_weather_station_model_predictions,
-                                        refresh_morecast2_materialized_view)
+                                        refresh_morecast2_materialized_view,
+                                        delete_model_run_predictions)
 from app.weather_models.machine_learning import StationMachineLearning
 from app.weather_models import SCALAR_MODEL_VALUE_KEYS, ModelEnum, construct_interpolated_noon_prediction
 from app.schemas.stations import WeatherStation
@@ -162,6 +163,7 @@ def apply_data_retention_policy():
         # keeping 21 days (3 weeks) of historic data is sufficient.
         oldest_to_keep = time_utils.get_utc_now() - time_utils.data_retention_threshold
         delete_weather_station_model_predictions(session, oldest_to_keep)
+        delete_model_run_predictions(session, oldest_to_keep)
 
 
 def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: ModelRunPrediction, model_run_hour: int):
@@ -349,7 +351,7 @@ def _process_model_run_for_station(self,
         machine.learn()
 
         # Get all the predictions associated to this particular model run.
-        query = get_model_run_predictions(self.session, model_run)
+        query = get_model_run_predictions_for_station(self.session, station.code, model_run)
 
         nam_cumulative_precip = 0.0
         # Iterate through all the predictions.
diff --git a/api/app/tests/weather_models/test_process_grib.py b/api/app/tests/weather_models/test_process_grib.py
index f42d54e21..8b585e6ac 100644
--- a/api/app/tests/weather_models/test_process_grib.py
+++ b/api/app/tests/weather_models/test_process_grib.py
@@ -40,8 +40,9 @@ def test_read_single_raster_value():
                                                geo_to_raster_transformer)
 
     raster_band = dataset.GetRasterBand(1)
-    value = next(processor.yield_value_for_stations(raster_band))
+    station, value = next(processor.yield_value_for_stations(raster_band))
 
     assert math.isclose(value, 55.976, abs_tol=0.001)
+    assert station.code == 322
 
     del dataset
diff --git a/api/app/weather_models/process_grib.py b/api/app/weather_models/process_grib.py
index 923c840a0..a6ed3a343 100644
--- a/api/app/weather_models/process_grib.py
+++ b/api/app/weather_models/process_grib.py
@@ -171,7 +171,7 @@ def yield_value_for_stations(self, raster_band: gdal.Dataset):
                 logger.warning('coordinate not in raster - %s', station)
                 continue
 
-            yield value
+            yield (station, value)
 
     def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_band: gdal.Dataset, variable: str):
         """ Given a list of stations and 2 gdal datasets (one for u-component of wind, one for v-component
@@ -191,11 +191,11 @@ def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_
                 v_value = v_raster_band.ReadAsArray(x_coordinate, y_coordinate, 1, 1)[0, 0]
 
                 if variable == 'wdir_tgl_10':
-                    yield calculate_wind_dir_from_u_v(u_value, v_value)
+                    yield (station, calculate_wind_dir_from_u_v(u_value, v_value))
                 elif variable == 'wind_tgl_10':
                     metres_per_second_speed = calculate_wind_speed_from_u_v(u_value, v_value)
                     kilometres_per_hour_speed = convert_mps_to_kph(metres_per_second_speed)
-                    yield kilometres_per_hour_speed
+                    yield (station, kilometres_per_hour_speed)
             else:
                 logger.warning('coordinate not in u/v wind rasters - %s', station)
 
@@ -240,6 +240,7 @@ def get_variable_name(self, grib_info: ModelRunInfo) -> str:
         return variable_name
 
     def store_prediction_value(self,
+                               station_code: int,
                                value: float,
                                preduction_model_run: PredictionModelRunTimestamp,
                                grib_info: ModelRunInfo,
@@ -247,15 +248,18 @@ def store_prediction_value(self,
         """ Store the values around the area of interest.
         """
         # Load the record if it exists.
-        prediction = session.query(ModelRunPrediction).\
-            filter(
-                ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id).\
-            filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp).first()
+        prediction = session.query(ModelRunPrediction)\
+            .filter(
+                ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id)\
+            .filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp)\
+            .filter(ModelRunPrediction.station_code == station_code)\
+            .first()
         if not prediction:
             # Record doesn't exist, so we create it.
             prediction = ModelRunPrediction()
             prediction.prediction_model_run_timestamp_id = preduction_model_run.id
             prediction.prediction_timestamp = grib_info.prediction_timestamp
+            prediction.station_code = station_code
 
         variable_name = self.get_variable_name(grib_info)
         setattr(prediction, variable_name, value)
@@ -267,13 +271,13 @@ def process_env_can_grib_file(self, session: Session, dataset, grib_info: ModelR
         # for GDPS, RDPS, HRDPS models, always only ever 1 raster band in the dataset
         raster_band = dataset.GetRasterBand(1)
         # Iterate through stations:
-        for value in self.yield_value_for_stations(raster_band):
+        for station, value in self.yield_value_for_stations(raster_band):
             # Convert wind speed from metres per second to kilometres per hour for Environment Canada
             # models (NOAA models handled elswhere)
             if grib_info.variable_name.lower().startswith("wind_agl") or grib_info.variable_name.lower().startswith('wind_tgl'):
                 value = convert_mps_to_kph(value)
 
-            self.store_prediction_value(value, prediction_run, grib_info, session)
+            self.store_prediction_value(station.code, value, prediction_run, grib_info, session)
 
     def get_raster_bands(self, dataset, grib_info: ModelRunInfo):
         """ Returns raster bands of dataset for temperature, RH, U/V wind components, and
@@ -308,22 +312,22 @@ def process_noaa_grib_file(self, session: Session, dataset, grib_info: ModelRunI
         tmp_raster_band, rh_raster_band, u_wind_raster_band, v_wind_raster_band, precip_raster_band = self.get_raster_bands(
             dataset, grib_info)
 
-        for tmp_value in self.yield_value_for_stations(tmp_raster_band):
+        for station, tmp_value in self.yield_value_for_stations(tmp_raster_band):
             grib_info.variable_name = 'tmp_tgl_2'
-            self.store_prediction_value(tmp_value, prediction_run, grib_info, session)
-        for rh_value in self.yield_value_for_stations(rh_raster_band):
+            self.store_prediction_value(station.code, tmp_value, prediction_run, grib_info, session)
+        for station, rh_value in self.yield_value_for_stations(rh_raster_band):
             grib_info.variable_name = 'rh_tgl_2'
-            self.store_prediction_value(rh_value, prediction_run, grib_info, session)
+            self.store_prediction_value(station.code, rh_value, prediction_run, grib_info, session)
         if precip_raster_band:
-            for apcp_value in self.yield_value_for_stations(precip_raster_band):
+            for station, apcp_value in self.yield_value_for_stations(precip_raster_band):
                 grib_info.variable_name = 'apcp_sfc_0'
-                self.store_prediction_value(apcp_value, prediction_run, grib_info, session)
-        for wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):
+                self.store_prediction_value(station.code, apcp_value, prediction_run, grib_info, session)
+        for station, wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):
             grib_info.variable_name = 'wdir_tgl_10'
-            self.store_prediction_value(wdir_value, prediction_run, grib_info, session)
-        for wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):
+            self.store_prediction_value(station.code, wdir_value, prediction_run, grib_info, session)
+        for station, wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):
             grib_info.variable_name = 'wind_tgl_10'
-            self.store_prediction_value(wind_value, prediction_run, grib_info, session)
+            self.store_prediction_value(station.code, wind_value, prediction_run, grib_info, session)
 
     def process_grib_file(self, filename, grib_info: ModelRunInfo, session: Session):
         """ Process a grib file, extracting and storing relevant information. """