Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add station code field #3199

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add station_code to ModelRunPrediction

Revision ID: 5b745fe0bd7a
Revises: 505a3f03ba75
Create Date: 2023-10-25 11:34:45.429269

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '5b745fe0bd7a'
down_revision = '505a3f03ba75'
branch_labels = None
depends_on = None


def upgrade():
# Delete unusable data from the model_run_predictions table
op.execute(sa.text('DELETE FROM model_run_predictions'))
# ### commands auto generated by Alembic ###
op.add_column('model_run_predictions', sa.Column('station_code', sa.Integer(), nullable=False))
op.drop_constraint('model_run_predictions_prediction_model_run_timestamp_id_pre_key', 'model_run_predictions', type_='unique')
op.create_unique_constraint('model_run_predictions_unique_constraint', 'model_run_predictions', ['prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'])
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic ###
op.drop_constraint('model_run_predictions_unique_constraint', 'model_run_predictions', type_='unique')
op.create_unique_constraint('model_run_predictions_prediction_model_run_timestamp_id_pre_key', 'model_run_predictions', ['prediction_model_run_timestamp_id', 'prediction_timestamp'])
op.drop_column('model_run_predictions', 'station_code')
# ### end Alembic commands ###
3 changes: 2 additions & 1 deletion api/app/db/crud/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def get_actuals_left_outer_join_with_predictions(
"""
return session.query(HourlyActual, ModelRunPrediction)\
.outerjoin(ModelRunPrediction,
and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date))\
and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date,
ModelRunPrediction.station_code == station_code))\
.outerjoin(PredictionModelRunTimestamp,
and_(PredictionModelRunTimestamp.id ==
ModelRunPrediction.prediction_model_run_timestamp_id,
Expand Down
20 changes: 15 additions & 5 deletions api/app/db/crud/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ def get_or_create_prediction_run(session, prediction_model: PredictionModel,
return prediction_run


def get_model_run_predictions(session: Session,
def get_model_run_predictions_for_station(session: Session, station_code: int,
prediction_run: PredictionModelRunTimestamp) -> List:
""" Get all the predictions for a provided model run """
logger.info("Getting model predictions for grid %s", prediction_run)
return session.query(ModelRunPrediction).\
filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
prediction_run.id).\
order_by(ModelRunPrediction.prediction_timestamp)
return session.query(ModelRunPrediction)\
.filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
prediction_run.id)\
.filter(ModelRunPrediction.station_code == station_code)\
.order_by(ModelRunPrediction.prediction_timestamp)


def delete_weather_station_model_predictions(session: Session, older_than: datetime):
Expand All @@ -79,6 +80,15 @@ def delete_weather_station_model_predictions(session: Session, older_than: datet
session.query(WeatherStationModelPrediction)\
.filter(WeatherStationModelPrediction.prediction_timestamp < older_than)\
.delete()


def delete_model_run_predictions(session: Session, older_than: datetime):
""" Delete any model run prediction older than a certain date.
"""
logger.info('Deleting model_run_prediction data older than %s...', older_than)
session.query(ModelRunPrediction)\
.filter(ModelRunPrediction.prediction_timestamp < older_than)\
.delete()


def get_station_model_predictions_order_by_prediction_timestamp(
Expand Down
4 changes: 3 additions & 1 deletion api/app/db/models/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ModelRunPrediction(Base):
Each value is a numeric value that corresponds to the lat lon from the model raster """
__tablename__ = 'model_run_predictions'
__table_args__ = (
UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'),
UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp', 'station_code'),
{'comment': 'The prediction values of a particular model run.'}
)

Expand All @@ -183,6 +183,8 @@ class ModelRunPrediction(Base):
"PredictionModelRunTimestamp", foreign_keys=[prediction_model_run_timestamp_id])
# The date and time to which the prediction applies.
prediction_timestamp = Column(TZTimeStamp, nullable=False, index=True)
# The station code representing the location (aka weather station).
station_code = Column(Integer, nullable=True)
# Temperature 2m above model layer.
tmp_tgl_2 = Column(Float, nullable=True)
# Relative humidity 2m above model layer.
Expand Down
8 changes: 5 additions & 3 deletions api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from app.db.crud.weather_models import (get_processed_file_record,
get_processed_file_count,
get_prediction_model_run_timestamp_records,
get_model_run_predictions,
get_model_run_predictions_for_station,
get_weather_station_model_prediction,
delete_weather_station_model_predictions,
refresh_morecast2_materialized_view)
refresh_morecast2_materialized_view,
delete_model_run_predictions)
from app.weather_models.machine_learning import StationMachineLearning
from app.weather_models import SCALAR_MODEL_VALUE_KEYS, ModelEnum, construct_interpolated_noon_prediction
from app.schemas.stations import WeatherStation
Expand Down Expand Up @@ -162,6 +163,7 @@ def apply_data_retention_policy():
# keeping 21 days (3 weeks) of historic data is sufficient.
oldest_to_keep = time_utils.get_utc_now() - time_utils.data_retention_threshold
delete_weather_station_model_predictions(session, oldest_to_keep)
delete_model_run_predictions(session, oldest_to_keep)


def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: ModelRunPrediction, model_run_hour: int):
Expand Down Expand Up @@ -349,7 +351,7 @@ def _process_model_run_for_station(self,
machine.learn()

# Get all the predictions associated to this particular model run.
query = get_model_run_predictions(self.session, model_run)
query = get_model_run_predictions_for_station(self.session, station.code, model_run)

nam_cumulative_precip = 0.0
# Iterate through all the predictions.
Expand Down
3 changes: 2 additions & 1 deletion api/app/tests/weather_models/test_process_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ def test_read_single_raster_value():
geo_to_raster_transformer)

raster_band = dataset.GetRasterBand(1)
value = next(processor.yield_value_for_stations(raster_band))
station, value = next(processor.yield_value_for_stations(raster_band))

assert math.isclose(value, 55.976, abs_tol=0.001)
assert station.code == 322

del dataset
42 changes: 23 additions & 19 deletions api/app/weather_models/process_grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def yield_value_for_stations(self, raster_band: gdal.Dataset):
logger.warning('coordinate not in raster - %s', station)
continue

yield value
yield (station, value)

def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_band: gdal.Dataset, variable: str):
""" Given a list of stations and 2 gdal datasets (one for u-component of wind, one for v-component
Expand All @@ -191,11 +191,11 @@ def yield_uv_wind_data_for_stations(self, u_raster_band: gdal.Dataset, v_raster_
v_value = v_raster_band.ReadAsArray(x_coordinate, y_coordinate, 1, 1)[0, 0]

if variable == 'wdir_tgl_10':
yield calculate_wind_dir_from_u_v(u_value, v_value)
yield (station, calculate_wind_dir_from_u_v(u_value, v_value))
elif variable == 'wind_tgl_10':
metres_per_second_speed = calculate_wind_speed_from_u_v(u_value, v_value)
kilometres_per_hour_speed = convert_mps_to_kph(metres_per_second_speed)
yield kilometres_per_hour_speed
yield (station, kilometres_per_hour_speed)
else:
logger.warning('coordinate not in u/v wind rasters - %s', station)

Expand Down Expand Up @@ -240,22 +240,26 @@ def get_variable_name(self, grib_info: ModelRunInfo) -> str:
return variable_name

def store_prediction_value(self,
station_code: int,
value: float,
preduction_model_run: PredictionModelRunTimestamp,
grib_info: ModelRunInfo,
session: Session):
""" Store the values around the area of interest.
"""
# Load the record if it exists.
prediction = session.query(ModelRunPrediction).\
filter(
ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id).\
filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp).first()
prediction = session.query(ModelRunPrediction)\
.filter(
ModelRunPrediction.prediction_model_run_timestamp_id == preduction_model_run.id)\
.filter(ModelRunPrediction.prediction_timestamp == grib_info.prediction_timestamp)\
.filter(ModelRunPrediction.station_code == station_code)\
.first()
if not prediction:
# Record doesn't exist, so we create it.
prediction = ModelRunPrediction()
prediction.prediction_model_run_timestamp_id = preduction_model_run.id
prediction.prediction_timestamp = grib_info.prediction_timestamp
prediction.station_code = station_code

variable_name = self.get_variable_name(grib_info)
setattr(prediction, variable_name, value)
Expand All @@ -267,13 +271,13 @@ def process_env_can_grib_file(self, session: Session, dataset, grib_info: ModelR
# for GDPS, RDPS, HRDPS models, always only ever 1 raster band in the dataset
raster_band = dataset.GetRasterBand(1)
# Iterate through stations:
for value in self.yield_value_for_stations(raster_band):
for station, value in self.yield_value_for_stations(raster_band):
# Convert wind speed from metres per second to kilometres per hour for Environment Canada
# models (NOAA models handled elswhere)
if grib_info.variable_name.lower().startswith("wind_agl") or grib_info.variable_name.lower().startswith('wind_tgl'):
value = convert_mps_to_kph(value)

self.store_prediction_value(value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, value, prediction_run, grib_info, session)

def get_raster_bands(self, dataset, grib_info: ModelRunInfo):
""" Returns raster bands of dataset for temperature, RH, U/V wind components, and
Expand Down Expand Up @@ -308,22 +312,22 @@ def process_noaa_grib_file(self, session: Session, dataset, grib_info: ModelRunI
tmp_raster_band, rh_raster_band, u_wind_raster_band, v_wind_raster_band, precip_raster_band = self.get_raster_bands(
dataset, grib_info)

for tmp_value in self.yield_value_for_stations(tmp_raster_band):
for station, tmp_value in self.yield_value_for_stations(tmp_raster_band):
grib_info.variable_name = 'tmp_tgl_2'
self.store_prediction_value(tmp_value, prediction_run, grib_info, session)
for rh_value in self.yield_value_for_stations(rh_raster_band):
self.store_prediction_value(station.code, tmp_value, prediction_run, grib_info, session)
for station, rh_value in self.yield_value_for_stations(rh_raster_band):
grib_info.variable_name = 'rh_tgl_2'
self.store_prediction_value(rh_value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, rh_value, prediction_run, grib_info, session)
if precip_raster_band:
for apcp_value in self.yield_value_for_stations(precip_raster_band):
for station, apcp_value in self.yield_value_for_stations(precip_raster_band):
grib_info.variable_name = 'apcp_sfc_0'
self.store_prediction_value(apcp_value, prediction_run, grib_info, session)
for wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):
self.store_prediction_value(station.code, apcp_value, prediction_run, grib_info, session)
for station, wdir_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wdir_tgl_10'):
grib_info.variable_name = 'wdir_tgl_10'
self.store_prediction_value(wdir_value, prediction_run, grib_info, session)
for wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):
self.store_prediction_value(station.code, wdir_value, prediction_run, grib_info, session)
for station, wind_value in self.yield_uv_wind_data_for_stations(u_wind_raster_band, v_wind_raster_band, 'wind_tgl_10'):
grib_info.variable_name = 'wind_tgl_10'
self.store_prediction_value(wind_value, prediction_run, grib_info, session)
self.store_prediction_value(station.code, wind_value, prediction_run, grib_info, session)

def process_grib_file(self, filename, grib_info: ModelRunInfo, session: Session):
""" Process a grib file, extracting and storing relevant information. """
Expand Down
Loading