Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add station_code field to ModelRunPrediction table #3191

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add station_code to ModelRunPrediction

Revision ID: 5b745fe0bd7a
Revises: 2442f07d975c
Create Date: 2023-10-25 11:34:45.429269

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '5b745fe0bd7a'
down_revision = '2442f07d975c'
branch_labels = None
depends_on = None


def upgrade():
# Delete unusable data from the model_run_predictions table
op.execute(sa.text('DELETE FROM model_run_predictions'))
# ### commands auto generated by Alembic ###
op.add_column('model_run_predictions', sa.Column('station_code', sa.Integer(), nullable=False))
op.drop_constraint('model_run_predictions_prediction_model_run_timestamp_id_pre_key', 'model_run_predictions', type_='unique')
op.create_unique_constraint('model_run_predictions_unique_constraint', 'model_run_predictions', ['prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'])
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic ###
op.drop_constraint('model_run_predictions_unique_constraint', 'model_run_predictions', type_='unique')
op.create_unique_constraint('model_run_predictions_prediction_model_run_timestamp_id_pre_key', 'model_run_predictions', ['prediction_model_run_timestamp_id', 'prediction_timestamp'])
op.drop_column('model_run_predictions', 'station_code')
# ### end Alembic commands ###
3 changes: 2 additions & 1 deletion api/app/db/crud/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def get_actuals_left_outer_join_with_predictions(
"""
return session.query(HourlyActual, ModelRunPrediction)\
.outerjoin(ModelRunPrediction,
and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date))\
and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date,
ModelRunPrediction.station_code == station_code))\
.outerjoin(PredictionModelRunTimestamp,
and_(PredictionModelRunTimestamp.id ==
ModelRunPrediction.prediction_model_run_timestamp_id,
Expand Down
20 changes: 15 additions & 5 deletions api/app/db/crud/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ def get_or_create_prediction_run(session, prediction_model: PredictionModel,
return prediction_run


def get_model_run_predictions(session: Session,
def get_model_run_predictions_for_station(session: Session, station_code: int,
prediction_run: PredictionModelRunTimestamp) -> List:
""" Get all the predictions for a provided model run """
logger.info("Getting model predictions for grid %s", prediction_run)
return session.query(ModelRunPrediction).\
filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
prediction_run.id).\
order_by(ModelRunPrediction.prediction_timestamp)
return session.query(ModelRunPrediction)\
.filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
prediction_run.id)\
.filter(ModelRunPrediction.station_code == station_code)\
.order_by(ModelRunPrediction.prediction_timestamp)


def delete_weather_station_model_predictions(session: Session, older_than: datetime):
Expand All @@ -79,6 +80,15 @@ def delete_weather_station_model_predictions(session: Session, older_than: datet
session.query(WeatherStationModelPrediction)\
.filter(WeatherStationModelPrediction.prediction_timestamp < older_than)\
.delete()


def delete_model_run_predictions(session: Session, older_than: datetime):
""" Delete any model run prediction older than a certain date.
"""
logger.info('Deleting model_run_prediction data older than %s...', older_than)
session.query(ModelRunPrediction)\
.filter(ModelRunPrediction.prediction_timestamp < older_than)\
.delete()


def get_station_model_predictions_order_by_prediction_timestamp(
Expand Down
4 changes: 3 additions & 1 deletion api/app/db/models/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ModelRunPrediction(Base):
Each value is a numeric value that corresponds to the lat lon from the model raster """
__tablename__ = 'model_run_predictions'
__table_args__ = (
UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'),
UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'),
{'comment': 'The prediction values of a particular model run.'}
)

Expand All @@ -183,6 +183,8 @@ class ModelRunPrediction(Base):
"PredictionModelRunTimestamp", foreign_keys=[prediction_model_run_timestamp_id])
# The date and time to which the prediction applies.
prediction_timestamp = Column(TZTimeStamp, nullable=False, index=True)
# The station code representing the location (aka weather station).
station_code = Column(Integer, nullable=True)
dgboss marked this conversation as resolved.
Show resolved Hide resolved
# Temperature 2m above model layer.
tmp_tgl_2 = Column(Float, nullable=True)
# Relative humidity 2m above model layer.
Expand Down
8 changes: 5 additions & 3 deletions api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from app.db.crud.weather_models import (get_processed_file_record,
get_processed_file_count,
get_prediction_model_run_timestamp_records,
get_model_run_predictions,
get_model_run_predictions_for_station,
get_weather_station_model_prediction,
delete_weather_station_model_predictions,
refresh_morecast2_materialized_view)
refresh_morecast2_materialized_view,
delete_model_run_predictions)
from app.weather_models.machine_learning import StationMachineLearning
from app.weather_models import SCALAR_MODEL_VALUE_KEYS, ModelEnum, construct_interpolated_noon_prediction
from app.schemas.stations import WeatherStation
Expand Down Expand Up @@ -162,6 +163,7 @@
# keeping 21 days (3 weeks) of historic data is sufficient.
oldest_to_keep = time_utils.get_utc_now() - time_utils.data_retention_threshold
delete_weather_station_model_predictions(session, oldest_to_keep)
delete_model_run_predictions(session, oldest_to_keep)

Check warning on line 166 in api/app/jobs/common_model_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/common_model_fetchers.py#L166

Added line #L166 was not covered by tests


def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: ModelRunPrediction, model_run_hour: int):
Expand Down Expand Up @@ -349,7 +351,7 @@
machine.learn()

# Get all the predictions associated to this particular model run.
query = get_model_run_predictions(self.session, model_run)
query = get_model_run_predictions_for_station(self.session, station.code, model_run)

nam_cumulative_precip = 0.0
# Iterate through all the predictions.
Expand Down
3 changes: 2 additions & 1 deletion api/app/tests/weather_models/test_process_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ def test_read_single_raster_value():
geo_to_raster_transformer)

raster_band = dataset.GetRasterBand(1)
value = next(processor.yield_value_for_stations(raster_band))
station, value = next(processor.yield_value_for_stations(raster_band))

assert math.isclose(value, 55.976, abs_tol=0.001)
assert station.code == 322

del dataset
42 changes: 23 additions & 19 deletions api/app/weather_models/process_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@
logger.warning('coordinate not in raster - %s', station)
continue

yield value
yield (station, value)

def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_band: gdal.Dataset, variable: str):
""" Given a list of stations and 2 gdal datasets (one for u-component of wind, one for v-component
Expand All @@ -191,11 +191,11 @@
v_value = v_raster_band.ReadAsArray(x_coordinate, y_coordinate, 1, 1)[0, 0]

if variable == 'wdir_tgl_10':
yield calculate_wind_dir_from_u_v(u_value, v_value)
yield (station, calculate_wind_dir_from_u_v(u_value, v_value))

Check warning on line 194 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L194

Added line #L194 was not covered by tests
elif variable == 'wind_tgl_10':
metres_per_second_speed = calculate_wind_speed_from_u_v(u_value, v_value)
kilometres_per_hour_speed = convert_mps_to_kph(metres_per_second_speed)
yield kilometres_per_hour_speed
yield (station, kilometres_per_hour_speed)

Check warning on line 198 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L198

Added line #L198 was not covered by tests
else:
logger.warning('coordinate not in u/v wind rasters - %s', station)

Expand Down Expand Up @@ -240,22 +240,26 @@
return variable_name

def store_prediction_value(self,
station_code: int,
value: float,
preduction_model_run: PredictionModelRunTimestamp,
grib_info: ModelRunInfo,
session: Session):
""" Store the values around the area of interest.
"""
# Load the record if it exists.
prediction = session.query(ModelRunPrediction).\
filter(
ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id).\
filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp).first()
prediction = session.query(ModelRunPrediction)\
.filter(
ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id)\
.filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp)\
.filter(ModelRunPrediction.station_code == station_code)\
.first()
if not prediction:
# Record doesn't exist, so we create it.
prediction = ModelRunPrediction()
prediction.prediction_model_run_timestamp_id = preduction_model_run.id
prediction.prediction_timestamp = grib_info.prediction_timestamp
prediction.station_code = station_code

Check warning on line 262 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L262

Added line #L262 was not covered by tests

variable_name = self.get_variable_name(grib_info)
setattr(prediction, variable_name, value)
Expand All @@ -267,13 +271,13 @@
# for GDPS, RDPS, HRDPS models, always only ever 1 raster band in the dataset
raster_band = dataset.GetRasterBand(1)
# Iterate through stations:
for value in self.yield_value_for_stations(raster_band):
for station, value in self.yield_value_for_stations(raster_band):
# Convert wind speed from metres per second to kilometres per hour for Environment Canada
# models (NOAA models handled elswhere)
if grib_info.variable_name.lower().startswith("wind_agl") or grib_info.variable_name.lower().startswith('wind_tgl'):
value = convert_mps_to_kph(value)

self.store_prediction_value(value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, value, prediction_run, grib_info, session)

def get_raster_bands(self, dataset, grib_info: ModelRunInfo):
""" Returns raster bands of dataset for temperature, RH, U/V wind components, and
Expand Down Expand Up @@ -308,22 +312,22 @@
tmp_raster_band, rh_raster_band, u_wind_raster_band, v_wind_raster_band, precip_raster_band = self.get_raster_bands(
dataset, grib_info)

for tmp_value in self.yield_value_for_stations(tmp_raster_band):
for station, tmp_value in self.yield_value_for_stations(tmp_raster_band):

Check warning on line 315 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L315

Added line #L315 was not covered by tests
grib_info.variable_name = 'tmp_tgl_2'
self.store_prediction_value(tmp_value, prediction_run, grib_info, session)
for rh_value in self.yield_value_for_stations(rh_raster_band):
self.store_prediction_value(station.code, tmp_value, prediction_run, grib_info, session)
for station, rh_value in self.yield_value_for_stations(rh_raster_band):

Check warning on line 318 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L317-L318

Added lines #L317 - L318 were not covered by tests
grib_info.variable_name = 'rh_tgl_2'
self.store_prediction_value(rh_value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, rh_value, prediction_run, grib_info, session)

Check warning on line 320 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L320

Added line #L320 was not covered by tests
if precip_raster_band:
for apcp_value in self.yield_value_for_stations(precip_raster_band):
for station, apcp_value in self.yield_value_for_stations(precip_raster_band):

Check warning on line 322 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L322

Added line #L322 was not covered by tests
grib_info.variable_name = 'apcp_sfc_0'
self.store_prediction_value(apcp_value, prediction_run, grib_info, session)
for wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):
self.store_prediction_value(station.code, apcp_value, prediction_run, grib_info, session)
for station, wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):

Check warning on line 325 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L324-L325

Added lines #L324 - L325 were not covered by tests
grib_info.variable_name = 'wdir_tgl_10'
self.store_prediction_value(wdir_value, prediction_run, grib_info, session)
for wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):
self.store_prediction_value(station.code, wdir_value, prediction_run, grib_info, session)
for station, wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):

Check warning on line 328 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L327-L328

Added lines #L327 - L328 were not covered by tests
grib_info.variable_name = 'wind_tgl_10'
self.store_prediction_value(wind_value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, wind_value, prediction_run, grib_info, session)

Check warning on line 330 in api/app/weather_models/process_grib.py

View check run for this annotation

Codecov / codecov/patch

api/app/weather_models/process_grib.py#L330

Added line #L330 was not covered by tests

def process_grib_file(self, filename, grib_info: ModelRunInfo, session: Session):
""" Process a grib file, extracting and storing relevant information. """
Expand Down
Loading