Skip to content

Commit

Permalink
Add station code field (#3199)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgboss authored Oct 30, 2023
1 parent c27f2ad commit 780e08e
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add station_code to ModelRunPrediction
Revision ID: 5b745fe0bd7a
Revises: 505a3f03ba75
Create Date: 2023-10-25 11:34:45.429269
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '5b745fe0bd7a'
down_revision = '505a3f03ba75'
branch_labels = None
depends_on = None


def upgrade():
    """Add ``station_code`` to ``model_run_predictions`` and widen its unique constraint.

    Steps, in order:
      1. Empty the table (the existing rows are considered unusable, and the
         new column is declared NOT NULL without a default).
      2. Add the ``station_code`` integer column.
      3. Swap the unique constraint on (prediction_model_run_timestamp_id,
         prediction_timestamp) for one that also includes ``station_code``.
    """
    table_name = 'model_run_predictions'
    previous_constraint = 'model_run_predictions_prediction_model_run_timestamp_id_pre_key'
    replacement_constraint = 'model_run_predictions_unique_constraint'

    # Delete unusable data from the model_run_predictions table.
    purge_statement = sa.text('DELETE FROM model_run_predictions')
    op.execute(purge_statement)

    # Schema changes (originally auto generated by Alembic).
    station_code_column = sa.Column('station_code', sa.Integer(), nullable=False)
    op.add_column(table_name, station_code_column)
    op.drop_constraint(previous_constraint, table_name, type_='unique')
    op.create_unique_constraint(
        replacement_constraint,
        table_name,
        ['prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'])


def downgrade():
    """Drop ``station_code`` and restore the original two-column unique constraint.

    Steps, in order:
      1. Drop the three-column unique constraint created by ``upgrade``.
      2. Empty the table. Once ``upgrade`` has been applied, rows are unique
         only per (prediction_model_run_timestamp_id, prediction_timestamp,
         station_code), so several rows can share the same run/timestamp
         pair. Re-creating the narrower constraint over such rows would fail,
         and without ``station_code`` the rows can no longer be told apart.
         This mirrors the purge performed in ``upgrade``.
      3. Re-create the unique constraint on (prediction_model_run_timestamp_id,
         prediction_timestamp) under its original name.
      4. Drop the ``station_code`` column.
    """
    op.drop_constraint('model_run_predictions_unique_constraint', 'model_run_predictions', type_='unique')
    # Remove per-station rows that would violate the narrower constraint below.
    op.execute(sa.text('DELETE FROM model_run_predictions'))
    op.create_unique_constraint(
        'model_run_predictions_prediction_model_run_timestamp_id_pre_key',
        'model_run_predictions',
        ['prediction_model_run_timestamp_id', 'prediction_timestamp'])
    op.drop_column('model_run_predictions', 'station_code')
3 changes: 2 additions & 1 deletion api/app/db/crud/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def get_actuals_left_outer_join_with_predictions(
"""
return session.query(HourlyActual, ModelRunPrediction)\
.outerjoin(ModelRunPrediction,
and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date))\
and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date,
ModelRunPrediction.station_code == station_code))\
.outerjoin(PredictionModelRunTimestamp,
and_(PredictionModelRunTimestamp.id ==
ModelRunPrediction.prediction_model_run_timestamp_id,
Expand Down
20 changes: 15 additions & 5 deletions api/app/db/crud/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ def get_or_create_prediction_run(session, prediction_model: PredictionModel,
return prediction_run


def get_model_run_predictions(session: Session,
def get_model_run_predictions_for_station(session: Session, station_code: int,
prediction_run: PredictionModelRunTimestamp) -> List:
""" Get all the predictions for a provided model run """
logger.info("Getting model predictions for grid %s", prediction_run)
return session.query(ModelRunPrediction).\
filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
prediction_run.id).\
order_by(ModelRunPrediction.prediction_timestamp)
return session.query(ModelRunPrediction)\
.filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
prediction_run.id)\
.filter(ModelRunPrediction.station_code == station_code)\
.order_by(ModelRunPrediction.prediction_timestamp)


def delete_weather_station_model_predictions(session: Session, older_than: datetime):
Expand All @@ -79,6 +80,15 @@ def delete_weather_station_model_predictions(session: Session, older_than: datet
session.query(WeatherStationModelPrediction)\
.filter(WeatherStationModelPrediction.prediction_timestamp < older_than)\
.delete()


def delete_model_run_predictions(session: Session, older_than: datetime):
    """ Delete every model run prediction whose prediction_timestamp is
    earlier than older_than.

    The delete is issued through the given session; this function does not
    commit.
    """
    logger.info('Deleting model_run_prediction data older than %s...', older_than)
    is_expired = ModelRunPrediction.prediction_timestamp < older_than
    expired_predictions = session.query(ModelRunPrediction).filter(is_expired)
    expired_predictions.delete()


def get_station_model_predictions_order_by_prediction_timestamp(
Expand Down
4 changes: 3 additions & 1 deletion api/app/db/models/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ModelRunPrediction(Base):
Each value is a numeric value that corresponds to the lat lon from the model raster """
__tablename__ = 'model_run_predictions'
__table_args__ = (
UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'),
UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'),
{'comment': 'The prediction values of a particular model run.'}
)

Expand All @@ -183,6 +183,8 @@ class ModelRunPrediction(Base):
"PredictionModelRunTimestamp", foreign_keys=[prediction_model_run_timestamp_id])
# The date and time to which the prediction applies.
prediction_timestamp = Column(TZTimeStamp, nullable=False, index=True)
# The station code representing the location (aka weather station).
station_code = Column(Integer, nullable=True)
# Temperature 2m above model layer.
tmp_tgl_2 = Column(Float, nullable=True)
# Relative humidity 2m above model layer.
Expand Down
8 changes: 5 additions & 3 deletions api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from app.db.crud.weather_models import (get_processed_file_record,
get_processed_file_count,
get_prediction_model_run_timestamp_records,
get_model_run_predictions,
get_model_run_predictions_for_station,
get_weather_station_model_prediction,
delete_weather_station_model_predictions,
refresh_morecast2_materialized_view)
refresh_morecast2_materialized_view,
delete_model_run_predictions)
from app.weather_models.machine_learning import StationMachineLearning
from app.weather_models import SCALAR_MODEL_VALUE_KEYS, ModelEnum, construct_interpolated_noon_prediction
from app.schemas.stations import WeatherStation
Expand Down Expand Up @@ -162,6 +163,7 @@ def apply_data_retention_policy():
# keeping 21 days (3 weeks) of historic data is sufficient.
oldest_to_keep = time_utils.get_utc_now() - time_utils.data_retention_threshold
delete_weather_station_model_predictions(session, oldest_to_keep)
delete_model_run_predictions(session, oldest_to_keep)


def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: ModelRunPrediction, model_run_hour: int):
Expand Down Expand Up @@ -349,7 +351,7 @@ def _process_model_run_for_station(self,
machine.learn()

# Get all the predictions associated to this particular model run.
query = get_model_run_predictions(self.session, model_run)
query = get_model_run_predictions_for_station(self.session, station.code, model_run)

nam_cumulative_precip = 0.0
# Iterate through all the predictions.
Expand Down
3 changes: 2 additions & 1 deletion api/app/tests/weather_models/test_process_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ def test_read_single_raster_value():
geo_to_raster_transformer)

raster_band = dataset.GetRasterBand(1)
value = next(processor.yield_value_for_stations(raster_band))
station, value = next(processor.yield_value_for_stations(raster_band))

assert math.isclose(value, 55.976, abs_tol=0.001)
assert station.code == 322

del dataset
42 changes: 23 additions & 19 deletions api/app/weather_models/process_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def yield_value_for_stations(self, raster_band: gdal.Dataset):
logger.warning('coordinate not in raster - %s', station)
continue

yield value
yield (station, value)

def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_band: gdal.Dataset, variable: str):
""" Given a list of stations and 2 gdal datasets (one for u-component of wind, one for v-component
Expand All @@ -191,11 +191,11 @@ def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_
v_value = v_raster_band.ReadAsArray(x_coordinate, y_coordinate, 1, 1)[0, 0]

if variable == 'wdir_tgl_10':
yield calculate_wind_dir_from_u_v(u_value, v_value)
yield (station, calculate_wind_dir_from_u_v(u_value, v_value))
elif variable == 'wind_tgl_10':
metres_per_second_speed = calculate_wind_speed_from_u_v(u_value, v_value)
kilometres_per_hour_speed = convert_mps_to_kph(metres_per_second_speed)
yield kilometres_per_hour_speed
yield (station, kilometres_per_hour_speed)
else:
logger.warning('coordinate not in u/v wind rasters - %s', station)

Expand Down Expand Up @@ -240,22 +240,26 @@ def get_variable_name(self, grib_info: ModelRunInfo) -> str:
return variable_name

def store_prediction_value(self,
station_code: int,
value: float,
preduction_model_run: PredictionModelRunTimestamp,
grib_info: ModelRunInfo,
session: Session):
""" Store the values around the area of interest.
"""
# Load the record if it exists.
prediction = session.query(ModelRunPrediction).\
filter(
ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id).\
filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp).first()
prediction = session.query(ModelRunPrediction)\
.filter(
ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id)\
.filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp)\
.filter(ModelRunPrediction.station_code == station_code)\
.first()
if not prediction:
# Record doesn't exist, so we create it.
prediction = ModelRunPrediction()
prediction.prediction_model_run_timestamp_id = preduction_model_run.id
prediction.prediction_timestamp = grib_info.prediction_timestamp
prediction.station_code = station_code

variable_name = self.get_variable_name(grib_info)
setattr(prediction, variable_name, value)
Expand All @@ -267,13 +271,13 @@ def process_env_can_grib_file(self, session: Session, dataset, grib_info: ModelR
# for GDPS, RDPS, HRDPS models, always only ever 1 raster band in the dataset
raster_band = dataset.GetRasterBand(1)
# Iterate through stations:
for value in self.yield_value_for_stations(raster_band):
for station, value in self.yield_value_for_stations(raster_band):
# Convert wind speed from metres per second to kilometres per hour for Environment Canada
# models (NOAA models handled elsewhere)
if grib_info.variable_name.lower().startswith("wind_agl") or grib_info.variable_name.lower().startswith('wind_tgl'):
value = convert_mps_to_kph(value)

self.store_prediction_value(value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, value, prediction_run, grib_info, session)

def get_raster_bands(self, dataset, grib_info: ModelRunInfo):
""" Returns raster bands of dataset for temperature, RH, U/V wind components, and
Expand Down Expand Up @@ -308,22 +312,22 @@ def process_noaa_grib_file(self, session: Session, dataset, grib_info: ModelRunI
tmp_raster_band, rh_raster_band, u_wind_raster_band, v_wind_raster_band, precip_raster_band = self.get_raster_bands(
dataset, grib_info)

for tmp_value in self.yield_value_for_stations(tmp_raster_band):
for station, tmp_value in self.yield_value_for_stations(tmp_raster_band):
grib_info.variable_name = 'tmp_tgl_2'
self.store_prediction_value(tmp_value, prediction_run, grib_info, session)
for rh_value in self.yield_value_for_stations(rh_raster_band):
self.store_prediction_value(station.code, tmp_value, prediction_run, grib_info, session)
for station, rh_value in self.yield_value_for_stations(rh_raster_band):
grib_info.variable_name = 'rh_tgl_2'
self.store_prediction_value(rh_value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, rh_value, prediction_run, grib_info, session)
if precip_raster_band:
for apcp_value in self.yield_value_for_stations(precip_raster_band):
for station, apcp_value in self.yield_value_for_stations(precip_raster_band):
grib_info.variable_name = 'apcp_sfc_0'
self.store_prediction_value(apcp_value, prediction_run, grib_info, session)
for wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):
self.store_prediction_value(station.code, apcp_value, prediction_run, grib_info, session)
for station, wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):
grib_info.variable_name = 'wdir_tgl_10'
self.store_prediction_value(wdir_value, prediction_run, grib_info, session)
for wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):
self.store_prediction_value(station.code, wdir_value, prediction_run, grib_info, session)
for station, wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):
grib_info.variable_name = 'wind_tgl_10'
self.store_prediction_value(wind_value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, wind_value, prediction_run, grib_info, session)

def process_grib_file(self, filename, grib_info: ModelRunInfo, session: Session):
""" Process a grib file, extracting and storing relevant information. """
Expand Down

0 comments on commit 780e08e

Please sign in to comment.