Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single values from weather model rasters #3133

Merged
merged 12 commits into from
Oct 5, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""singular value weather model prediction table

Revision ID: 2442f07d975c
Revises: f49418d95584
Create Date: 2023-09-26 16:14:01.658515

"""
from alembic import op
import sqlalchemy as sa
from app.db.models.common import TZTimeStamp


# revision identifiers, used by Alembic.
# `revision` is this migration's id and `down_revision` is the migration it revises
# (both match the "Revision ID" / "Revises" lines in the module docstring).
revision = '2442f07d975c'
down_revision = 'f49418d95584'
# No branch labels or dependencies on other revisions are declared.
branch_labels = None
depends_on = None


def upgrade():
    """Create the ``model_run_predictions`` table and its three non-unique indexes."""
    table_name = 'model_run_predictions'

    # ### commands auto generated by Alembic - please adjust! ###
    columns = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('prediction_model_run_timestamp_id', sa.Integer(), nullable=False),
        sa.Column('prediction_timestamp', TZTimeStamp, nullable=False),
        # The five predicted values are all nullable floats.
        sa.Column('tmp_tgl_2', sa.Float(), nullable=True),
        sa.Column('rh_tgl_2', sa.Float(), nullable=True),
        sa.Column('apcp_sfc_0', sa.Float(), nullable=True),
        sa.Column('wdir_tgl_10', sa.Float(), nullable=True),
        sa.Column('wind_tgl_10', sa.Float(), nullable=True),
    ]
    constraints = [
        sa.ForeignKeyConstraint(
            ['prediction_model_run_timestamp_id'],
            ['prediction_model_run_timestamps.id'],
        ),
        sa.PrimaryKeyConstraint('id'),
        # At most one row per (model run, prediction timestamp) pair.
        sa.UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'),
    ]
    op.create_table(
        table_name,
        *columns,
        *constraints,
        comment='The prediction values of a particular model run.'
    )

    # (index name, indexed column) pairs, created in this order.
    index_definitions = (
        ('ix_model_run_predictions_id', 'id'),
        ('ix_model_run_predictions_prediction_model_run_timestamp_id',
         'prediction_model_run_timestamp_id'),
        ('ix_model_run_predictions_prediction_timestamp', 'prediction_timestamp'),
    )
    for index_name, column_name in index_definitions:
        op.create_index(op.f(index_name), table_name, [column_name], unique=False)
    # ### end Alembic commands ###


def downgrade():
    """Drop the three ``model_run_predictions`` indexes, then the table itself."""
    table_name = 'model_run_predictions'

    # ### commands auto generated by Alembic - please adjust! ###
    index_names = (
        'ix_model_run_predictions_prediction_timestamp',
        'ix_model_run_predictions_prediction_model_run_timestamp_id',
        'ix_model_run_predictions_id',
    )
    for index_name in index_names:
        op.drop_index(op.f(index_name), table_name=table_name)
    op.drop_table(table_name)
    # ### end Alembic commands ###
13 changes: 6 additions & 7 deletions api/app/db/crud/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import and_, select
from sqlalchemy.sql import func
from sqlalchemy.orm import Session
from app.db.models.weather_models import ModelRunGridSubsetPrediction, PredictionModelRunTimestamp
from app.db.models.weather_models import ModelRunPrediction, PredictionModelRunTimestamp
from app.db.models.observations import HourlyActual


Expand All @@ -31,19 +31,18 @@ def get_hourly_actuals(


def get_actuals_left_outer_join_with_predictions(
session: Session, model_id: int, grid_id: int, station_code: int,
session: Session, model_id: int, station_code: int,
start_date: datetime, end_date: datetime):
"""
NOTE: Can improve this query by only returning the most recent prediction, maybe using nested
queries. It works for now - but things could be faster.
"""
return session.query(HourlyActual, ModelRunGridSubsetPrediction)\
.outerjoin(ModelRunGridSubsetPrediction,
and_(ModelRunGridSubsetPrediction.prediction_timestamp == HourlyActual.weather_date,
ModelRunGridSubsetPrediction.prediction_model_grid_subset_id == grid_id))\
return session.query(HourlyActual, ModelRunPrediction)\
.outerjoin(ModelRunPrediction,
and_(ModelRunPrediction.prediction_timestamp == HourlyActual.weather_date))\
.outerjoin(PredictionModelRunTimestamp,
and_(PredictionModelRunTimestamp.id ==
ModelRunGridSubsetPrediction.prediction_model_run_timestamp_id,
ModelRunPrediction.prediction_model_run_timestamp_id,
PredictionModelRunTimestamp.prediction_model_id == model_id))\
.filter(HourlyActual.station_code == station_code)\
.filter(HourlyActual.weather_date >= start_date)\
Expand Down
137 changes: 10 additions & 127 deletions api/app/db/crud/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,16 @@
import logging
import datetime
from typing import List, Union
from sqlalchemy import or_, and_, func
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from sqlalchemy.sql import text
from app.weather_models import ModelEnum, ProjectionEnum
from app.db.models.weather_models import (
ProcessedModelRunUrl, PredictionModel, PredictionModelRunTimestamp, PredictionModelGridSubset,
ModelRunGridSubsetPrediction, WeatherStationModelPrediction, MoreCast2MaterializedView)
import app.utils.time as time_utils
ProcessedModelRunUrl, PredictionModel, PredictionModelRunTimestamp,
ModelRunPrediction, WeatherStationModelPrediction, MoreCast2MaterializedView)

logger = logging.getLogger(__name__)

# -------------- COMMON UTILITY FUNCTIONS ---------------------------


def _construct_grid_filter(coordinates):
    """ Build an OR-ed filter matching any grid subset whose geometry contains one of the coordinates.

    Each coordinate is indexed as (longitude, latitude). Returns None when `coordinates`
    yields nothing.
    """
    combined = None
    for point in coordinates:
        wkt_point = 'POINT({longitude} {latitude})'.format(longitude=point[0], latitude=point[1])
        contains_point = PredictionModelGridSubset.geom.ST_Contains(wkt_point)
        # The first condition is wrapped on its own; every later one is placed in front
        # of what has been accumulated so far.
        combined = or_(contains_point) if combined is None else or_(contains_point, combined)
    return combined


# ----------- end of UTILITY FUNCTIONS ------------------------


def get_or_create_grid_subset(session: Session,
                              prediction_model: PredictionModel,
                              geographic_points) -> PredictionModelGridSubset:
    """ Fetch the grid subset for the given model and corner points, creating it if absent.

    The first four entries of `geographic_points` are used; the first two elements of each
    are written into a WKT polygon that is closed by repeating the first point.
    """
    # Corner order 0, 1, 2, 3 and back to 0 to close the ring.
    ring = [geographic_points[index] for index in (0, 1, 2, 3, 0)]
    ordinates = [point[axis] for point in ring for axis in (0, 1)]
    geom = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} {}))'.format(*ordinates)

    existing = session.query(PredictionModelGridSubset)\
        .filter(PredictionModelGridSubset.prediction_model_id == prediction_model.id)\
        .filter(PredictionModelGridSubset.geom == geom)\
        .first()
    if existing:
        return existing

    logger.info('creating grid subset %s', geographic_points)
    created = PredictionModelGridSubset(
        prediction_model_id=prediction_model.id, geom=geom)
    session.add(created)
    session.commit()
    return created


def get_prediction_run(session: Session, prediction_model_id: int,
prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp:
Expand Down Expand Up @@ -103,40 +62,14 @@ def get_or_create_prediction_run(session, prediction_model: PredictionModel,
return prediction_run


def get_grids_for_coordinate(session: Session,
                             prediction_model: PredictionModel,
                             coordinate) -> PredictionModelGridSubset:
    """ Given a specified coordinate and model, return the appropriate grids.
    There should only ever be one grid per coordinate - but it's conceivable that there is more than one.

    `coordinate` is indexed as (longitude, latitude). The un-executed query is returned,
    not a single row.
    """
    longitude, latitude = coordinate[0], coordinate[1]
    logger.info("Model %s, coords %s,%s", prediction_model.id, latitude, longitude)

    wkt_point = 'POINT({longitude} {latitude})'.format(longitude=longitude, latitude=latitude)
    contains_point = PredictionModelGridSubset.geom.ST_Contains(wkt_point)
    belongs_to_model = PredictionModelGridSubset.prediction_model_id == prediction_model.id
    return session.query(PredictionModelGridSubset)\
        .filter(contains_point)\
        .filter(belongs_to_model)


def get_model_run_predictions_for_grid(session: Session,
prediction_run: PredictionModelRunTimestamp,
grid: PredictionModelGridSubset) -> List:
""" Get all the predictions for a provided model run and grid. """
logger.info("Getting model predictions for grid %s", grid)
return session.query(ModelRunGridSubsetPrediction).\
filter(ModelRunGridSubsetPrediction.prediction_model_grid_subset_id == grid.id).\
filter(ModelRunGridSubsetPrediction.prediction_model_run_timestamp_id ==
def get_model_run_predictions(session: Session,
prediction_run: PredictionModelRunTimestamp) -> List:
""" Get all the predictions for a provided model run """
logger.info("Getting model predictions for grid %s", prediction_run)
return session.query(ModelRunPrediction).\
filter(ModelRunPrediction.prediction_model_run_timestamp_id ==
prediction_run.id).\
order_by(ModelRunGridSubsetPrediction.prediction_timestamp)


def delete_model_run_grid_subset_predictions(session: Session, older_than: datetime):
    """ Delete every grid subset prediction whose prediction timestamp is before `older_than`.

    Nothing is committed here and nothing is returned.
    """
    logger.info('Deleting grid subset data older than %s...', older_than)
    stale_predictions = session.query(ModelRunGridSubsetPrediction).filter(
        ModelRunGridSubsetPrediction.prediction_timestamp < older_than)
    stale_predictions.delete()
order_by(ModelRunPrediction.prediction_timestamp)


def delete_weather_station_model_predictions(session: Session, older_than: datetime):
Expand All @@ -148,56 +81,6 @@ def delete_weather_station_model_predictions(session: Session, older_than: datet
.delete()


def get_model_run_predictions(
        session: Session,
        prediction_run: PredictionModelRunTimestamp,
        coordinates) -> List:
    """
    Get the predictions for a particular model run, for the specified geographical coordinates.

    Returns the un-executed query over (PredictionModelGridSubset, ModelRunGridSubsetPrediction)
    pairs, ordered by grid subset id and then by ascending prediction timestamp."""
    # Condition for query: are the coordinates within the saved grids?
    within_saved_grids = _construct_grid_filter(coordinates)

    # We are only interested in predictions from now onwards.
    utc_now = time_utils.get_utc_now()

    grid = PredictionModelGridSubset
    prediction = ModelRunGridSubsetPrediction
    from_this_run = prediction.prediction_model_run_timestamp_id == prediction_run.id
    on_matched_grid = prediction.prediction_model_grid_subset_id == grid.id
    from_now_onwards = prediction.prediction_timestamp >= utc_now

    return session.query(grid, prediction)\
        .filter(within_saved_grids)\
        .filter(from_this_run)\
        .filter(on_matched_grid)\
        .filter(from_now_onwards)\
        .order_by(grid.id, prediction.prediction_timestamp.asc())


def get_predictions_from_coordinates(session: Session, coordinates: List, model: str) -> List:
    """ Get the last five days of predictions for a model (matched by abbreviation) at the
    specified geographical coordinates.

    Returns the un-executed query over (PredictionModelGridSubset, ModelRunGridSubsetPrediction,
    PredictionModel) rows, ordered by grid subset id and then by ascending prediction timestamp.
    """
    # Condition for query: are the coordinates within the saved grids?
    within_saved_grids = _construct_grid_filter(coordinates)

    # Window of interest: from five days ago up to and including now.
    window_end = time_utils.get_utc_now()
    window_start = window_end - datetime.timedelta(days=5)

    grid = PredictionModelGridSubset
    prediction = ModelRunGridSubsetPrediction
    rows = session.query(grid, prediction, PredictionModel)
    rows = rows.filter(within_saved_grids)
    rows = rows.filter(prediction.prediction_timestamp >= window_start,
                       prediction.prediction_timestamp <= window_end)
    rows = rows.filter(grid.id == prediction.prediction_model_grid_subset_id)
    rows = rows.filter(grid.prediction_model_id == PredictionModel.id,
                       PredictionModel.abbreviation == model)
    return rows.order_by(grid.id, prediction.prediction_timestamp.asc())


def get_station_model_predictions_order_by_prediction_timestamp(
session: Session,
station_codes: List,
Expand Down
30 changes: 30 additions & 0 deletions api/app/db/models/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,36 @@ def __str__(self):
'wind_tgl_10={self.wind_tgl_10}').format(self=self)


class ModelRunPrediction(Base):
    """ The predicted values of a model run for one prediction timestamp.
    Each value is a numeric value that corresponds to the lat lon from the model raster """
    __tablename__ = 'model_run_predictions'
    __table_args__ = (
        # At most one row per (model run, prediction timestamp) pair.
        UniqueConstraint('prediction_model_run_timestamp_id', 'prediction_timestamp'),
        {'comment': 'The prediction values of a particular model run.'}
    )

    id = Column(
        Integer,
        Sequence('model_run_predictions_id_seq'),
        primary_key=True,
        nullable=False,
        index=True)
    # Which model run does this forecast apply to? E.g. The GDPS 15x.15 run from 2020 07 07 12h00.
    prediction_model_run_timestamp_id = Column(
        Integer,
        ForeignKey('prediction_model_run_timestamps.id'),
        nullable=False,
        index=True)
    prediction_model_run_timestamp = relationship(
        "PredictionModelRunTimestamp",
        foreign_keys=[prediction_model_run_timestamp_id])
    # The date and time to which the prediction applies.
    prediction_timestamp = Column(TZTimeStamp, nullable=False, index=True)

    # Predicted values; every one of them may be NULL.
    # Temperature 2m above model layer.
    tmp_tgl_2 = Column(Float, nullable=True)
    # Relative humidity 2m above model layer.
    rh_tgl_2 = Column(Float, nullable=True)
    # Accumulated precipitation (units kg.m^-2)
    apcp_sfc_0 = Column(Float, nullable=True)
    # Wind direction 10m above ground.
    wdir_tgl_10 = Column(Float, nullable=True)
    # Wind speed 10m above ground.
    wind_tgl_10 = Column(Float, nullable=True)


class WeatherStationModelPrediction(Base):
""" The model prediction for a particular weather station.
Based on values from ModelRunGridSubsetPrediction, but captures linear interpolations based on weather
Expand Down
Loading
Loading