From 8d55c6cfbce33fecd6e9bf92def8610e0c44f6ea Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 15:28:16 -0800 Subject: [PATCH 01/34] Try no export --- openshift/pgslice/docker/fill_partition_data.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/fill_partition_data.sh b/openshift/pgslice/docker/fill_partition_data.sh index a7556a61b..ae67ffc1f 100755 --- a/openshift/pgslice/docker/fill_partition_data.sh +++ b/openshift/pgslice/docker/fill_partition_data.sh @@ -49,7 +49,7 @@ then exit 1 fi -export PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}" +PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}" # Fill the partitions with data from the original table pgslice fill $TABLE # Analyze for query planner From cecb55c66343584137e990f39d2406e542621faf Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 15:28:37 -0800 Subject: [PATCH 02/34] Change branch to pull from --- openshift/pgslice/openshift/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/openshift/build.yaml b/openshift/pgslice/openshift/build.yaml index 9cd83ee49..7acc744c3 100644 --- a/openshift/pgslice/openshift/build.yaml +++ b/openshift/pgslice/openshift/build.yaml @@ -18,7 +18,7 @@ parameters: - name: GIT_URL value: https://github.com/bcgov/wps.git - name: GIT_BRANCH - value: main + value: task/more-fixes objects: - apiVersion: v1 kind: ImageStream From 22e3bee769b64e28136cf73a8d86175020b83f40 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 15:32:10 -0800 Subject: [PATCH 03/34] Use url parameter instead --- openshift/pgslice/docker/fill_partition_data.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/openshift/pgslice/docker/fill_partition_data.sh b/openshift/pgslice/docker/fill_partition_data.sh index ae67ffc1f..886d0e3ca 100755 --- a/openshift/pgslice/docker/fill_partition_data.sh +++ b/openshift/pgslice/docker/fill_partition_data.sh @@ -51,10 +51,10 @@ fi PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}" # Fill the partitions with data from the original table -pgslice fill $TABLE +pgslice fill $TABLE --url $PGSLICE_URL # Analyze for query planner -pgslice analyze $TABLE +pgslice analyze $TABLE --url $PGSLICE_URL # Swap the intermediate table with the original table -pgslice swap $TABLE +pgslice swap $TABLE --url $PGSLICE_URL # Fill the rest (rows inserted between the first fill and the swap) -pgslice fill $TABLE --swapped \ No newline at end of file +pgslice fill $TABLE --swapped --url $PGSLICE_URL \ No newline at end of file From 95034840c4c0767f9b2433f2eb7ab23431b057fb Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 15:35:33 -0800 Subject: [PATCH 04/34] No quotes --- openshift/pgslice/docker/fill_partition_data.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/fill_partition_data.sh b/openshift/pgslice/docker/fill_partition_data.sh index 886d0e3ca..16c49496b 100755 --- a/openshift/pgslice/docker/fill_partition_data.sh +++ b/openshift/pgslice/docker/fill_partition_data.sh @@ -49,7 +49,7 @@ then exit 1 fi -PGSLICE_URL="postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}" +PGSLICE_URL=postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE} # Fill the partitions with data from the original table pgslice fill $TABLE --url 
$PGSLICE_URL # Analyze for query planner From ad54d09de9513a4482bdf583914e927267d10dd4 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 15:46:06 -0800 Subject: [PATCH 05/34] URL encode pgurl --- openshift/pgslice/docker/Dockerfile | 2 +- openshift/pgslice/docker/fill_partition_data.sh | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index d45936432..af841e9f2 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,6 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash +RUN apk update && apk add unzip bash jq # Download the Amazon CLI installer. ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip diff --git a/openshift/pgslice/docker/fill_partition_data.sh b/openshift/pgslice/docker/fill_partition_data.sh index 16c49496b..13d350739 100755 --- a/openshift/pgslice/docker/fill_partition_data.sh +++ b/openshift/pgslice/docker/fill_partition_data.sh @@ -49,7 +49,8 @@ then exit 1 fi -PGSLICE_URL=postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE} +RAW_PG_URL=postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE} +PGSLICE_URL=$(jq -rn --arg str $RAW_PG_URL '$str | @uri') # Fill the partitions with data from the original table pgslice fill $TABLE --url $PGSLICE_URL # Analyze for query planner From 4a8cf8fc1ef0889df7897602fe9d7edda8b2bc10 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 15:52:39 -0800 Subject: [PATCH 06/34] add psql for debugging --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index af841e9f2..bde583976 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,6 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash jq +RUN apk update && apk add unzip bash jq postgresql # Download the Amazon CLI installer. ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip From 198098b271d63f215a5c2d4436788f717f8043f2 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 16:34:28 -0800 Subject: [PATCH 07/34] Try with encoded pass using python --- openshift/pgslice/docker/Dockerfile | 2 +- openshift/pgslice/docker/fill_partition_data.sh | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index bde583976..92e8870fc 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,6 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash jq postgresql +RUN apk update && apk add unzip bash jq postgresql python3 # Download the Amazon CLI installer. 
ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip diff --git a/openshift/pgslice/docker/fill_partition_data.sh b/openshift/pgslice/docker/fill_partition_data.sh index 13d350739..27dcfcb69 100755 --- a/openshift/pgslice/docker/fill_partition_data.sh +++ b/openshift/pgslice/docker/fill_partition_data.sh @@ -49,8 +49,8 @@ then exit 1 fi -RAW_PG_URL=postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE} -PGSLICE_URL=$(jq -rn --arg str $RAW_PG_URL '$str | @uri') +ENCODED_PASS=python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}'))" +PGSLICE_URL=postgresql://${PG_USER}:${ENCODED_PASS}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE} # Fill the partitions with data from the original table pgslice fill $TABLE --url $PGSLICE_URL # Analyze for query planner From d88028dbdb1fe51095efeb38545ae0b27735e735 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 16:45:03 -0800 Subject: [PATCH 08/34] Capture command --- openshift/pgslice/docker/fill_partition_data.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/fill_partition_data.sh b/openshift/pgslice/docker/fill_partition_data.sh index 27dcfcb69..71fb1bd87 100755 --- a/openshift/pgslice/docker/fill_partition_data.sh +++ b/openshift/pgslice/docker/fill_partition_data.sh @@ -49,7 +49,7 @@ then exit 1 fi -ENCODED_PASS=python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}'))" +ENCODED_PASS=$(python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}'))") PGSLICE_URL=postgresql://${PG_USER}:${ENCODED_PASS}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE} # Fill the partitions with data from the original table pgslice fill $TABLE --url $PGSLICE_URL From 5954fde705197f69ee7db977c266b9366dc2fa68 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:03:26 -0800 Subject: [PATCH 09/34] Use python to encode pgpassword, apply encoding fix to partition and archive, remove mat view lookup --- api/app/db/crud/weather_models.py | 404 ++++++++---------- openshift/pgslice/docker/Dockerfile | 2 +- .../pgslice/docker/partition_and_archive.sh | 13 +- 3 files changed, 188 insertions(+), 231 deletions(-) diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py index 91db61877..eff1ace80 100644 --- a/api/app/db/crud/weather_models.py +++ b/api/app/db/crud/weather_models.py @@ -1,5 +1,5 @@ -""" CRUD operations for management of weather model data. -""" +"""CRUD operations for management of weather model data.""" + import logging import datetime from typing import List, Union @@ -13,7 +13,6 @@ PredictionModelRunTimestamp, ModelRunPrediction, WeatherStationModelPrediction, - MoreCast2MaterializedView, SavedModelRunForSFMSUrl, ModelRunForSFMS, ) @@ -22,142 +21,112 @@ logger = logging.getLogger(__name__) -def get_prediction_run(session: Session, prediction_model_id: int, - prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp: - """ load the model run from the database (.e.g. for 2020 07 07 12h00). 
""" - logger.info('get prediction run for %s', prediction_run_timestamp) - return session.query(PredictionModelRunTimestamp).\ - filter(PredictionModelRunTimestamp.prediction_model_id == prediction_model_id).\ - filter(PredictionModelRunTimestamp.prediction_run_timestamp == - prediction_run_timestamp).first() +def get_prediction_run(session: Session, prediction_model_id: int, prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp: + """load the model run from the database (.e.g. for 2020 07 07 12h00).""" + logger.info("get prediction run for %s", prediction_run_timestamp) + return ( + session.query(PredictionModelRunTimestamp) + .filter(PredictionModelRunTimestamp.prediction_model_id == prediction_model_id) + .filter(PredictionModelRunTimestamp.prediction_run_timestamp == prediction_run_timestamp) + .first() + ) -def create_prediction_run( - session: Session, - prediction_model_id: int, - prediction_run_timestamp: datetime.datetime, - complete: bool, - interpolated: bool) -> PredictionModelRunTimestamp: - """ Create a model prediction run for a particular model. - """ +def create_prediction_run(session: Session, prediction_model_id: int, prediction_run_timestamp: datetime.datetime, complete: bool, interpolated: bool) -> PredictionModelRunTimestamp: + """Create a model prediction run for a particular model.""" prediction_run = PredictionModelRunTimestamp( - prediction_model_id=prediction_model_id, - prediction_run_timestamp=prediction_run_timestamp, - complete=complete, - interpolated=interpolated) + prediction_model_id=prediction_model_id, prediction_run_timestamp=prediction_run_timestamp, complete=complete, interpolated=interpolated + ) session.add(prediction_run) session.commit() return prediction_run def update_prediction_run(session: Session, prediction_run: PredictionModelRunTimestamp): - """ Update a PredictionModelRunTimestamp record """ + """Update a PredictionModelRunTimestamp record""" session.add(prediction_run) session.commit() -def get_or_create_prediction_run(session, prediction_model: PredictionModel, - prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp: - """ Get a model prediction run for a particular model, creating one if it doesn't already exist. 
- """ - prediction_run = get_prediction_run( - session, prediction_model.id, prediction_run_timestamp) +def get_or_create_prediction_run(session, prediction_model: PredictionModel, prediction_run_timestamp: datetime.datetime) -> PredictionModelRunTimestamp: + """Get a model prediction run for a particular model, creating one if it doesn't already exist.""" + prediction_run = get_prediction_run(session, prediction_model.id, prediction_run_timestamp) if not prediction_run: - logger.info('Creating prediction run %s for %s', - prediction_model.abbreviation, prediction_run_timestamp) - prediction_run = create_prediction_run( - session, prediction_model.id, prediction_run_timestamp, False, False) + logger.info("Creating prediction run %s for %s", prediction_model.abbreviation, prediction_run_timestamp) + prediction_run = create_prediction_run(session, prediction_model.id, prediction_run_timestamp, False, False) return prediction_run -def get_model_run_predictions_for_station(session: Session, station_code: int, - prediction_run: PredictionModelRunTimestamp) -> List: - """ Get all the predictions for a provided model run """ +def get_model_run_predictions_for_station(session: Session, station_code: int, prediction_run: PredictionModelRunTimestamp) -> List: + """Get all the predictions for a provided model run""" logger.info("Getting model predictions for grid %s", prediction_run) - return session.query(ModelRunPrediction)\ - .filter(ModelRunPrediction.prediction_model_run_timestamp_id == - prediction_run.id)\ - .filter(ModelRunPrediction.station_code == station_code)\ + return ( + session.query(ModelRunPrediction) + .filter(ModelRunPrediction.prediction_model_run_timestamp_id == prediction_run.id) + .filter(ModelRunPrediction.station_code == station_code) .order_by(ModelRunPrediction.prediction_timestamp) + ) def delete_weather_station_model_predictions(session: Session, older_than: datetime): - """ Delete any weather model prediction older than a certain date. - """ - logger.info('Deleting weather station model prediction data older than %s...', older_than) - session.query(WeatherStationModelPrediction)\ - .filter(WeatherStationModelPrediction.prediction_timestamp < older_than)\ - .delete() - + """Delete any weather model prediction older than a certain date.""" + logger.info("Deleting weather station model prediction data older than %s...", older_than) + session.query(WeatherStationModelPrediction).filter(WeatherStationModelPrediction.prediction_timestamp < older_than).delete() + def delete_model_run_predictions(session: Session, older_than: datetime): - """ Delete any model run prediction older than a certain date. 
- """ - logger.info('Deleting model_run_prediction data older than %s...', older_than) - session.query(ModelRunPrediction)\ - .filter(ModelRunPrediction.prediction_timestamp < older_than)\ - .delete() + """Delete any model run prediction older than a certain date.""" + logger.info("Deleting model_run_prediction data older than %s...", older_than) + session.query(ModelRunPrediction).filter(ModelRunPrediction.prediction_timestamp < older_than).delete() def get_station_model_predictions_order_by_prediction_timestamp( - session: Session, - station_codes: List, - model: ModelEnum, - start_date: datetime.datetime, - end_date: datetime.datetime) -> List[ - Union[WeatherStationModelPrediction, PredictionModel]]: - """ Fetch model predictions for given stations within given time range ordered by station code + session: Session, station_codes: List, model: ModelEnum, start_date: datetime.datetime, end_date: datetime.datetime +) -> List[Union[WeatherStationModelPrediction, PredictionModel]]: + """Fetch model predictions for given stations within given time range ordered by station code and prediction timestamp. This is useful if you're interested in seeing all the different predictions regardles of model run. """ - query = session.query(WeatherStationModelPrediction, PredictionModel).\ - join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == - WeatherStationModelPrediction.prediction_model_run_timestamp_id).\ - join(PredictionModel, PredictionModel.id == - PredictionModelRunTimestamp.prediction_model_id).\ - filter(WeatherStationModelPrediction.station_code.in_(station_codes)).\ - filter(WeatherStationModelPrediction.prediction_timestamp >= start_date).\ - filter(WeatherStationModelPrediction.prediction_timestamp <= end_date).\ - filter(PredictionModel.abbreviation == model).\ - order_by(WeatherStationModelPrediction.station_code).\ - order_by(WeatherStationModelPrediction.prediction_timestamp) + query = ( + session.query(WeatherStationModelPrediction, PredictionModel) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id) + .filter(WeatherStationModelPrediction.station_code.in_(station_codes)) + .filter(WeatherStationModelPrediction.prediction_timestamp >= start_date) + .filter(WeatherStationModelPrediction.prediction_timestamp <= end_date) + .filter(PredictionModel.abbreviation == model) + .order_by(WeatherStationModelPrediction.station_code) + .order_by(WeatherStationModelPrediction.prediction_timestamp) + ) return query def get_station_model_predictions( - session: Session, - station_codes: List, - model: str, - start_date: datetime.datetime, - end_date: datetime.datetime) -> List[ - Union[WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel]]: - """ Fetches the model predictions that were most recently issued before the prediction_timestamp. + session: Session, station_codes: List, model: str, start_date: datetime.datetime, end_date: datetime.datetime +) -> List[Union[WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel]]: + """Fetches the model predictions that were most recently issued before the prediction_timestamp. Used to compare the most recent model predictions against forecasts and actuals for the same weather date and weather station. 
Only fetches WeatherStationModelPredictions for prediction_timestamps in the date range of start_date - end_date (inclusive). """ - query = session.query(WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel).\ - filter(WeatherStationModelPrediction.station_code.in_(station_codes)).\ - filter(WeatherStationModelPrediction.prediction_timestamp >= start_date).\ - filter(WeatherStationModelPrediction.prediction_timestamp <= end_date).\ - filter(PredictionModelRunTimestamp.id == - WeatherStationModelPrediction.prediction_model_run_timestamp_id).\ - filter(PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id, - PredictionModel.abbreviation == model).\ - order_by(WeatherStationModelPrediction.station_code).\ - order_by(WeatherStationModelPrediction.prediction_timestamp).\ - order_by(PredictionModelRunTimestamp.prediction_run_timestamp.asc()) + query = ( + session.query(WeatherStationModelPrediction, PredictionModelRunTimestamp, PredictionModel) + .filter(WeatherStationModelPrediction.station_code.in_(station_codes)) + .filter(WeatherStationModelPrediction.prediction_timestamp >= start_date) + .filter(WeatherStationModelPrediction.prediction_timestamp <= end_date) + .filter(PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .filter(PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id, PredictionModel.abbreviation == model) + .order_by(WeatherStationModelPrediction.station_code) + .order_by(WeatherStationModelPrediction.prediction_timestamp) + .order_by(PredictionModelRunTimestamp.prediction_run_timestamp.asc()) + ) return query -def get_latest_station_model_prediction_per_day(session: Session, - station_codes: List[int], - model: str, - day_start: datetime.datetime, - day_end: datetime.datetime): +def get_latest_station_model_prediction_per_day(session: Session, station_codes: List[int], model: str, day_start: datetime.datetime, day_end: datetime.datetime): """ All weather station model predictions for: - a given day @@ -173,75 +142,75 @@ def get_latest_station_model_prediction_per_day(session: Session, """ subquery = ( session.query( - func.max(WeatherStationModelPrediction.prediction_timestamp).label('latest_prediction'), + func.max(WeatherStationModelPrediction.prediction_timestamp).label("latest_prediction"), WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') + func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day"), ) .filter( WeatherStationModelPrediction.station_code.in_(station_codes), WeatherStationModelPrediction.prediction_timestamp >= day_start, WeatherStationModelPrediction.prediction_timestamp <= day_end, - func.date_part('hour', WeatherStationModelPrediction.prediction_timestamp) == 20 - ) - .group_by( - WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') + func.date_part("hour", WeatherStationModelPrediction.prediction_timestamp) == 20, ) - .subquery('latest') + .group_by(WeatherStationModelPrediction.station_code, func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day")) + .subquery("latest") ) - result = session.query( - WeatherStationModelPrediction.id, - WeatherStationModelPrediction.prediction_timestamp, - PredictionModel.abbreviation, - WeatherStationModelPrediction.station_code, - WeatherStationModelPrediction.rh_tgl_2, - WeatherStationModelPrediction.tmp_tgl_2, - 
WeatherStationModelPrediction.bias_adjusted_temperature, - WeatherStationModelPrediction.bias_adjusted_rh, - WeatherStationModelPrediction.apcp_sfc_0, - WeatherStationModelPrediction.wdir_tgl_10, - WeatherStationModelPrediction.wind_tgl_10, - WeatherStationModelPrediction.update_date)\ - .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id)\ - .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id)\ - .join(subquery, and_( - WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, - WeatherStationModelPrediction.station_code == subquery.c.station_code))\ - .filter(PredictionModel.abbreviation == model)\ + result = ( + session.query( + WeatherStationModelPrediction.id, + WeatherStationModelPrediction.prediction_timestamp, + PredictionModel.abbreviation, + WeatherStationModelPrediction.station_code, + WeatherStationModelPrediction.rh_tgl_2, + WeatherStationModelPrediction.tmp_tgl_2, + WeatherStationModelPrediction.bias_adjusted_temperature, + WeatherStationModelPrediction.bias_adjusted_rh, + WeatherStationModelPrediction.apcp_sfc_0, + WeatherStationModelPrediction.wdir_tgl_10, + WeatherStationModelPrediction.wind_tgl_10, + WeatherStationModelPrediction.update_date, + ) + .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id) + .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id) + .join(subquery, and_(WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, WeatherStationModelPrediction.station_code == subquery.c.station_code)) + .filter(PredictionModel.abbreviation == model) .order_by(WeatherStationModelPrediction.update_date.desc()) + ) return result -def get_latest_station_prediction_mat_view(session: Session, - station_codes: List[int], - day_start: datetime.datetime, - day_end: datetime.datetime): +def get_latest_station_prediction_mat_view(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): logger.info("Getting data from materialized view.") - result = session.query(MoreCast2MaterializedView.prediction_timestamp, - MoreCast2MaterializedView.abbreviation, - MoreCast2MaterializedView.station_code, - MoreCast2MaterializedView.rh_tgl_2, - MoreCast2MaterializedView.tmp_tgl_2, - MoreCast2MaterializedView.bias_adjusted_temperature, - MoreCast2MaterializedView.bias_adjusted_rh, - MoreCast2MaterializedView.bias_adjusted_wind_speed, - MoreCast2MaterializedView.bias_adjusted_wdir, - MoreCast2MaterializedView.precip_24h, - MoreCast2MaterializedView.bias_adjusted_precip_24h, - MoreCast2MaterializedView.wdir_tgl_10, - MoreCast2MaterializedView.wind_tgl_10, - MoreCast2MaterializedView.update_date).\ - filter(MoreCast2MaterializedView.station_code.in_(station_codes), - MoreCast2MaterializedView.prediction_timestamp >= day_start, - MoreCast2MaterializedView.prediction_timestamp <= day_end) + result = ( + session.query( + WeatherStationModelPrediction.prediction_timestamp, + PredictionModel.abbreviation, + WeatherStationModelPrediction.station_code, + WeatherStationModelPrediction.rh_tgl_2, + WeatherStationModelPrediction.tmp_tgl_2, + WeatherStationModelPrediction.bias_adjusted_temperature, + WeatherStationModelPrediction.bias_adjusted_rh, + WeatherStationModelPrediction.bias_adjusted_wind_speed, + WeatherStationModelPrediction.bias_adjusted_wdir, + 
WeatherStationModelPrediction.precip_24h, + WeatherStationModelPrediction.bias_adjusted_precip_24h, + WeatherStationModelPrediction.wdir_tgl_10, + WeatherStationModelPrediction.wind_tgl_10, + WeatherStationModelPrediction.update_date, + ) + .join(PredictionModel, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id) + .filter( + WeatherStationModelPrediction.station_code.in_(station_codes), + WeatherStationModelPrediction.prediction_timestamp >= day_start, + WeatherStationModelPrediction.prediction_timestamp <= day_end, + ) + ) return result -def get_latest_station_prediction_per_day(session: Session, - station_codes: List[int], - day_start: datetime.datetime, - day_end: datetime.datetime): +def get_latest_station_prediction_per_day(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): """ All weather station model predictions for: - a given day @@ -256,84 +225,79 @@ def get_latest_station_prediction_per_day(session: Session, """ subquery = ( session.query( - func.max(WeatherStationModelPrediction.prediction_timestamp).label('latest_prediction'), + func.max(WeatherStationModelPrediction.prediction_timestamp).label("latest_prediction"), WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') + func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day"), ) .filter( WeatherStationModelPrediction.station_code.in_(station_codes), WeatherStationModelPrediction.prediction_timestamp >= day_start, WeatherStationModelPrediction.prediction_timestamp <= day_end, - func.date_part('hour', WeatherStationModelPrediction.prediction_timestamp) == 20 + func.date_part("hour", WeatherStationModelPrediction.prediction_timestamp) == 20, ) - .group_by( - WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label('unique_day') - ) - .subquery('latest') + .group_by(WeatherStationModelPrediction.station_code, func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day")) + .subquery("latest") ) - result = session.query( - WeatherStationModelPrediction.prediction_timestamp, - PredictionModel.abbreviation, - WeatherStationModelPrediction.station_code, - WeatherStationModelPrediction.rh_tgl_2, - WeatherStationModelPrediction.tmp_tgl_2, - WeatherStationModelPrediction.bias_adjusted_temperature, - WeatherStationModelPrediction.bias_adjusted_rh, - WeatherStationModelPrediction.apcp_sfc_0, - WeatherStationModelPrediction.wdir_tgl_10, - WeatherStationModelPrediction.wind_tgl_10, - WeatherStationModelPrediction.update_date)\ - .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id)\ - .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id)\ - .join(subquery, and_( - WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, - WeatherStationModelPrediction.station_code == subquery.c.station_code))\ + result = ( + session.query( + WeatherStationModelPrediction.prediction_timestamp, + PredictionModel.abbreviation, + WeatherStationModelPrediction.station_code, + WeatherStationModelPrediction.rh_tgl_2, + WeatherStationModelPrediction.tmp_tgl_2, + WeatherStationModelPrediction.bias_adjusted_temperature, + 
WeatherStationModelPrediction.bias_adjusted_rh, + WeatherStationModelPrediction.apcp_sfc_0, + WeatherStationModelPrediction.wdir_tgl_10, + WeatherStationModelPrediction.wind_tgl_10, + WeatherStationModelPrediction.update_date, + ) + .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id) + .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id) + .join(subquery, and_(WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, WeatherStationModelPrediction.station_code == subquery.c.station_code)) .order_by(WeatherStationModelPrediction.update_date.desc()) + ) return result def get_station_model_prediction_from_previous_model_run( - session: Session, - station_code: int, - model: ModelEnum, - prediction_timestamp: datetime.datetime, - prediction_model_run_timestamp: datetime.datetime) -> List[WeatherStationModelPrediction]: - """ Fetches the one model prediction for the specified station_code, model, and prediction_timestamp + session: Session, station_code: int, model: ModelEnum, prediction_timestamp: datetime.datetime, prediction_model_run_timestamp: datetime.datetime +) -> List[WeatherStationModelPrediction]: + """Fetches the one model prediction for the specified station_code, model, and prediction_timestamp from the prediction model run immediately previous to the given prediction_model_run_timestamp. """ # create a lower_bound for time range so that we're not querying timestamps all the way back to the # beginning of time lower_bound = prediction_model_run_timestamp - datetime.timedelta(days=1) - response = session.query(WeatherStationModelPrediction).\ - join(PredictionModelRunTimestamp, - PredictionModelRunTimestamp.id == - WeatherStationModelPrediction.prediction_model_run_timestamp_id).\ - join(PredictionModel, PredictionModel.id == - PredictionModelRunTimestamp.prediction_model_id).\ - filter(WeatherStationModelPrediction.station_code == station_code).\ - filter(WeatherStationModelPrediction.prediction_timestamp == prediction_timestamp).\ - filter(PredictionModel.abbreviation == model).\ - filter(PredictionModelRunTimestamp.prediction_run_timestamp < prediction_model_run_timestamp).\ - filter(PredictionModelRunTimestamp.prediction_run_timestamp > lower_bound).\ - order_by(PredictionModelRunTimestamp.prediction_run_timestamp.desc()).\ - limit(1).first() + response = ( + session.query(WeatherStationModelPrediction) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id) + .filter(WeatherStationModelPrediction.station_code == station_code) + .filter(WeatherStationModelPrediction.prediction_timestamp == prediction_timestamp) + .filter(PredictionModel.abbreviation == model) + .filter(PredictionModelRunTimestamp.prediction_run_timestamp < prediction_model_run_timestamp) + .filter(PredictionModelRunTimestamp.prediction_run_timestamp > lower_bound) + .order_by(PredictionModelRunTimestamp.prediction_run_timestamp.desc()) + .limit(1) + .first() + ) return response def get_processed_file_count(session: Session, urls: List[str]) -> int: - """ Return the number of matching urls """ + """Return the number of matching urls""" return session.query(ProcessedModelRunUrl).filter(ProcessedModelRunUrl.url.in_(urls)).count() def get_processed_file_record(session: Session, url: str) -> 
ProcessedModelRunUrl: - """ Get record corresponding to a processed file. """ - processed_file = session.query(ProcessedModelRunUrl).\ - filter(ProcessedModelRunUrl.url == url).first() + """Get record corresponding to a processed file.""" + processed_file = session.query(ProcessedModelRunUrl).filter(ProcessedModelRunUrl.url == url).first() return processed_file + def get_saved_model_run_for_sfms(session: Session, url: str) -> SavedModelRunForSFMSUrl: """Get record corresponding to a processed model run url for sfms""" return session.query(SavedModelRunForSFMSUrl).filter(SavedModelRunForSFMSUrl.url == url).first() @@ -346,6 +310,7 @@ def create_saved_model_run_for_sfms_url(session: Session, url: str, key: str): session.add(saved_model_run_for_sfms_url) session.commit() + def get_rdps_sfms_urls_for_deletion(session: Session, threshold: datetime): """Gets all records older than the provided threshold.""" return session.query(SavedModelRunForSFMSUrl).filter(SavedModelRunForSFMSUrl.create_date < threshold).all() @@ -362,9 +327,7 @@ def create_model_run_for_sfms(session: Session, model: ModelEnum, model_run_date date and model has been stored in S3. """ prediction_model = get_prediction_model_by_model_enum(session, model) - model_run_timestamp = datetime.datetime( - year=model_run_date.year, month=model_run_date.month, day=model_run_date.day, hour=model_run_hour, tzinfo=datetime.timezone.utc - ) + model_run_timestamp = datetime.datetime(year=model_run_date.year, month=model_run_date.month, day=model_run_date.day, hour=model_run_hour, tzinfo=datetime.timezone.utc) model_run_for_sfms = ModelRunForSFMS( prediction_model_id=prediction_model.id, model_run_timestamp=model_run_timestamp, @@ -380,42 +343,35 @@ def get_prediction_model_by_model_enum(session: Session, model_enum: ModelEnum): return session.query(PredictionModel).filter(PredictionModel.abbreviation == model_enum.value).first() -def get_prediction_model(session: Session, - model_enum: ModelEnum, - projection: ProjectionEnum) -> PredictionModel: - """ Get the prediction model corresponding to a particular abbreviation and projection. """ - return session.query(PredictionModel).\ - filter(PredictionModel.abbreviation == model_enum.value).\ - filter(PredictionModel.projection == projection.value).first() +def get_prediction_model(session: Session, model_enum: ModelEnum, projection: ProjectionEnum) -> PredictionModel: + """Get the prediction model corresponding to a particular abbreviation and projection.""" + return session.query(PredictionModel).filter(PredictionModel.abbreviation == model_enum.value).filter(PredictionModel.projection == projection.value).first() -def get_prediction_model_run_timestamp_records( - session: Session, model_type: ModelEnum, complete: bool = True, interpolated: bool = True): - """ Get prediction model run timestamps (filter on complete and interpolated if provided.) 
""" - query = session.query(PredictionModelRunTimestamp, PredictionModel) \ - .join(PredictionModelRunTimestamp, - PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id)\ +def get_prediction_model_run_timestamp_records(session: Session, model_type: ModelEnum, complete: bool = True, interpolated: bool = True): + """Get prediction model run timestamps (filter on complete and interpolated if provided.)""" + query = ( + session.query(PredictionModelRunTimestamp, PredictionModel) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id) .filter(PredictionModel.abbreviation == model_type.value) + ) if interpolated is not None: - query = query.filter( - PredictionModelRunTimestamp.interpolated == interpolated) + query = query.filter(PredictionModelRunTimestamp.interpolated == interpolated) if complete is not None: query = query.filter(PredictionModelRunTimestamp.complete == complete) query = query.order_by(PredictionModelRunTimestamp.prediction_run_timestamp) return query -def get_weather_station_model_prediction(session: Session, - station_code: int, - prediction_model_run_timestamp_id: int, - prediction_timestamp: datetime) -> WeatherStationModelPrediction: - """ Get the model prediction for a weather station given a model run and a timestamp. """ - return session.query(WeatherStationModelPrediction).\ - filter(WeatherStationModelPrediction.station_code == station_code).\ - filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == - prediction_model_run_timestamp_id).\ - filter(WeatherStationModelPrediction.prediction_timestamp == - prediction_timestamp).first() +def get_weather_station_model_prediction(session: Session, station_code: int, prediction_model_run_timestamp_id: int, prediction_timestamp: datetime) -> WeatherStationModelPrediction: + """Get the model prediction for a weather station given a model run and a timestamp.""" + return ( + session.query(WeatherStationModelPrediction) + .filter(WeatherStationModelPrediction.station_code == station_code) + .filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == prediction_model_run_timestamp_id) + .filter(WeatherStationModelPrediction.prediction_timestamp == prediction_timestamp) + .first() + ) def refresh_morecast2_materialized_view(session: Session): diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 92e8870fc..bacd2177f 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,6 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash jq postgresql python3 +RUN apk update && apk add unzip bash postgresql python3 # Download the Amazon CLI installer. 
ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip diff --git a/openshift/pgslice/docker/partition_and_archive.sh b/openshift/pgslice/docker/partition_and_archive.sh index bd1821980..3111d4d59 100755 --- a/openshift/pgslice/docker/partition_and_archive.sh +++ b/openshift/pgslice/docker/partition_and_archive.sh @@ -49,17 +49,18 @@ then exit 1 fi -export PGSLICE_URL = "postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE}" +ENCODED_PASS=$(python3 -c "import urllib.parse; print(urllib.parse.quote('${PG_PASSWORD}'))") +PGSLICE_URL=postgresql://${PG_USER}:${ENCODED_PASS}@${PG_HOSTNAME}:${PG_PORT}/${PG_DATABASE} # Add partitions to the intermediate table (assumes it already exists) -pgslice add_partitions $TABLE --intermediate --future 1 +pgslice add_partitions $TABLE --intermediate --future 1 --url $PGSLICE_URL # Fill the partitions with data from the original table -pgslice fill $TABLE +pgslice fill $TABLE --url $PGSLICE_URL # Analyze for query planner -pgslice analyze $TABLE +pgslice analyze $TABLE --url $PGSLICE_URL # Swap the intermediate table with the original table -pgslice swap $TABLE +pgslice swap $TABLE --url $PGSLICE_URL # Fill the rest (rows inserted between the first fill and the swap) -pgslice fill $TABLE --swapped +pgslice fill $TABLE --swapped --url $PGSLICE_URL # Dump any retired tables to S3 and drop pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}" AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}" aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${TABLE}_retired.dump.gz" psql -c "DROP TABLE ${TABLE}_retired" $PGSLICE_URL \ No newline at end of file From 648171c6df11c260b243c370687f751414d4c3cb Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:04:18 -0800 Subject: [PATCH 10/34] point back to main --- openshift/pgslice/openshift/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/openshift/build.yaml b/openshift/pgslice/openshift/build.yaml index 7acc744c3..9cd83ee49 100644 --- a/openshift/pgslice/openshift/build.yaml +++ b/openshift/pgslice/openshift/build.yaml @@ -18,7 +18,7 @@ parameters: - name: GIT_URL value: https://github.com/bcgov/wps.git - name: GIT_BRANCH - value: task/more-fixes + value: main objects: - apiVersion: v1 kind: ImageStream From bf0273fae53c862104cba361494bf03e0646fd85 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:29:39 -0800 Subject: [PATCH 11/34] Use awscli from apk --- openshift/pgslice/docker/Dockerfile | 11 +---------- openshift/pgslice/openshift/build.yaml | 2 +- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index bacd2177f..e6826847b 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,15 +1,6 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash postgresql python3 - -# Download the Amazon CLI installer. -ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip - -# Switch to root user for package installs. -USER root -RUN unzip /tmp/awscliv2.zip -d /tmp/ &&\ - /tmp/aws/install - +RUN apk update && apk add unzip bash postgresql python3 aws-cli COPY fill_partition_data.sh . COPY partition_and_archive.sh . 
\ No newline at end of file diff --git a/openshift/pgslice/openshift/build.yaml b/openshift/pgslice/openshift/build.yaml index 9cd83ee49..7acc744c3 100644 --- a/openshift/pgslice/openshift/build.yaml +++ b/openshift/pgslice/openshift/build.yaml @@ -18,7 +18,7 @@ parameters: - name: GIT_URL value: https://github.com/bcgov/wps.git - name: GIT_BRANCH - value: main + value: task/more-fixes objects: - apiVersion: v1 kind: ImageStream From 259ce946d69151211e27b83adf6520ca9cd2236f Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:40:05 -0800 Subject: [PATCH 12/34] specify postgres 16 --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index e6826847b..929dddccd 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,6 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash postgresql python3 aws-cli +RUN apk update && apk add unzip bash postgresql16 python3 aws-cli COPY fill_partition_data.sh . COPY partition_and_archive.sh . \ No newline at end of file From 18adbefa90c89206e7389be7e452f5854354e113 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:44:55 -0800 Subject: [PATCH 13/34] Try more deps --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 929dddccd..d95ecdd42 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,6 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash postgresql16 python3 aws-cli +RUN apk update && apk add unzip bash postgresql16 postgresql16-contrib postgresql16-openrc python3 aws-cli COPY fill_partition_data.sh . COPY partition_and_archive.sh . \ No newline at end of file From a8df936283f73192591e2bb812404aed75feeaf7 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:49:09 -0800 Subject: [PATCH 14/34] Add newwer repo --- openshift/pgslice/docker/Dockerfile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index d95ecdd42..647bfd8ab 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,8 @@ FROM ankane/pgslice:v0.6.1 -RUN apk update && apk add unzip bash postgresql16 postgresql16-contrib postgresql16-openrc python3 aws-cli +RUN echo "https://dl-cdn.alpinelinux.org/alpine/v3.21/main" >> /etc/apk/repositories + +RUN apk update && apk add unzip bash postgresql16 python3 aws-cli COPY fill_partition_data.sh . COPY partition_and_archive.sh . 
\ No newline at end of file From 404260078e54a1ec02bb363f06ce385f8877341d Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:55:08 -0800 Subject: [PATCH 15/34] try with client --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 647bfd8ab..a5231a924 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -2,7 +2,7 @@ FROM ankane/pgslice:v0.6.1 RUN echo "https://dl-cdn.alpinelinux.org/alpine/v3.21/main" >> /etc/apk/repositories -RUN apk update && apk add unzip bash postgresql16 python3 aws-cli +RUN apk update && apk add unzip bash postgresql16-client python3 aws-cli COPY fill_partition_data.sh . COPY partition_and_archive.sh . \ No newline at end of file From 2eafcda7b60fa5a5d1abb458344d9e8385f25856 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 18:58:39 -0800 Subject: [PATCH 16/34] Remove pg15 client --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index a5231a924..5971df05b 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -2,7 +2,7 @@ FROM ankane/pgslice:v0.6.1 RUN echo "https://dl-cdn.alpinelinux.org/alpine/v3.21/main" >> /etc/apk/repositories -RUN apk update && apk add unzip bash postgresql16-client python3 aws-cli +RUN apk update && apk del postgresql15-client && apk add unzip bash postgresql16-client python3 aws-cli COPY fill_partition_data.sh . COPY partition_and_archive.sh . \ No newline at end of file From ee716a8403ecbe7c5dffa01bb40c05febaf38f12 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:01:43 -0800 Subject: [PATCH 17/34] Delete separately --- openshift/pgslice/docker/Dockerfile | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 5971df05b..dfcf6ecde 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,8 +1,9 @@ FROM ankane/pgslice:v0.6.1 RUN echo "https://dl-cdn.alpinelinux.org/alpine/v3.21/main" >> /etc/apk/repositories - -RUN apk update && apk del postgresql15-client && apk add unzip bash postgresql16-client python3 aws-cli +RUN apk update +RUN apk del postgresql-common postgresql15-client postgresql15 +RUN apk add unzip bash postgresql16-client python3 aws-cli COPY fill_partition_data.sh . COPY partition_and_archive.sh . 
\ No newline at end of file From 479ee2c7f56b15d97a5eef726c40d5ff71a06757 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:16:25 -0800 Subject: [PATCH 18/34] Try ubuntu --- openshift/pgslice/docker/Dockerfile | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index dfcf6ecde..e872cc83f 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,9 +1,8 @@ -FROM ankane/pgslice:v0.6.1 +FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 -RUN echo "https://dl-cdn.alpinelinux.org/alpine/v3.21/main" >> /etc/apk/repositories -RUN apk update -RUN apk del postgresql-common postgresql15-client postgresql15 -RUN apk add unzip bash postgresql16-client python3 aws-cli +RUN apt-get update && \ + apt-get install -y build-base libpq-dev postgresql16-client python3 aws-cli && \ + gem install pgslice COPY fill_partition_data.sh . COPY partition_and_archive.sh . \ No newline at end of file From 11a4213af3414724a2e0b19e3930211d5d6e5d03 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:20:37 -0800 Subject: [PATCH 19/34] slim down deps --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index e872cc83f..df84b5145 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,7 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y build-base libpq-dev postgresql16-client python3 aws-cli && \ + apt-get install -y build-base libpq-dev awscli && \ gem install pgslice COPY fill_partition_data.sh . From 18aae913bdee416fb199662123386594ca44088a Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:23:27 -0800 Subject: [PATCH 20/34] awscli from another source --- openshift/pgslice/docker/Dockerfile | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index df84b5145..8a2da5cb3 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,8 +1,15 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y build-base libpq-dev awscli && \ gem install pgslice +# Download the Amazon CLI installer. +ADD "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" /tmp/awscliv2.zip + +# Switch to root user for package installs. +USER root +RUN unzip /tmp/awscliv2.zip -d /tmp/ &&\ + /tmp/aws/install + COPY fill_partition_data.sh . COPY partition_and_archive.sh . \ No newline at end of file From 6a801803a43749a65faa3613784efd8b10e54a9c Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:24:57 -0800 Subject: [PATCH 21/34] Install ruby --- openshift/pgslice/docker/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 8a2da5cb3..c10d9f82d 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,6 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ + apt-get install -y ruby-full && \ gem install pgslice # Download the Amazon CLI installer. 
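A note on the dependency hunt in the next few patches: pgslice is a Ruby gem whose pg dependency compiles a native extension against libpq, and the tool shells out to psql and pg_dump at runtime, so Ruby alone is not enough on a bare Ubuntu base. A minimal sketch of the full dependency set these commits converge on one package at a time, assuming the same Ubuntu 24.04 mirror image used above:

    FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04

    # build-essential and libpq-dev let the pg gem compile its native extension;
    # postgresql-client-16 provides psql/pg_dump matching the Postgres 16 server.
    RUN apt-get update && \
        apt-get install -y build-essential libpq-dev postgresql-client-16 ruby-full && \
        gem install pgslice
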
From 038eb9474a2231d3e3f4fcf4db2a66a4aed43ecd Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:26:18 -0800 Subject: [PATCH 22/34] Add libpq --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index c10d9f82d..e83dc97a7 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,7 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y ruby-full && \ + apt-get install -y libpq ruby-full && \ gem install pgslice # Download the Amazon CLI installer. From 34081f6b0cb3303d005ee8b84441b6384719311f Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:27:33 -0800 Subject: [PATCH 23/34] libpq-dev --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index e83dc97a7..32fbc82f0 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,7 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y libpq ruby-full && \ + apt-get install -y libpq-dev ruby-full && \ gem install pgslice # Download the Amazon CLI installer. From 08d32c705625aa7030e9808eb71b9bdf7b7ff267 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:29:32 -0800 Subject: [PATCH 24/34] build-essential --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 32fbc82f0..5031772fb 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,7 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y libpq-dev ruby-full && \ + apt-get install -y build-essential libpq-dev ruby-full && \ gem install pgslice # Download the Amazon CLI installer. From 0661d7cb7588ce85a614392b6bfb92916ac27dc7 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:33:48 -0800 Subject: [PATCH 25/34] postgresql-client --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 5031772fb..2213323d9 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,7 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y build-essential libpq-dev ruby-full && \ + apt-get install -y build-essential libpq-dev postgresql-client ruby-full && \ gem install pgslice # Download the Amazon CLI installer. 
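Ubuntu's unversioned postgresql-client metapackage installs whichever major version the release defaults to, while pg_dump aborts with a server version mismatch when the server runs a newer major version than the client, which is why the following commits pin the versioned package. A quick sanity check, assuming psql is on PATH and PGSLICE_URL is set as in the scripts above, is to compare the two versions before dumping:

    # client major version (e.g. "psql (PostgreSQL) 16.x")
    psql --version
    # server major version, queried over the same connection the job uses
    psql -tAc 'show server_version' $PGSLICE_URL
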
From 60fd5591d731dba3c1d20a7ae0c7b0a89bddbd51 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:40:34 -0800 Subject: [PATCH 26/34] postgresql16-client --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 2213323d9..29c433480 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,7 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y build-essential libpq-dev postgresql-client ruby-full && \ + apt-get install -y build-essential libpq-dev postgresql16-client ruby-full && \ gem install pgslice # Download the Amazon CLI installer. From 65b6faaae44ed594f1a3cc0ee0b7ac4035898e20 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Tue, 19 Nov 2024 19:45:25 -0800 Subject: [PATCH 27/34] postgresql-client-16 --- openshift/pgslice/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/Dockerfile b/openshift/pgslice/docker/Dockerfile index 29c433480..3638c085e 100644 --- a/openshift/pgslice/docker/Dockerfile +++ b/openshift/pgslice/docker/Dockerfile @@ -1,7 +1,7 @@ FROM artifacts.developer.gov.bc.ca/docker-remote/ubuntu:24.04 RUN apt-get update && \ - apt-get install -y build-essential libpq-dev postgresql16-client ruby-full && \ + apt-get install -y build-essential libpq-dev postgresql-client-16 ruby-full && \ gem install pgslice # Download the Amazon CLI installer. From e6c91a36f6f659bb70bd6fddff90cd9b8514853a Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Wed, 20 Nov 2024 15:52:25 -0800 Subject: [PATCH 28/34] Remove quotes --- openshift/pgslice/docker/partition_and_archive.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/partition_and_archive.sh b/openshift/pgslice/docker/partition_and_archive.sh index 3111d4d59..ae8490902 100755 --- a/openshift/pgslice/docker/partition_and_archive.sh +++ b/openshift/pgslice/docker/partition_and_archive.sh @@ -62,5 +62,5 @@ pgslice swap $TABLE --url $PGSLICE_URL # Fill the rest (rows inserted between the first fill and the swap) pgslice fill $TABLE --swapped --url $PGSLICE_URL # Dump any retired tables to S3 and drop -pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}" AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}" aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${TABLE}_retired.dump.gz" +pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY AWS_SECRET_ACCESS_KEY=$AWS_SECRET_KEY aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${TABLE}_retired.dump.gz" psql -c "DROP TABLE ${TABLE}_retired" $PGSLICE_URL \ No newline at end of file From 8f5fa4fec46e1c1120e60861831a0f58588b4993 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 21 Nov 2024 08:30:57 -0800 Subject: [PATCH 29/34] Update naming scheme for retired backups --- openshift/pgslice/docker/partition_and_archive.sh | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/openshift/pgslice/docker/partition_and_archive.sh b/openshift/pgslice/docker/partition_and_archive.sh index ae8490902..b34e3bed7 100755 --- a/openshift/pgslice/docker/partition_and_archive.sh +++ b/openshift/pgslice/docker/partition_and_archive.sh @@ -61,6 +61,14 @@ pgslice analyze $TABLE --url $PGSLICE_URL pgslice swap $TABLE --url $PGSLICE_URL # 
Fill the rest (rows inserted between the first fill and the swap) pgslice fill $TABLE --swapped --url $PGSLICE_URL + + # Dump any retired tables to S3 and drop -pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY AWS_SECRET_ACCESS_KEY=$AWS_SECRET_KEY aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${TABLE}_retired.dump.gz" +# borrowing a lot from https://github.com/BCDevOps/backup-container +_timestamp=`date +\%Y-\%m-\%d_%H-%M-%S` +_datestamp=`date +\%Y/\%m` +_target_filename="${PG_HOSTNAME}_${TABLE}_retired_${_timestamp}.sql.gz" +_target_folder="${PG_HOSTNAME}_${PG_DATABASE}/${_datestamp}" + +pg_dump -c -Fc -t ${TABLE}_retired $PGSLICE_URL | gzip | AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY AWS_SECRET_ACCESS_KEY=$AWS_SECRET_KEY aws --endpoint="https://${AWS_HOSTNAME}" s3 cp - "s3://${AWS_BUCKET}/retired/${_target_folder}/${_target_filename}" psql -c "DROP TABLE ${TABLE}_retired" $PGSLICE_URL \ No newline at end of file From 1c0e459dcaba621c2c78f43aaf6738888dc9cd66 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 21 Nov 2024 10:19:30 -0800 Subject: [PATCH 30/34] Fix query, dedup subquery --- api/app/db/crud/weather_models.py | 54 ++--- api/app/jobs/common_model_fetchers.py | 256 +++++++------------- api/app/weather_models/fetch/predictions.py | 146 +++++------ 3 files changed, 182 insertions(+), 274 deletions(-) diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py index eff1ace80..ceb42bb45 100644 --- a/api/app/db/crud/weather_models.py +++ b/api/app/db/crud/weather_models.py @@ -126,20 +126,7 @@ def get_station_model_predictions( return query -def get_latest_station_model_prediction_per_day(session: Session, station_codes: List[int], model: str, day_start: datetime.datetime, day_end: datetime.datetime): - """ - All weather station model predictions for: - - a given day - - a given model - - each station in the given list - ordered by update_timestamp - - This is done by joining the predictions on their runs, - that are filtered by the day and the 20:00UTC predictions. - - In turn prediction runs are filtered via a join - on runs that are for the selected model. - """ +def get_latest_model_predictions_subquery(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): subquery = ( session.query( func.max(WeatherStationModelPrediction.prediction_timestamp).label("latest_prediction"), @@ -156,6 +143,25 @@ def get_latest_station_model_prediction_per_day(session: Session, station_codes: .subquery("latest") ) + return subquery + + +def get_latest_station_model_prediction_per_day(session: Session, station_codes: List[int], model: str, day_start: datetime.datetime, day_end: datetime.datetime): + """ + All weather station model predictions for: + - a given day + - a given model + - each station in the given list + ordered by update_timestamp + + This is done by joining the predictions on their runs, + that are filtered by the day and the 20:00UTC predictions. + + In turn prediction runs are filtered via a join + on runs that are for the selected model. 
+ """ + subquery = get_latest_model_predictions_subquery(session, station_codes, day_start, day_end) + result = ( session.query( WeatherStationModelPrediction.id, @@ -180,7 +186,7 @@ def get_latest_station_model_prediction_per_day(session: Session, station_codes: return result -def get_latest_station_prediction_mat_view(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): +def get_latest_station_prediction(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): logger.info("Getting data from materialized view.") result = ( session.query( @@ -199,7 +205,7 @@ def get_latest_station_prediction_mat_view(session: Session, station_codes: List WeatherStationModelPrediction.wind_tgl_10, WeatherStationModelPrediction.update_date, ) - .join(PredictionModel, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) + .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id) .filter( WeatherStationModelPrediction.station_code.in_(station_codes), @@ -223,21 +229,7 @@ def get_latest_station_prediction_per_day(session: Session, station_codes: List[ In turn prediction runs are filtered via a join on runs that are for the selected model. """ - subquery = ( - session.query( - func.max(WeatherStationModelPrediction.prediction_timestamp).label("latest_prediction"), - WeatherStationModelPrediction.station_code, - func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day"), - ) - .filter( - WeatherStationModelPrediction.station_code.in_(station_codes), - WeatherStationModelPrediction.prediction_timestamp >= day_start, - WeatherStationModelPrediction.prediction_timestamp <= day_end, - func.date_part("hour", WeatherStationModelPrediction.prediction_timestamp) == 20, - ) - .group_by(WeatherStationModelPrediction.station_code, func.date(WeatherStationModelPrediction.prediction_timestamp).label("unique_day")) - .subquery("latest") - ) + subquery = get_latest_model_predictions_subquery(session, station_codes, day_start, day_end) result = ( session.query( diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py index 8141949e8..ff36ce90c 100644 --- a/api/app/jobs/common_model_fetchers.py +++ b/api/app/jobs/common_model_fetchers.py @@ -14,7 +14,6 @@ get_weather_station_model_prediction, delete_weather_station_model_predictions, delete_model_run_predictions, - refresh_morecast2_materialized_view, ) from app.weather_models.machine_learning import StationMachineLearning from app.weather_models import ModelEnum @@ -24,8 +23,7 @@ import app.utils.time as time_utils from app.utils.redis import create_redis from app.stations import get_stations_synchronously -from app.db.models.weather_models import (ProcessedModelRunUrl, PredictionModelRunTimestamp, - WeatherStationModelPrediction, ModelRunPrediction) +from app.db.models.weather_models import ProcessedModelRunUrl, PredictionModelRunTimestamp, WeatherStationModelPrediction, ModelRunPrediction import app.db.database from app.db.crud.observations import get_accumulated_precipitation @@ -40,11 +38,11 @@ class UnhandledPredictionModelType(Exception): - """ Exception raised when an unknown model type is encountered. 
""" + """Exception raised when an unknown model type is encountered.""" class CompletedWithSomeExceptions(Exception): - """ Exception raised when processing completed, but there were some non critical exceptions """ + """Exception raised when processing completed, but there were some non critical exceptions""" def download(url: str, path: str, config_cache_var: str, model_name: str, config_cache_expiry_var=None) -> str: @@ -55,7 +53,7 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config is a security concern. TODO: Would be nice to make this an async """ - if model_name == 'GFS': + if model_name == "GFS": original_filename = os.path.split(url)[-1] # NOTE: This is a very not-ideal way to interpolate the filename. # The original_filename that we get from the url is too long and must be condensed. @@ -63,7 +61,7 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config # As long as NOAA's API remains unchanged, we'll have all the info we need (run datetimes, # projections, etc.) in the first 81 characters of original_filename. # An alternative would be to build out a regex to look for - filename = original_filename[:81].replace('.', '') + filename = original_filename[:81].replace(".", "") else: # Infer filename from url. filename = os.path.split(url)[-1] @@ -74,7 +72,7 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config # saves having to re-download the file all the time. # It also save a lot of bandwidth in our dev environment, where we have multiple workers downloading # the same files over and over. - if config.get(config_cache_var) == 'True': + if config.get(config_cache_var) == "True": cache = create_redis() try: cached_object = cache.get(url) @@ -85,13 +83,13 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config cached_object = None cache = None if cached_object: - logger.info('Cache hit %s', url) + logger.info("Cache hit %s", url) # Store the cached object in a file - with open(target, 'wb') as file_object: + with open(target, "wb") as file_object: # Write the file. file_object.write(cached_object) else: - logger.info('Downloading %s', url) + logger.info("Downloading %s", url) # It's important to have a timeout on the get, otherwise the call may get stuck for an indefinite # amount of time - there is no default value for timeout. During testing, it was observed that # downloads usually complete in less than a second. @@ -99,17 +97,17 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config # If the response is 200/OK. if response.status_code == 200: # Store the response. - with open(target, 'wb') as file_object: + with open(target, "wb") as file_object: # Write the file. file_object.write(response.content) # Cache the response if cache: - with open(target, 'rb') as file_object: + with open(target, "rb") as file_object: # Cache for 6 hours (21600 seconds) cache.set(url, file_object.read(), ex=config.get(config_cache_expiry_var, 21600)) elif response.status_code == 404: # We expect this to happen frequently - just log for info. 
- logger.info('404 error for %s', url) + logger.info("404 error for %s", url) target = None else: # Raise an exception @@ -119,40 +117,34 @@ def download(url: str, path: str, config_cache_var: str, model_name: str, config def get_closest_index(coordinate: List, points: List): - """ Get the index of the point closest to the coordinate """ + """Get the index of the point closest to the coordinate""" # https://pyproj4.github.io/pyproj/stable/api/geod.html # Use GRS80 ellipsoid (it's what NAD83 uses) geod = Geod(ellps="GRS80") # Calculate the distance each point is from the coordinate. - _, _, distances = geod.inv([coordinate[0] for _ in range(4)], - [coordinate[1] for _ in range(4)], - [x[0] for x in points], - [x[1] for x in points]) + _, _, distances = geod.inv([coordinate[0] for _ in range(4)], [coordinate[1] for _ in range(4)], [x[0] for x in points], [x[1] for x in points]) # Return the index of the point with the shortest distance. return numpy.argmin(distances) def flag_file_as_processed(url: str, session: Session): - """ Flag the file as processed in the database """ + """Flag the file as processed in the database""" processed_file = get_processed_file_record(session, url) if processed_file: - logger.info('re-procesed %s', url) + logger.info("re-procesed %s", url) else: - logger.info('file processed %s', url) - processed_file = ProcessedModelRunUrl( - url=url, - create_date=time_utils.get_utc_now()) + logger.info("file processed %s", url) + processed_file = ProcessedModelRunUrl(url=url, create_date=time_utils.get_utc_now()) processed_file.update_date = time_utils.get_utc_now() session.add(processed_file) session.commit() def check_if_model_run_complete(session: Session, urls): - """ Check if a particular model run is complete """ + """Check if a particular model run is complete""" actual_count = get_processed_file_count(session, urls) expected_count = len(urls) - logger.info('we have processed %s/%s files', - actual_count, expected_count) + logger.info("we have processed %s/%s files", actual_count, expected_count) return actual_count == expected_count and actual_count > 0 @@ -170,7 +162,7 @@ def apply_data_retention_policy(): def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: ModelRunPrediction, model_run_hour: int): - """ Calculate overall cumulative precip and cumulative precip for the current prediction. """ + """Calculate overall cumulative precip and cumulative precip for the current prediction.""" # 00 and 12 hour model runs accumulate precipitation in 12 hour intervals, 06 and 18 hour accumulate in # 3 hour intervals nam_accumulation_interval = 3 if model_run_hour == 6 or model_run_hour == 18 else 12 @@ -184,117 +176,78 @@ def accumulate_nam_precipitation(nam_cumulative_precip: float, prediction: Model class ModelValueProcessor: - """ Iterate through model runs that have completed, and calculate the interpolated weather predictions. - """ + """Iterate through model runs that have completed, and calculate the interpolated weather predictions.""" def __init__(self, session): - """ Prepare variables we're going to use throughout """ + """Prepare variables we're going to use throughout""" self.session = session self.stations = get_stations_synchronously() self.station_count = len(self.stations) def _process_model_run(self, model_run: PredictionModelRunTimestamp, model_type: ModelEnum): - """ Interpolate predictions in the provided model run for all stations. 
""" - logger.info('Interpolating values for model run: %s', model_run) + """Interpolate predictions in the provided model run for all stations.""" + logger.info("Interpolating values for model run: %s", model_run) # Iterate through stations. for index, station in enumerate(self.stations): - logger.info('Interpolating model run %s (%s/%s) for %s:%s', - model_run.id, - index, self.station_count, - station.code, station.name) + logger.info("Interpolating model run %s (%s/%s) for %s:%s", model_run.id, index, self.station_count, station.code, station.name) # Process this model run for station. self._process_model_run_for_station(model_run, station, model_type) # Commit all the weather station model predictions (it's fast if we line them all up and commit # them in one go.) - logger.info('commit to database...') + logger.info("commit to database...") self.session.commit() - logger.info('done commit.') + logger.info("done commit.") def _add_interpolated_bias_adjustments_to_prediction(self, station_prediction: WeatherStationModelPrediction, machine: StationMachineLearning): # We need to interpolate prediction for 2000 using predictions for 1800 and 2100 # Predict the temperature - temp_at_1800 = machine.predict_temperature(station_prediction.tmp_tgl_2, - station_prediction.prediction_timestamp.replace(hour=18)) - temp_at_2100 = machine.predict_temperature(station_prediction.tmp_tgl_2, - station_prediction.prediction_timestamp.replace(hour=21)) - station_prediction.bias_adjusted_temperature = interpolate_between_two_points(18, 21, temp_at_1800, - temp_at_2100, 20) + temp_at_1800 = machine.predict_temperature(station_prediction.tmp_tgl_2, station_prediction.prediction_timestamp.replace(hour=18)) + temp_at_2100 = machine.predict_temperature(station_prediction.tmp_tgl_2, station_prediction.prediction_timestamp.replace(hour=21)) + station_prediction.bias_adjusted_temperature = interpolate_between_two_points(18, 21, temp_at_1800, temp_at_2100, 20) # Predict the rh - rh_at_1800 = machine.predict_rh(station_prediction.rh_tgl_2, - station_prediction.prediction_timestamp.replace(hour=18)) - rh_at_2100 = machine.predict_rh(station_prediction.rh_tgl_2, - station_prediction.prediction_timestamp.replace(hour=21)) + rh_at_1800 = machine.predict_rh(station_prediction.rh_tgl_2, station_prediction.prediction_timestamp.replace(hour=18)) + rh_at_2100 = machine.predict_rh(station_prediction.rh_tgl_2, station_prediction.prediction_timestamp.replace(hour=21)) station_prediction.bias_adjusted_rh = interpolate_between_two_points(18, 21, rh_at_1800, rh_at_2100, 20) # Predict the wind speed - wind_speed_at_1800 = machine.predict_wind_speed(station_prediction.wind_tgl_10, - station_prediction.prediction_timestamp.replace(hour=18)) - wind_speed_at_2100 = machine.predict_wind_speed(station_prediction.wind_tgl_10, - station_prediction.prediction_timestamp.replace(hour=21)) - station_prediction.bias_adjusted_wind_speed = interpolate_between_two_points(18, 21, wind_speed_at_1800, - wind_speed_at_2100, 20) - + wind_speed_at_1800 = machine.predict_wind_speed(station_prediction.wind_tgl_10, station_prediction.prediction_timestamp.replace(hour=18)) + wind_speed_at_2100 = machine.predict_wind_speed(station_prediction.wind_tgl_10, station_prediction.prediction_timestamp.replace(hour=21)) + station_prediction.bias_adjusted_wind_speed = interpolate_between_two_points(18, 21, wind_speed_at_1800, wind_speed_at_2100, 20) + # Predict the wind direction wind_direction_at_1800 = station_prediction.bias_adjusted_wdir = 
machine.predict_wind_direction( - station_prediction.wind_tgl_10, - station_prediction.wdir_tgl_10, - station_prediction.prediction_timestamp.replace(hour=18) + station_prediction.wind_tgl_10, station_prediction.wdir_tgl_10, station_prediction.prediction_timestamp.replace(hour=18) ) wind_direction_at_2100 = station_prediction.bias_adjusted_wdir = machine.predict_wind_direction( - station_prediction.wind_tgl_10, - station_prediction.wdir_tgl_10, - station_prediction.prediction_timestamp.replace(hour=21) + station_prediction.wind_tgl_10, station_prediction.wdir_tgl_10, station_prediction.prediction_timestamp.replace(hour=21) ) - station_prediction.bias_adjusted_wdir = interpolate_between_two_points(18, 21, wind_direction_at_1800, - wind_direction_at_2100, 20) + station_prediction.bias_adjusted_wdir = interpolate_between_two_points(18, 21, wind_direction_at_1800, wind_direction_at_2100, 20) # Predict the 24h precipitation. No interpolation necessary due to the underlying model training. - station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation( - station_prediction.precip_24h, - station_prediction.prediction_timestamp - ) - - def _add_bias_adjustments_to_prediction(self, station_prediction: WeatherStationModelPrediction, - machine: StationMachineLearning): + station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(station_prediction.precip_24h, station_prediction.prediction_timestamp) + + def _add_bias_adjustments_to_prediction(self, station_prediction: WeatherStationModelPrediction, machine: StationMachineLearning): # Predict the temperature - station_prediction.bias_adjusted_temperature = machine.predict_temperature( - station_prediction.tmp_tgl_2, - station_prediction.prediction_timestamp) - + station_prediction.bias_adjusted_temperature = machine.predict_temperature(station_prediction.tmp_tgl_2, station_prediction.prediction_timestamp) + # Predict the rh - station_prediction.bias_adjusted_rh = machine.predict_rh(station_prediction.rh_tgl_2, - station_prediction.prediction_timestamp) - + station_prediction.bias_adjusted_rh = machine.predict_rh(station_prediction.rh_tgl_2, station_prediction.prediction_timestamp) + # Predict the wind speed - station_prediction.bias_adjusted_wind_speed = machine.predict_wind_speed( - station_prediction.wind_tgl_10, - station_prediction.prediction_timestamp - ) + station_prediction.bias_adjusted_wind_speed = machine.predict_wind_speed(station_prediction.wind_tgl_10, station_prediction.prediction_timestamp) # Predict the wind direction - station_prediction.bias_adjusted_wdir = machine.predict_wind_direction( - station_prediction.wind_tgl_10, - station_prediction.wdir_tgl_10, - station_prediction.prediction_timestamp - ) + station_prediction.bias_adjusted_wdir = machine.predict_wind_direction(station_prediction.wind_tgl_10, station_prediction.wdir_tgl_10, station_prediction.prediction_timestamp) # Predict the 24h precipitation - station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation( - station_prediction.precip_24h, - station_prediction.prediction_timestamp - ) + station_prediction.bias_adjusted_precip_24h = machine.predict_precipitation(station_prediction.precip_24h, station_prediction.prediction_timestamp) - def _process_prediction(self, - prediction: ModelRunPrediction, - station: WeatherStation, - model_run: PredictionModelRunTimestamp, - machine: StationMachineLearning, - prediction_is_interpolated: bool): - """ Create a WeatherStationModelPrediction from the ModelRunPrediction data. 
- """ + def _process_prediction( + self, prediction: ModelRunPrediction, station: WeatherStation, model_run: PredictionModelRunTimestamp, machine: StationMachineLearning, prediction_is_interpolated: bool + ): + """Create a WeatherStationModelPrediction from the ModelRunPrediction data.""" # If there's already a prediction, we want to update it - station_prediction = get_weather_station_model_prediction( - self.session, station.code, model_run.id, prediction.prediction_timestamp) + station_prediction = get_weather_station_model_prediction(self.session, station.code, model_run.id, prediction.prediction_timestamp) if station_prediction is None: station_prediction = WeatherStationModelPrediction() # Populate the weather station prediction object. @@ -306,7 +259,7 @@ def _process_prediction(self, # NOTE: Not sure why this value would ever be None. This could happen if for whatever reason, the # tmp_tgl_2 layer failed to download and process, while other layers did. if prediction.tmp_tgl_2 is None: - logger.warning('tmp_tgl_2 is None for ModelRunPrediction.id == %s', prediction.id) + logger.warning("tmp_tgl_2 is None for ModelRunPrediction.id == %s", prediction.id) else: station_prediction.tmp_tgl_2 = prediction.tmp_tgl_2 @@ -315,7 +268,7 @@ def _process_prediction(self, # rh_tgl_2 layer failed to download and process, while other layers did. if prediction.rh_tgl_2 is None: # This is unexpected, so we log it. - logger.warning('rh_tgl_2 is None for ModelRunPrediction.id == %s', prediction.id) + logger.warning("rh_tgl_2 is None for ModelRunPrediction.id == %s", prediction.id) station_prediction.rh_tgl_2 = None else: station_prediction.rh_tgl_2 = prediction.rh_tgl_2 @@ -328,10 +281,8 @@ def _process_prediction(self, # Calculate the delta_precipitation and 24 hour precip based on station's previous prediction_timestamp # for the same model run self.session.flush() - station_prediction.precip_24h = self._calculate_past_24_hour_precip( - station, model_run, prediction, station_prediction) - station_prediction.delta_precip = self._calculate_delta_precip( - station, model_run, prediction, station_prediction) + station_prediction.precip_24h = self._calculate_past_24_hour_precip(station, model_run, prediction, station_prediction) + station_prediction.delta_precip = self._calculate_delta_precip(station, model_run, prediction, station_prediction) # Get the closest wind speed if prediction.wind_tgl_10 is not None: @@ -344,7 +295,7 @@ def _process_prediction(self, # Dealing with a numerical weather model that only has predictions at 3 hour intervals, # so no 20:00 UTC prediction available in the trained linear regression self._add_interpolated_bias_adjustments_to_prediction(station_prediction, machine) - + else: # No interpolation required self._add_bias_adjustments_to_prediction(station_prediction, machine) @@ -354,15 +305,15 @@ def _process_prediction(self, # Add this prediction to the session (we'll commit it later.) self.session.add(station_prediction) - def _calculate_past_24_hour_precip(self, station: WeatherStation, model_run: PredictionModelRunTimestamp, - prediction: ModelRunPrediction, station_prediction: WeatherStationModelPrediction): - """ Calculate the predicted precipitation over the previous 24 hours within the specified model run. 
+ def _calculate_past_24_hour_precip( + self, station: WeatherStation, model_run: PredictionModelRunTimestamp, prediction: ModelRunPrediction, station_prediction: WeatherStationModelPrediction + ): + """Calculate the predicted precipitation over the previous 24 hours within the specified model run. If the model run does not contain a prediction timestamp for 24 hours prior to the current prediction, - return the predicted precipitation from the previous run of the same model for the same time frame. """ + return the predicted precipitation from the previous run of the same model for the same time frame.""" start_prediction_timestamp = prediction.prediction_timestamp - timedelta(days=1) # Check if a prediction exists for this model run 24 hours in the past - previous_prediction_from_same_model_run = get_weather_station_model_prediction(self.session, station.code, - model_run.id, start_prediction_timestamp) + previous_prediction_from_same_model_run = get_weather_station_model_prediction(self.session, station.code, model_run.id, start_prediction_timestamp) # If a prediction from 24 hours ago from the same model run exists, return the difference in cumulative precipitation # between now and then as our total for the past 24 hours. We can end up with very very small negative numbers due # to floating point math, so return absolute value to avoid displaying -0.0. @@ -373,24 +324,23 @@ def _calculate_past_24_hour_precip(self, station: WeatherStation, model_run: Pre # We use actual precipitation from our API hourly_actuals table to make up the missing hours. prediction_timestamp = station_prediction.prediction_timestamp # Create new datetime with time of 00:00 hours as the end time. - end_prediction_timestamp = datetime(year=prediction_timestamp.year, - month=prediction_timestamp.month, - day=prediction_timestamp.day, - tzinfo=timezone.utc) - actual_precip = get_accumulated_precipitation( - self.session, station.code, start_prediction_timestamp, end_prediction_timestamp) + end_prediction_timestamp = datetime(year=prediction_timestamp.year, month=prediction_timestamp.month, day=prediction_timestamp.day, tzinfo=timezone.utc) + actual_precip = get_accumulated_precipitation(self.session, station.code, start_prediction_timestamp, end_prediction_timestamp) return actual_precip + station_prediction.apcp_sfc_0 def _calculate_delta_precip(self, station, model_run, prediction, station_prediction): - """ Calculate the station_prediction's delta_precip based on the previous precip + """Calculate the station_prediction's delta_precip based on the previous precip prediction for the station """ - results = self.session.query(WeatherStationModelPrediction).\ - filter(WeatherStationModelPrediction.station_code == station.code).\ - filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == model_run.id).\ - filter(WeatherStationModelPrediction.prediction_timestamp < prediction.prediction_timestamp).\ - order_by(WeatherStationModelPrediction.prediction_timestamp.desc()).\ - limit(1).first() + results = ( + self.session.query(WeatherStationModelPrediction) + .filter(WeatherStationModelPrediction.station_code == station.code) + .filter(WeatherStationModelPrediction.prediction_model_run_timestamp_id == model_run.id) + .filter(WeatherStationModelPrediction.prediction_timestamp < prediction.prediction_timestamp) + .order_by(WeatherStationModelPrediction.prediction_timestamp.desc()) + .limit(1) + .first() + ) # If there exists a previous prediction for the station from the same model run if results is not 
None: return station_prediction.apcp_sfc_0 - results.apcp_sfc_0 @@ -399,23 +349,15 @@ def _calculate_delta_precip(self, station, model_run, prediction, station_predic # model type). In this case, delta_precip will be equal to the apcp return station_prediction.apcp_sfc_0 - def _process_model_run_for_station(self, - model_run: PredictionModelRunTimestamp, - station: WeatherStation, - model_type: ModelEnum): - """ Process the model run for the provided station. - """ + def _process_model_run_for_station(self, model_run: PredictionModelRunTimestamp, station: WeatherStation, model_type: ModelEnum): + """Process the model run for the provided station.""" # Extract the coordinate. coordinate = [station.long, station.lat] # Lookup the grid our weather station is in. - logger.info("Getting grid for coordinate %s and model %s", - coordinate, model_run.prediction_model) + logger.info("Getting grid for coordinate %s and model %s", coordinate, model_run.prediction_model) machine = StationMachineLearning( - session=self.session, - model=model_run.prediction_model, - target_coordinate=coordinate, - station_code=station.code, - max_learn_date=model_run.prediction_run_timestamp) + session=self.session, model=model_run.prediction_model, target_coordinate=coordinate, station_code=station.code, max_learn_date=model_run.prediction_run_timestamp + ) machine.learn() # Get all the predictions associated to this particular model run. @@ -428,41 +370,29 @@ def _process_model_run_for_station(self, for prediction in query: # NAM model requires manual calculation of cumulative precip if model_type == ModelEnum.NAM: - nam_cumulative_precip, prediction.apcp_sfc_0 = accumulate_nam_precipitation( - nam_cumulative_precip, prediction, model_run.prediction_run_timestamp.hour) - if (prev_prediction is not None - and prev_prediction.prediction_timestamp.hour == 18 - and prediction.prediction_timestamp.hour == 21): + nam_cumulative_precip, prediction.apcp_sfc_0 = accumulate_nam_precipitation(nam_cumulative_precip, prediction, model_run.prediction_run_timestamp.hour) + if prev_prediction is not None and prev_prediction.prediction_timestamp.hour == 18 and prediction.prediction_timestamp.hour == 21: noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction, SCALAR_MODEL_VALUE_KEYS_FOR_INTERPOLATION) - self._process_prediction( - noon_prediction, station, model_run, machine, True) - self._process_prediction( - prediction, station, model_run, machine, False) + self._process_prediction(noon_prediction, station, model_run, machine, True) + self._process_prediction(prediction, station, model_run, machine, False) prev_prediction = prediction def _mark_model_run_interpolated(self, model_run: PredictionModelRunTimestamp): - """ Having completely processed a model run, we can mark it has having been interpolated. - """ + """Having completely processed a model run, we can mark it has having been interpolated.""" model_run.interpolated = True - logger.info('marking %s as interpolated', model_run) + logger.info("marking %s as interpolated", model_run) self.session.add(model_run) self.session.commit() def process(self, model_type: ModelEnum): - """ Entry point to start processing model runs that have not yet had their predictions interpolated - """ + """Entry point to start processing model runs that have not yet had their predictions interpolated""" # Get model runs that are complete (fully downloaded), but not yet interpolated. 
- query = get_prediction_model_run_timestamp_records( - self.session, complete=True, interpolated=False, model_type=model_type) - model_processed = False + query = get_prediction_model_run_timestamp_records(self.session, complete=True, interpolated=False, model_type=model_type) for model_run, model in query: - model_processed = True - logger.info('model %s', model) - logger.info('model_run %s', model_run) + logger.info("model %s", model) + logger.info("model_run %s", model_run) # Process the model run. self._process_model_run(model_run, model_type) # Mark the model run as interpolated. self._mark_model_run_interpolated(model_run) - if model_processed: - refresh_morecast2_materialized_view(self.session) diff --git a/api/app/weather_models/fetch/predictions.py b/api/app/weather_models/fetch/predictions.py index 1426ed85a..a094fc5db 100644 --- a/api/app/weather_models/fetch/predictions.py +++ b/api/app/weather_models/fetch/predictions.py @@ -1,5 +1,4 @@ -""" Code for fetching data for API. -""" +"""Code for fetching data for API.""" from itertools import groupby import logging @@ -11,12 +10,14 @@ from sqlalchemy.orm import Session import app.db.database from app.schemas.morecast_v2 import WeatherIndeterminate -from app.schemas.weather_models import (WeatherStationModelPredictionValues, WeatherModelPredictionValues, WeatherModelRun, - ModelRunPredictions, - WeatherStationModelRunsPredictions) +from app.schemas.weather_models import WeatherStationModelPredictionValues, WeatherModelPredictionValues, WeatherModelRun, ModelRunPredictions, WeatherStationModelRunsPredictions from app.db.models.weather_models import WeatherStationModelPrediction -from app.db.crud.weather_models import (get_latest_station_model_prediction_per_day, get_station_model_predictions, - get_station_model_prediction_from_previous_model_run, get_latest_station_prediction_mat_view) +from app.db.crud.weather_models import ( + get_latest_station_model_prediction_per_day, + get_station_model_predictions, + get_station_model_prediction_from_previous_model_run, + get_latest_station_prediction, +) import app.stations from app.utils.time import get_days_from_range from app.weather_models import ModelEnum @@ -25,34 +26,26 @@ class MatchingStationNotFoundException(Exception): - """ Exception raised when station cannot be found. """ + """Exception raised when station cannot be found.""" def _fetch_delta_precip_for_prev_model_run( - session: Session, - model: ModelEnum, - prediction: WeatherStationModelPrediction, - prev_station_predictions: dict, - prediction_model_run_timestamp: datetime.datetime): + session: Session, model: ModelEnum, prediction: WeatherStationModelPrediction, prev_station_predictions: dict, prediction_model_run_timestamp: datetime.datetime +): # Look if we can find the previous value in memory if prediction.prediction_timestamp in prev_station_predictions[prediction.station_code]: prev_station_prediction = prev_station_predictions[prediction.station_code] - return prev_station_prediction[prediction.prediction_timestamp]['prediction'].delta_precipitation + return prev_station_prediction[prediction.prediction_timestamp]["prediction"].delta_precipitation # Uh oh - couldn't find it - let's go look in the database. # This should only happen in extreme edge cases! 
- prev_prediction = get_station_model_prediction_from_previous_model_run( - session, prediction.station_code, model, prediction.prediction_timestamp, - prediction_model_run_timestamp) + prev_prediction = get_station_model_prediction_from_previous_model_run(session, prediction.station_code, model, prediction.prediction_timestamp, prediction_model_run_timestamp) if prev_prediction: return prev_prediction.delta_precip return None -async def fetch_model_run_predictions_by_station_code( - model: ModelEnum, - station_codes: List[int], - time_of_interest: datetime) -> List[WeatherStationModelRunsPredictions]: - """ Fetch model predictions from database based on list of station codes, for a specified datetime. +async def fetch_model_run_predictions_by_station_code(model: ModelEnum, station_codes: List[int], time_of_interest: datetime) -> List[WeatherStationModelRunsPredictions]: + """Fetch model predictions from database based on list of station codes, for a specified datetime. Predictions are grouped by station and model run. """ # We're interested in the 5 days prior to and 10 days following the time_of_interest. @@ -62,31 +55,26 @@ async def fetch_model_run_predictions_by_station_code( async def fetch_model_run_predictions_by_station_code_and_date_range( - model: ModelEnum, - station_codes: List[int], - start_time: datetime.datetime, - end_time: datetime.datetime) -> List[WeatherStationModelRunsPredictions]: - """ Fetch model predictions from database based on list of station codes and date range. + model: ModelEnum, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime +) -> List[WeatherStationModelRunsPredictions]: + """Fetch model predictions from database based on list of station codes and date range. Predictions are grouped by station and model run. """ # send the query (ordered by prediction date.) 
with app.db.database.get_read_session_scope() as session: - historic_predictions = get_station_model_predictions( - session, station_codes, model, start_time, end_time) + historic_predictions = get_station_model_predictions(session, station_codes, model, start_time, end_time) return await marshall_predictions(session, model, station_codes, historic_predictions) -async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range(model: ModelEnum, - station_codes: List[int], - start_time: datetime.datetime, - end_time: datetime.datetime) -> List[WeatherStationModelRunsPredictions]: +async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_range( + model: ModelEnum, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime +) -> List[WeatherStationModelRunsPredictions]: results = [] days = get_days_from_range(start_time, end_time) stations = {station.code: station for station in await app.stations.get_stations_by_codes(station_codes)} with app.db.database.get_read_session_scope() as session: - for day in days: day_results = [] vancouver_tz = pytz.timezone("America/Vancouver") @@ -94,8 +82,7 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_rang day_start = vancouver_tz.localize(datetime.datetime.combine(day, time.min)) day_end = vancouver_tz.localize(datetime.datetime.combine(day, time.max)) - daily_result = get_latest_station_model_prediction_per_day( - session, station_codes, model, day_start, day_end) + daily_result = get_latest_station_model_prediction_per_day(session, station_codes, model, day_start, day_end) for id, timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, precip_24hours, wind_dir, wind_speed, update_date in daily_result: day_results.append( WeatherStationModelPredictionValues( @@ -110,8 +97,9 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_rang wind_speed=wind_speed, wind_direction=wind_dir, datetime=timestamp, - update_date=update_date - )) + update_date=update_date, + ) + ) # sort the list by station_code day_results.sort(key=lambda x: x.station.code) @@ -124,10 +112,9 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_rang return results -async def fetch_latest_model_run_predictions_by_station_code_and_date_range(session: Session, - station_codes: List[int], - start_time: datetime.datetime, - end_time: datetime.datetime) -> List[WeatherIndeterminate]: +async def fetch_latest_model_run_predictions_by_station_code_and_date_range( + session: Session, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime +) -> List[WeatherIndeterminate]: results: List[WeatherIndeterminate] = [] days = get_days_from_range(start_time, end_time) stations = {station.code: station for station in await app.stations.get_stations_by_codes(station_codes)} @@ -138,10 +125,23 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range(sess day_start = vancouver_tz.localize(datetime.datetime.combine(day, time.min)) day_end = vancouver_tz.localize(datetime.datetime.combine(day, time.max)) - daily_result = get_latest_station_prediction_mat_view( - session, active_station_codes, day_start, day_end) - for timestamp, model_abbrev, station_code, rh, temp, bias_adjusted_temp, bias_adjusted_rh, bias_adjusted_wind_speed, bias_adjusted_wdir, precip_24hours, bias_adjusted_precip_24h, wind_dir, wind_speed, update_date in daily_result: - + daily_result = 
get_latest_station_prediction(session, active_station_codes, day_start, day_end) + for ( + timestamp, + model_abbrev, + station_code, + rh, + temp, + bias_adjusted_temp, + bias_adjusted_rh, + bias_adjusted_wind_speed, + bias_adjusted_wdir, + precip_24hours, + bias_adjusted_precip_24h, + wind_dir, + wind_speed, + update_date, + ) in daily_result: # Create two WeatherIndeterminates, one for model predictions and one for bias corrected predictions results.append( WeatherIndeterminate( @@ -153,20 +153,22 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range(sess relative_humidity=rh, precipitation=precip_24hours, wind_direction=wind_dir, - wind_speed=wind_speed - )) + wind_speed=wind_speed, + ) + ) results.append( WeatherIndeterminate( station_code=station_code, station_name=stations[station_code].name, - determinate=f'{model_abbrev}_BIAS', + determinate=f"{model_abbrev}_BIAS", utc_timestamp=timestamp, temperature=bias_adjusted_temp, relative_humidity=bias_adjusted_rh, precipitation=bias_adjusted_precip_24h, wind_speed=bias_adjusted_wind_speed, - wind_direction=bias_adjusted_wdir - )) + wind_direction=bias_adjusted_wdir, + ) + ) return post_process_fetched_predictions(results) @@ -193,26 +195,17 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes # day, so we need to look at the accumulated precip from the previous model run to calculate the # delta_precip precip_value = None - if prediction.prediction_timestamp == prediction_model_run_timestamp.prediction_run_timestamp and \ - prediction.prediction_timestamp.hour > 0: - precip_value = _fetch_delta_precip_for_prev_model_run( - session, - model, - prediction, - station_predictions, - prediction_model_run_timestamp.prediction_run_timestamp) + if prediction.prediction_timestamp == prediction_model_run_timestamp.prediction_run_timestamp and prediction.prediction_timestamp.hour > 0: + precip_value = _fetch_delta_precip_for_prev_model_run(session, model, prediction, station_predictions, prediction_model_run_timestamp.prediction_run_timestamp) # This condition catches situations where we are not at hour 000 of the model run, or where it is # hour 000 but there was nothing returned from _fetch_delta_precip_for_prev_model_run() if precip_value is None: precip_value = prediction.delta_precip station_predictions[prediction.station_code][prediction.prediction_timestamp] = { - 'model_run': WeatherModelRun( - datetime=prediction_model_run_timestamp.prediction_run_timestamp, - name=prediction_model.name, - abbreviation=model, - projection=prediction_model.projection + "model_run": WeatherModelRun( + datetime=prediction_model_run_timestamp.prediction_run_timestamp, name=prediction_model.name, abbreviation=model, projection=prediction_model.projection ), - 'prediction': WeatherModelPredictionValues( + "prediction": WeatherModelPredictionValues( temperature=prediction.tmp_tgl_2, bias_adjusted_temperature=prediction.bias_adjusted_temperature, relative_humidity=prediction.rh_tgl_2, @@ -220,8 +213,8 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes delta_precipitation=precip_value, wind_speed=prediction.wind_tgl_10, wind_direction=prediction.wdir_tgl_10, - datetime=prediction.prediction_timestamp - ) + datetime=prediction.prediction_timestamp, + ), } # Re-structure the data, grouping data by station and model run. 
@@ -231,19 +224,12 @@ async def marshall_predictions(session: Session, model: ModelEnum, station_codes for station_code, predictions in station_predictions.items(): model_run_dict = {} for prediction in predictions.values(): - - if prediction['model_run'].datetime in model_run_dict: - model_run_predictions = model_run_dict[prediction['model_run'].datetime] + if prediction["model_run"].datetime in model_run_dict: + model_run_predictions = model_run_dict[prediction["model_run"].datetime] else: - model_run_predictions = ModelRunPredictions( - model_run=prediction['model_run'], - values=[] - ) - model_run_dict[prediction['model_run'].datetime] = model_run_predictions - model_run_predictions.values.append(prediction['prediction']) + model_run_predictions = ModelRunPredictions(model_run=prediction["model_run"], values=[]) + model_run_dict[prediction["model_run"].datetime] = model_run_predictions + model_run_predictions.values.append(prediction["prediction"]) - response.append(WeatherStationModelRunsPredictions( - station=stations[station_code], - model_runs=list(model_run_dict.values()) - )) + response.append(WeatherStationModelRunsPredictions(station=stations[station_code], model_runs=list(model_run_dict.values()))) return response From 3d8ad27a8f7a39d5c19d15f0ed316fbaaa279fb9 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 21 Nov 2024 11:07:25 -0800 Subject: [PATCH 31/34] Remove unused function --- api/app/db/crud/weather_models.py | 71 ++++++------------------------- 1 file changed, 14 insertions(+), 57 deletions(-) diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py index ceb42bb45..bb1a65b23 100644 --- a/api/app/db/crud/weather_models.py +++ b/api/app/db/crud/weather_models.py @@ -126,7 +126,20 @@ def get_station_model_predictions( return query -def get_latest_model_predictions_subquery(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): +def get_latest_station_model_prediction_per_day(session: Session, station_codes: List[int], model: str, day_start: datetime.datetime, day_end: datetime.datetime): + """ + All weather station model predictions for: + - a given day + - a given model + - each station in the given list + ordered by update_timestamp + + This is done by joining the predictions on their runs, + that are filtered by the day and the 20:00UTC predictions. + + In turn prediction runs are filtered via a join + on runs that are for the selected model. + """ subquery = ( session.query( func.max(WeatherStationModelPrediction.prediction_timestamp).label("latest_prediction"), @@ -143,25 +156,6 @@ def get_latest_model_predictions_subquery(session: Session, station_codes: List[ .subquery("latest") ) - return subquery - - -def get_latest_station_model_prediction_per_day(session: Session, station_codes: List[int], model: str, day_start: datetime.datetime, day_end: datetime.datetime): - """ - All weather station model predictions for: - - a given day - - a given model - - each station in the given list - ordered by update_timestamp - - This is done by joining the predictions on their runs, - that are filtered by the day and the 20:00UTC predictions. - - In turn prediction runs are filtered via a join - on runs that are for the selected model. 
- """ - subquery = get_latest_model_predictions_subquery(session, station_codes, day_start, day_end) - result = ( session.query( WeatherStationModelPrediction.id, @@ -216,43 +210,6 @@ def get_latest_station_prediction(session: Session, station_codes: List[int], da return result -def get_latest_station_prediction_per_day(session: Session, station_codes: List[int], day_start: datetime.datetime, day_end: datetime.datetime): - """ - All weather station model predictions for: - - a given day - - each station in the given list - ordered by update_timestamp - - This is done by joining the predictions on their runs, - that are filtered by the day and the 20:00UTC predictions. - - In turn prediction runs are filtered via a join - on runs that are for the selected model. - """ - subquery = get_latest_model_predictions_subquery(session, station_codes, day_start, day_end) - - result = ( - session.query( - WeatherStationModelPrediction.prediction_timestamp, - PredictionModel.abbreviation, - WeatherStationModelPrediction.station_code, - WeatherStationModelPrediction.rh_tgl_2, - WeatherStationModelPrediction.tmp_tgl_2, - WeatherStationModelPrediction.bias_adjusted_temperature, - WeatherStationModelPrediction.bias_adjusted_rh, - WeatherStationModelPrediction.apcp_sfc_0, - WeatherStationModelPrediction.wdir_tgl_10, - WeatherStationModelPrediction.wind_tgl_10, - WeatherStationModelPrediction.update_date, - ) - .join(PredictionModelRunTimestamp, WeatherStationModelPrediction.prediction_model_run_timestamp_id == PredictionModelRunTimestamp.id) - .join(PredictionModel, PredictionModelRunTimestamp.prediction_model_id == PredictionModel.id) - .join(subquery, and_(WeatherStationModelPrediction.prediction_timestamp == subquery.c.latest_prediction, WeatherStationModelPrediction.station_code == subquery.c.station_code)) - .order_by(WeatherStationModelPrediction.update_date.desc()) - ) - return result - - def get_station_model_prediction_from_previous_model_run( session: Session, station_code: int, model: ModelEnum, prediction_timestamp: datetime.datetime, prediction_model_run_timestamp: datetime.datetime ) -> List[WeatherStationModelPrediction]: From 8b6dd8c7c493bfa8a2fccf7a7113eab96fa44d5b Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 21 Nov 2024 13:10:16 -0800 Subject: [PATCH 32/34] Add perf measurement to latest model query code path --- api/app/weather_models/fetch/predictions.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/api/app/weather_models/fetch/predictions.py b/api/app/weather_models/fetch/predictions.py index a094fc5db..9a6e648e2 100644 --- a/api/app/weather_models/fetch/predictions.py +++ b/api/app/weather_models/fetch/predictions.py @@ -5,6 +5,7 @@ from typing import List import datetime from datetime import time +from time import perf_counter from collections import defaultdict import pytz from sqlalchemy.orm import Session @@ -115,6 +116,7 @@ async def fetch_latest_daily_model_run_predictions_by_station_code_and_date_rang async def fetch_latest_model_run_predictions_by_station_code_and_date_range( session: Session, station_codes: List[int], start_time: datetime.datetime, end_time: datetime.datetime ) -> List[WeatherIndeterminate]: + cffdrs_start = perf_counter() results: List[WeatherIndeterminate] = [] days = get_days_from_range(start_time, end_time) stations = {station.code: station for station in await app.stations.get_stations_by_codes(station_codes)} @@ -140,7 +142,6 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range( 
bias_adjusted_precip_24h, wind_dir, wind_speed, - update_date, ) in daily_result: # Create two WeatherIndeterminates, one for model predictions and one for bias corrected predictions results.append( @@ -169,7 +170,13 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range( wind_direction=bias_adjusted_wdir, ) ) - return post_process_fetched_predictions(results) + post_processed_results = post_process_fetched_predictions(results) + cffdrs_end = perf_counter() + delta = cffdrs_end - cffdrs_start + # Any delta below 100 milliseconds is just noise in the logs. + if delta > 0.1: + logger.info("%f delta count before and after latest prediction model query", delta) + return post_processed_results def post_process_fetched_predictions(weather_indeterminates: List[WeatherIndeterminate]): From 9ce78518f53ae6308a0c142d6cdce35e2f7e4892 Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 21 Nov 2024 13:34:16 -0800 Subject: [PATCH 33/34] point back to main --- openshift/pgslice/openshift/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openshift/pgslice/openshift/build.yaml b/openshift/pgslice/openshift/build.yaml index 7acc744c3..9cd83ee49 100644 --- a/openshift/pgslice/openshift/build.yaml +++ b/openshift/pgslice/openshift/build.yaml @@ -18,7 +18,7 @@ parameters: - name: GIT_URL value: https://github.com/bcgov/wps.git - name: GIT_BRANCH - value: task/more-fixes + value: main objects: - apiVersion: v1 kind: ImageStream From 0e428fca6e7ad746295be4b0da6a110b69825afa Mon Sep 17 00:00:00 2001 From: Conor Brady Date: Thu, 21 Nov 2024 14:20:50 -0800 Subject: [PATCH 34/34] Remove update_date from query --- api/app/db/crud/weather_models.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/app/db/crud/weather_models.py b/api/app/db/crud/weather_models.py index bb1a65b23..a46540581 100644 --- a/api/app/db/crud/weather_models.py +++ b/api/app/db/crud/weather_models.py @@ -197,7 +197,6 @@ def get_latest_station_prediction(session: Session, station_codes: List[int], da WeatherStationModelPrediction.bias_adjusted_precip_24h, WeatherStationModelPrediction.wdir_tgl_10, WeatherStationModelPrediction.wind_tgl_10, - WeatherStationModelPrediction.update_date, ) .join(PredictionModelRunTimestamp, PredictionModelRunTimestamp.id == WeatherStationModelPrediction.prediction_model_run_timestamp_id) .join(PredictionModel, PredictionModel.id == PredictionModelRunTimestamp.prediction_model_id)