Skip to content

Commit

Permalink
Bias adjusted precip (#3202)
Browse files Browse the repository at this point in the history
Co-authored-by: Conor Brady <[email protected]>
  • Loading branch information
dgboss and conbrad authored Nov 1, 2023
1 parent e5445a9 commit b781237
Show file tree
Hide file tree
Showing 22 changed files with 376 additions and 59 deletions.
68 changes: 68 additions & 0 deletions api/alembic/versions/f2e027a47a3f_bias_adjusted_precip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""bias adjusted precip
Revision ID: f2e027a47a3f
Revises: 5b745fe0bd7a
Create Date: 2023-10-30 11:34:21.603046
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = 'f2e027a47a3f'
down_revision = '5b745fe0bd7a'
branch_labels = None
depends_on = None


def upgrade():
op.add_column('weather_station_model_predictions', sa.Column('bias_adjusted_precip_24h', sa.Float(), nullable=True))
# Drop morecast_2_materialized view and recreate with the bias_adjusted_precip_24h field
op.execute("DROP MATERIALIZED VIEW morecast_2_materialized_view;")
op.execute("""
CREATE MATERIALIZED VIEW morecast_2_materialized_view AS
SELECT weather_station_model_predictions.prediction_timestamp, prediction_models.abbreviation, weather_station_model_predictions.station_code,
weather_station_model_predictions.rh_tgl_2, weather_station_model_predictions.tmp_tgl_2, weather_station_model_predictions.bias_adjusted_temperature,
weather_station_model_predictions.bias_adjusted_rh, weather_station_model_predictions.precip_24h, weather_station_model_predictions.wdir_tgl_10,
weather_station_model_predictions.wind_tgl_10, weather_station_model_predictions.bias_adjusted_wind_speed, weather_station_model_predictions.bias_adjusted_wdir,
weather_station_model_predictions.bias_adjusted_precip_24h, weather_station_model_predictions.update_date,
weather_station_model_predictions.prediction_model_run_timestamp_id
FROM weather_station_model_predictions
JOIN prediction_model_run_timestamps
ON weather_station_model_predictions.prediction_model_run_timestamp_id = prediction_model_run_timestamps.id JOIN prediction_models
ON prediction_model_run_timestamps.prediction_model_id = prediction_models.id
JOIN (
SELECT max(weather_station_model_predictions.prediction_timestamp) AS latest_prediction, weather_station_model_predictions.station_code AS station_code,
date(weather_station_model_predictions.prediction_timestamp) AS unique_day
FROM weather_station_model_predictions
WHERE date_part('hour', weather_station_model_predictions.prediction_timestamp) = 20
GROUP BY weather_station_model_predictions.station_code, date(weather_station_model_predictions.prediction_timestamp)
) AS latest
ON weather_station_model_predictions.prediction_timestamp = latest.latest_prediction AND weather_station_model_predictions.station_code = latest.station_code
ORDER BY weather_station_model_predictions.update_date DESC;""")


def downgrade():
# Drop morecast_2_materialized view before dropping the column in the table in order to avoid dependency issues
op.execute("DROP MATERIALIZED VIEW morecast_2_materialized_view;")
op.drop_column('weather_station_model_predictions', 'bias_adjusted_precip_24h')
op.execute("""
CREATE MATERIALIZED VIEW morecast_2_materialized_view AS
SELECT weather_station_model_predictions.prediction_timestamp, prediction_models.abbreviation, weather_station_model_predictions.station_code,
weather_station_model_predictions.rh_tgl_2, weather_station_model_predictions.tmp_tgl_2, weather_station_model_predictions.bias_adjusted_temperature,
weather_station_model_predictions.bias_adjusted_rh, weather_station_model_predictions.precip_24h, weather_station_model_predictions.wdir_tgl_10,
weather_station_model_predictions.wind_tgl_10, weather_station_model_predictions.bias_adjusted_wind_speed, weather_station_model_predictions.bias_adjusted_wdir,
weather_station_model_predictions.update_date
FROM weather_station_model_predictions
JOIN prediction_model_run_timestamps
ON weather_station_model_predictions.prediction_model_run_timestamp_id = prediction_model_run_timestamps.id JOIN prediction_models
ON prediction_model_run_timestamps.prediction_model_id = prediction_models.id
JOIN (
SELECT max(weather_station_model_predictions.prediction_timestamp) AS latest_prediction, weather_station_model_predictions.station_code AS station_code,
date(weather_station_model_predictions.prediction_timestamp) AS unique_day
FROM weather_station_model_predictions
WHERE date_part('hour', weather_station_model_predictions.prediction_timestamp) = 20
GROUP BY weather_station_model_predictions.station_code, date(weather_station_model_predictions.prediction_timestamp)
) AS latest
ON weather_station_model_predictions.prediction_timestamp = latest.latest_prediction AND weather_station_model_predictions.station_code = latest.station_code
ORDER BY weather_station_model_predictions.update_date DESC;""")
76 changes: 74 additions & 2 deletions api/app/db/crud/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
"""
import datetime
from typing import List
from sqlalchemy import and_, select
from sqlalchemy import and_, select, text
from sqlalchemy.sql import func
from sqlalchemy.orm import Session
from app.db.models.weather_models import ModelRunPrediction, PredictionModelRunTimestamp
from app.db.models.weather_models import (ModelRunPrediction, PredictionModel, PredictionModelRunTimestamp,
WeatherStationModelPrediction)
from app.db.models.observations import HourlyActual


Expand Down Expand Up @@ -69,3 +70,74 @@ def get_accumulated_precipitation(session: Session, station_code: int, start_dat
if result is None:
return 0
return result


def get_accumulated_precip_by_24h_interval(session: Session, station_code: int, start_datetime: datetime, end_datetime: datetime):
""" Get the accumulated precip for 24 hour intervals for a given station code within the specified time interval.
:param session: The ORM/database session.
:param station_code: The numeric code identifying the weather station of interest.
:param start_datetime: The earliest date and time of interest.
:param end_datetime: The latest date and time of interest.
Note: I couldn't construct this query in SQLAlchemy, hence the need for the 'text' based query.
generate_series(\'{}\', \'{}\', '24 hours'::interval)
This gives us a one column table of dates separated by 24 hours between the start and end dates. For example, if start and end dates are 2023-10-31 20:00:00 to 2023-11-03 20:00:00 we would have a table like:
day
2023-10-31 20:00:00
2023-11-01 20:00:00
2023-11-02 20:00:00
2023-11-03 20:00:00
We then join the HourlyActuals table so that we can sum hourly precip in a 24 hour period. The join is based on the weather_date field in the HourlyActuals table being in a 24 hour range using this odd looking syntax:
weather_date <@ tstzrange(day, day + '24 hours', '(]')
Using 2023-10-31 20:00:00 as an example, rows with the following dates would match. The (] syntax means the lower bound is excluded but the upper bound is included.
2023-10-31 21:00:00
2023-10-31 22:00:00
2023-10-31 23:00:00
2023-11-01 00:00:00
2023-11-01 01:00:00
....
2023-11-01 19:00:00
2023-11-01 20:00:00
"""
stmt = """
SELECT day, station_code, sum(precipitation) actual_precip_24h
FROM
generate_series(\'{}\', \'{}\', '24 hours'::interval) day
LEFT JOIN
hourly_actuals
ON
weather_date <@ tstzrange(day - INTERVAL '24 hours', day, '(]')
WHERE
station_code = {}
GROUP BY
day, station_code;
""".format(start_datetime, end_datetime, station_code)
result = session.execute(text(stmt))
return result.all()


def get_predicted_daily_precip(session: Session, model: PredictionModel, station_code: int, start_datetime: datetime, end_datetime: datetime):
""" Gets rows from WeatherStationModelPrediction for the given model and station within the
specified time interval at 20:00:00 UTC each day.
:param session: The ORM/database session
:param model: The numeric weather prediction model
:param station_code: The code identifying the weather station.
:param start_datetime: The earliest date and time of interest.
:param end_datetime: The latest date and time of interest.
"""
result = session.query(WeatherStationModelPrediction)\
.join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id)\
.filter(PredictionModelRunTimestamp.prediction_model_id == model.id)\
.filter(WeatherStationModelPrediction.station_code == station_code)\
.filter(WeatherStationModelPrediction.prediction_timestamp >= start_datetime)\
.filter(WeatherStationModelPrediction.prediction_timestamp < end_datetime)\
.filter(func.date_part('hour', WeatherStationModelPrediction.prediction_timestamp) == 20)\
.order_by(WeatherStationModelPrediction.prediction_timestamp)
return result.all()
1 change: 1 addition & 0 deletions api/app/db/crud/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def get_latest_station_prediction_mat_view(session: Session,
MoreCast2MaterializedView.bias_adjusted_wind_speed,
MoreCast2MaterializedView.bias_adjusted_wdir,
MoreCast2MaterializedView.precip_24h,
MoreCast2MaterializedView.bias_adjusted_precip_24h,
MoreCast2MaterializedView.wdir_tgl_10,
MoreCast2MaterializedView.wind_tgl_10,
MoreCast2MaterializedView.update_date).\
Expand Down
5 changes: 4 additions & 1 deletion api/app/db/models/weather_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,13 @@ class WeatherStationModelPrediction(Base):
default=time_utils.get_utc_now(), index=True)
# Precipitation predicted for the previous 24 hour period.
precip_24h = Column(Float, nullable=True)
# Precipitation predicted for the previous 24 hour period adjusted for bias.
bias_adjusted_precip_24h = Column(Float, nullable=True)

def __str__(self):
return ('{self.station_code} {self.prediction_timestamp} {self.tmp_tgl_2} {self.bias_adjusted_temperature} '
'{self.rh_tgl_2} {self.bias_adjusted_rh} {self.wdir_tgl_10} {self.bias_adjusted_wdir} {self.wind_tgl_10} '
'{self.bias_adjusted_wind_speed} {self.apcp_sfc_0} {self.delta_precip}').format(self=self)
'{self.bias_adjusted_wind_speed} {self.apcp_sfc_0} {self.delta_precip} {self.bias_adjusted_precip_24h}').format(self=self)


class MoreCast2MaterializedView(Base):
Expand All @@ -267,6 +269,7 @@ class MoreCast2MaterializedView(Base):
primary_key=True, nullable=False, index=True)
abbreviation = Column(String, nullable=False)
apcp_sfc_0 = Column(Float, nullable=False)
bias_adjusted_precip_24h = Column(Float, nullable=False)
bias_adjusted_rh = Column(Float, nullable=False)
bias_adjusted_temperature = Column(Float, nullable=False)
bias_adjusted_wind_speed = Column(Float, nullable=False)
Expand Down
7 changes: 6 additions & 1 deletion api/app/jobs/common_model_fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def _process_prediction(self,
station_prediction.apcp_sfc_0 = 0.0
else:
station_prediction.apcp_sfc_0 = prediction.apcp_sfc_0
# Calculate the delta_precipitation based on station's previous prediction_timestamp
# Calculate the delta_precipitation and 24 hour precip based on station's previous prediction_timestamp
# for the same model run
self.session.flush()
station_prediction.precip_24h = self._calculate_past_24_hour_precip(
Expand Down Expand Up @@ -280,6 +280,11 @@ def _process_prediction(self,
station_prediction.wdir_tgl_10,
station_prediction.prediction_timestamp
)
# Predict the 24 hour precipitation
station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(
station_prediction.precip_24h,
station_prediction.prediction_timestamp
)

# Update the update time (this might be an update)
station_prediction.update_date = time_utils.get_utc_now()
Expand Down
4 changes: 2 additions & 2 deletions api/app/tests/fixtures/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def guess_filename(self, url: str, params: dict):
if params:
filename = filename + '_'
for key, value in params.items():
if type(value) == str:
if type(value) == str: # noqa: E721
value = value.replace(' ', '_')
filename = '{}_{}_{}'.format(filename, key, value)
filename = filename[:200] + '.json'
Expand All @@ -110,7 +110,7 @@ def lookup_fixture_filename(self,
lookup = nested_defaultdict(lookup)
fixture = lookup[url][verb][str(params)][str(data)]

if type(fixture) is not str:
if type(fixture) is not str: # noqa: E721
if config.get('AUTO_MAKE_FIXTURES'):
logger.warning('inserting fixture into lookup')
lookup[url][verb][str(params)][str(data)] = self.guess_filename(url, params)
Expand Down
41 changes: 40 additions & 1 deletion api/app/tests/weather_models/crud.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
""" Some crud responses used to mock our calls to app.db.crud
"""
from datetime import datetime
from app.db.models.weather_models import ModelRunPrediction
from app.db.models.weather_models import ModelRunPrediction, WeatherStationModelPrediction
from app.db.models.observations import HourlyActual


class MockActualPrecip:
day: datetime
actual_precip_24h: float

def __init__(self, day, actual_precip_24h):
self.day=day
self.actual_precip_24h=actual_precip_24h


def get_actuals_left_outer_join_with_predictions(*args):
""" Fixed response as replacement for app.db.crud.observations.get_actuals_left_outer_join_with_predictions
"""
Expand Down Expand Up @@ -89,3 +98,33 @@ def get_actuals_left_outer_join_with_predictions(*args):
prediction_timestamp=datetime(2020, 10, 11, 21))]
]
return result

def get_accumulated_precip_by_24h_interval(*args):
""" Fixed response as replacement for app.db.crud.observations.get_accumulated_precip_by_24h_interval
"""
return [
MockActualPrecip(
day=datetime(2023,10,10,20,0,0),
actual_precip_24h=3

),
MockActualPrecip(
day=datetime(2023,10,11,20,0,0),
actual_precip_24h=3
)
]


def get_predicted_daily_precip(*args):
return [
WeatherStationModelPrediction(
bias_adjusted_precip_24h=None,
precip_24h=3,
prediction_timestamp=datetime(2023,10,10,20,0,0)
),
WeatherStationModelPrediction(
bias_adjusted_precip_24h=None,
precip_24h=3,
prediction_timestamp=datetime(2023,10,11,20,0,0)
)
]
4 changes: 0 additions & 4 deletions api/app/tests/weather_models/test_env_canada_gdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import pytest
import requests
from sqlalchemy.orm import Session
from shapely import wkt
from app.jobs import env_canada
from app.jobs import common_model_fetchers
import app.utils.time as time_utils
Expand Down Expand Up @@ -48,9 +47,6 @@ def mock_get_actuals_left_outer_join_with_predictions(monkeypatch):
@pytest.fixture()
def mock_database(monkeypatch):
""" Mocked out database queries """
geom = ("POLYGON ((-120.525 50.77500000000001, -120.375 50.77500000000001,-120.375 50.62500000000001,"
" -120.525 50.62500000000001, -120.525 50.77500000000001))")
shape = wkt.loads(geom)
gdps_url = ('https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/'
'CMC_glb_TMP_TGL_2_latlon.15x.15_2020052100_P000.grib2')
gdps_processed_model_run = ProcessedModelRunUrl(url=gdps_url)
Expand Down
5 changes: 0 additions & 5 deletions api/app/tests/weather_models/test_env_canada_hrdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import Optional
import pytest
import requests
import shapely
import datetime
from sqlalchemy.orm import Session
from pytest_mock import MockerFixture
Expand All @@ -29,10 +28,6 @@
@pytest.fixture()
def mock_database(monkeypatch):
""" Mocked out database queries """
geom = ("POLYGON ((-120.525 50.77500000000001, -120.375 50.77500000000001,-120.375 50.62500000000001,"
" -120.525 50.62500000000001, -120.525 50.77500000000001))")
shape = shapely.wkt.loads(geom)

hrdps_url = 'https://dd.weather.gc.ca/model_hrdps/continental/2.5km/' \
'00/001/20200521T00Z_MSC_HRDPS_RH_AGL-2m_RLatLon0.0225_PT001H.grib2'
hrdps_processed_model_run = ProcessedModelRunUrl(url=hrdps_url)
Expand Down
5 changes: 0 additions & 5 deletions api/app/tests/weather_models/test_env_canada_rdps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pytest
import requests
from typing import Optional
from shapely import wkt
from sqlalchemy.orm import Session
import app.utils.time as time_utils
import app.weather_models.process_grib
Expand All @@ -24,10 +23,6 @@
@pytest.fixture()
def mock_database(monkeypatch):
""" Mocked out database queries """
geom = ("POLYGON ((-120.525 50.77500000000001, -120.375 50.77500000000001,-120.375 50.62500000000001,"
" -120.525 50.62500000000001, -120.525 50.77500000000001))")
shape = wkt.loads(geom)

rdps_url = ('https://dd.weather.gc.ca/model_gem_regional/10km/grib2/00/034/'
'CMC_reg_RH_TGL_2_ps10km_2020052100_P034.grib2')
rdps_processed_model_run = ProcessedModelRunUrl(url=rdps_url)
Expand Down
Loading

0 comments on commit b781237

Please sign in to comment.