diff --git a/api/app/jobs/common_model_fetchers.py b/api/app/jobs/common_model_fetchers.py index 112a09ecd..a27aced96 100644 --- a/api/app/jobs/common_model_fetchers.py +++ b/api/app/jobs/common_model_fetchers.py @@ -14,7 +14,7 @@ delete_weather_station_model_predictions, refresh_morecast2_materialized_view) from app.weather_models.machine_learning import StationMachineLearning -from app.weather_models import ModelEnum, construct_interpolated_noon_prediction +from app.weather_models import SCALAR_MODEL_VALUE_KEYS, ModelEnum, construct_interpolated_noon_prediction from app.schemas.stations import WeatherStation from app import config, configure_logging import app.utils.time as time_utils @@ -362,7 +362,8 @@ def _process_model_run_for_station(self, if (prev_prediction is not None and prev_prediction.prediction_timestamp.hour == 18 and prediction.prediction_timestamp.hour == 21): - noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction) + noon_prediction = construct_interpolated_noon_prediction( + prev_prediction, prediction, SCALAR_MODEL_VALUE_KEYS) self._process_prediction( noon_prediction, station, model_run, machine) self._process_prediction( diff --git a/api/app/tests/weather_models/crud.py b/api/app/tests/weather_models/crud.py index 3966ea984..a19b24b62 100644 --- a/api/app/tests/weather_models/crud.py +++ b/api/app/tests/weather_models/crud.py @@ -14,16 +14,19 @@ def get_actuals_left_outer_join_with_predictions(*args): weather_date=datetime(2020, 10, 10, 18), temperature=20, temp_valid=True, + wind_direction=90, relative_humidity=50, rh_valid=True), - ModelRunPrediction( + ModelRunPrediction( tmp_tgl_2=2, rh_tgl_2=10, apcp_sfc_0=2, + wdir_tgl_10=97, prediction_timestamp=datetime(2020, 10, 10, 18))], [HourlyActual(weather_date=datetime(2020, 10, 10, 19)), None], [HourlyActual(weather_date=datetime(2020, 10, 10, 20), temperature=25, + wind_direction=270, temp_valid=True, relative_humidity=70, rh_valid=True), None], @@ -31,34 +34,40 @@ def get_actuals_left_outer_join_with_predictions(*args): weather_date=datetime(2020, 10, 10, 21), temperature=30, temp_valid=True, + wind_direction=120, relative_humidity=100, rh_valid=True), ModelRunPrediction( tmp_tgl_2=1, rh_tgl_2=20, apcp_sfc_0=3, + wdir_tgl_10=101, prediction_timestamp=datetime(2020, 10, 10, 21))], # day 2 [HourlyActual( weather_date=datetime(2020, 10, 11, 18), temperature=20, temp_valid=True, + wind_direction=121, relative_humidity=50, rh_valid=True), ModelRunPrediction( tmp_tgl_2=2, rh_tgl_2=10, apcp_sfc_0=2, + wdir_tgl_10=110, prediction_timestamp=datetime(2020, 10, 11, 18))], [HourlyActual(weather_date=datetime(2020, 10, 11, 19)), None], [HourlyActual(weather_date=datetime(2020, 10, 11, 20), temperature=27, temp_valid=True, + wind_direction=98, relative_humidity=60, rh_valid=True), None], [HourlyActual( weather_date=datetime(2020, 10, 11, 21), temperature=30, + wind_direction=118, temp_valid=True, relative_humidity=100, rh_valid=True), @@ -66,6 +75,7 @@ def get_actuals_left_outer_join_with_predictions(*args): tmp_tgl_2=1, rh_tgl_2=20, apcp_sfc_0=3, + wdir_tgl_10=111, prediction_timestamp=datetime(2020, 10, 11, 21))] ] return result diff --git a/api/app/weather_models/__init__.py b/api/app/weather_models/__init__.py index 20cbeb1aa..951c92bb9 100644 --- a/api/app/weather_models/__init__.py +++ b/api/app/weather_models/__init__.py @@ -109,7 +109,8 @@ def interpolate_wind_direction(prediction_a: ModelRunPrediction, def construct_interpolated_noon_prediction(prediction_a: ModelRunPrediction, - prediction_b: ModelRunPrediction): + prediction_b: ModelRunPrediction, + model_keys): """ Construct a noon prediction by interpolating. """ # create a noon prediction. (using utc hour 20, as that is solar noon in B.C.) @@ -121,7 +122,7 @@ def construct_interpolated_noon_prediction(prediction_a: ModelRunPrediction, timestamp_b = prediction_b.prediction_timestamp.timestamp() noon_timestamp = noon_prediction.prediction_timestamp.timestamp() # calculate interpolated values. - for key in SCALAR_MODEL_VALUE_KEYS: + for key in model_keys: value_a = getattr(prediction_a, key) value_b = getattr(prediction_b, key) if value_a is None or value_b is None: diff --git a/api/app/weather_models/fetch/predictions.py b/api/app/weather_models/fetch/predictions.py index 434fb6c6e..d39c13600 100644 --- a/api/app/weather_models/fetch/predictions.py +++ b/api/app/weather_models/fetch/predictions.py @@ -164,7 +164,7 @@ async def fetch_latest_model_run_predictions_by_station_code_and_date_range(sess temperature=bias_adjusted_temp, relative_humidity=bias_adjusted_rh, wind_speed=bias_adjusted_wind_speed, - wind_dir=bias_adjusted_wdir + wind_direction=bias_adjusted_wdir )) return post_process_fetched_predictions(results) diff --git a/api/app/weather_models/machine_learning.py b/api/app/weather_models/machine_learning.py index 202de5776..637f6f707 100644 --- a/api/app/weather_models/machine_learning.py +++ b/api/app/weather_models/machine_learning.py @@ -12,12 +12,14 @@ from app.db.models.weather_models import (PredictionModel, ModelRunPrediction) from app.db.models.observations import HourlyActual from app.db.crud.observations import get_actuals_left_outer_join_with_predictions +from app.weather_models.regression_model import RegressionModelsV2 +from app.weather_models.sample import Samples logger = getLogger(__name__) # Corresponding key values on HourlyActual and SampleCollection -SAMPLE_VALUE_KEYS = ('temperature', 'relative_humidity', 'wind_speed', 'wind_direction') +SAMPLE_VALUE_KEYS = ('temperature', 'relative_humidity', 'wind_speed') # Number of days of historical actual data to learn from when training model MAX_DAYS_TO_LEARN = 19 @@ -38,59 +40,12 @@ class RegressionModels: """ keys = ('temperature_wrapper', 'relative_humidity_wrapper', - 'wind_speed_wrapper', 'wind_direction_wrapper') + 'wind_speed_wrapper') def __init__(self): self.temperature_wrapper = LinearRegressionWrapper() self.relative_humidity_wrapper = LinearRegressionWrapper() self.wind_speed_wrapper = LinearRegressionWrapper() - self.wind_direction_wrapper = LinearRegressionWrapper() - - -class Samples: - """ Class for storing samples in buckets of hours. - e.g. a temperature sample consists of an x axis (predicted values) and a y axis (observed values) put - together in hour buckets. - """ - - def __init__(self): - self._x = defaultdict(list) - self._y = defaultdict(list) - - def hours(self): - """ Return all the hours used to bucket samples together. """ - return self._x.keys() - - def append_x(self, value, timestamp: datetime): - """ Append another predicted value. """ - self._x[timestamp.hour].append(value) - - def append_y(self, value, timestamp: datetime): - """ Append another observered values. """ - self._y[timestamp.hour].append(value) - - def np_x(self, hour): - """ Return numpy array of the predicted values, reshaped appropriately. """ - return np.array(self._x[hour]).reshape((-1, 1)) - - def np_y(self, hour): - """ Return a numpy array of the observed values """ - return np.array(self._y[hour]) - - def add_sample(self, - model_value: float, - actual_value: float, - timestamp: datetime, - model_key: str, - sample_key: str): - """ Add a sample, interpolating the model values spatially """ - # Additional logging to assist with finding errors: - logger.info('adding sample for %s->%s with: model_values %s, actual_value: %s', - model_key, sample_key, model_value, actual_value) - # Add to the data we're going to learn from: - # Using two variables, the interpolated temperature value, and the hour of the day. - self.append_x(model_value, timestamp) - self.append_y(actual_value, timestamp) class SampleCollection: @@ -100,7 +55,6 @@ def __init__(self): self.temperature = Samples() self.relative_humidity = Samples() self.wind_speed = Samples() - self.wind_direction = Samples() class StationMachineLearning: @@ -126,6 +80,7 @@ def __init__(self, self.target_coordinate = target_coordinate self.station_code = station_code self.regression_models = defaultdict(RegressionModels) + self.regression_models_v2 = RegressionModelsV2() self.max_learn_date = max_learn_date # Maximum number of days to try to learn from. Experimentation has shown that # about two weeks worth of data starts giving fairly good results compared to human forecasters. @@ -152,12 +107,9 @@ def _add_sample_to_collection(self, # are None. logger.warning('no model value for %s->%s', model_key, sample_key) - def _collect_data(self): + def _collect_data(self, start_date: datetime): """ Collect data to use for machine learning. """ - # Calculate the date to start learning from. - start_date = self.max_learn_date - \ - timedelta(days=self.max_days_to_learn) # Create a convenient structure to store samples in. sample_collection = SampleCollection() @@ -177,7 +129,8 @@ def _collect_data(self): and prev_prediction.prediction_timestamp.hour == 18): # If there's a gap in the data (like with the GLOBAL model) - then make up # a noon prediction using interpolation, and add it as a sample. - noon_prediction = construct_interpolated_noon_prediction(prev_prediction, prediction) + noon_prediction = construct_interpolated_noon_prediction( + prev_prediction, prediction, SCALAR_MODEL_VALUE_KEYS) self._add_sample_to_collection( noon_prediction, prev_actual, sample_collection) @@ -190,8 +143,12 @@ def _collect_data(self): def learn(self): """ Collect data and perform linear regression. """ + # Calculate the date to start learning from. + start_date = self.max_learn_date - \ + timedelta(days=self.max_days_to_learn) + # collect data - data = self._collect_data() + data = self._collect_data(start_date) # iterate through the data, creating a regression model for each variable # and each hour. @@ -206,6 +163,12 @@ def learn(self): # how much sample data we actually had etc., and then not mark the model as being "good". regression_model.good_model = True + # wdir specific using new structure for regression handling + query = get_actuals_left_outer_join_with_predictions( + self.session, self.model.id, self.station_code, start_date, self.max_learn_date) + self.regression_models_v2.collect_data(query) + self.regression_models_v2.train() + def predict_temperature(self, model_temperature: float, timestamp: datetime): """ Predict the bias adjusted temperature for a given point in time, given a corresponding model temperature. @@ -255,9 +218,7 @@ def predict_wind_direction(self, model_wind_dir: int, timestamp: datetime): : return: The bias-adjusted wind direction as predicted by the linear regression model. """ hour = timestamp.hour - if self.regression_models[hour].wind_direction_wrapper.good_model and model_wind_dir is not None: - predicted_wind_dir = self.regression_models[hour].wind_direction_wrapper.model.predict([[model_wind_dir]])[ - 0] - # a valid wind direction value is between 0 and 360. If the returned value is outside these bounds, correct it - return predicted_wind_dir % 360 - return None + predicted_wind_dir = self.regression_models_v2._models[0].predict(hour, [[model_wind_dir]]) + if predicted_wind_dir is None: + return None + return predicted_wind_dir % 360 diff --git a/api/app/weather_models/regression_model.py b/api/app/weather_models/regression_model.py new file mode 100644 index 000000000..3a035399a --- /dev/null +++ b/api/app/weather_models/regression_model.py @@ -0,0 +1,128 @@ +import logging +import math +from sklearn.exceptions import NotFittedError +from sklearn.linear_model import LinearRegression +from typing import Dict, List, Protocol +from collections import defaultdict +from abc import abstractmethod +from app.db.models.observations import HourlyActual +from app.db.models.weather_models import ModelRunPrediction +from app.weather_models import construct_interpolated_noon_prediction +from app.weather_models.sample import Samples + +logger = logging.getLogger(__name__) + +# maps weather orm model keys to actual weather orm model keys +model_2_actual_keys: Dict[str, str] = { + "wdir_tgl_10": "wind_direction" +} + + +class RegressionModelProto(Protocol): + _key: str + _models: defaultdict[int, LinearRegression] + _samples: Samples + + @abstractmethod + def add_sample(self, + prediction: ModelRunPrediction, + actual: HourlyActual): raise NotImplementedError + + @abstractmethod + def train(self): raise NotImplementedError + + @abstractmethod + def predict(self, hour: int, model_wind_dir: List[List[int]]): raise NotImplementedError + + +class RegressionModel(RegressionModelProto): + """ + Default class to manage a regression dataset + """ + + def __init__(self, model_key: str): + self._key = model_key + self._models = defaultdict(LinearRegression) + self._samples = Samples() + + def train(self): + for hour in self._samples.hours(): + self._models[hour].fit(self._samples.np_x(hour), self._samples.np_y(hour)) + self._is_fitted = True + + def get_model_at_hour(self, hour: int): + return self._models[hour] + + def predict(self, hour: int, model_wind_dir: List[List[int]]): + try: + prediction = self._models[hour].predict(model_wind_dir) + logger.info("Predicted wind dir for model: %s, hour: %s, prediction: %s", self._key, hour, prediction) + return prediction[0] + except NotFittedError as _: + return None + + def add_sample(self, + prediction: ModelRunPrediction, + actual: HourlyActual): + """ Add a sample, interpolating the model values spatially """ + + model_value = getattr(prediction, self._key) + actual_value = getattr(actual, model_2_actual_keys[self._key]) + + logger.info('adding sample for %s->%s with: model_values %s, actual_value: %s', + self._key, self._key, model_value, actual_value) + + if model_value is not None: + if actual_value is None or math.isnan(actual_value): + # If for whatever reason we don't have an actual value, we skip this one. + logger.warning('no actual value for model key: %s, actual key: %s', + self._key, model_2_actual_keys[self._key]) + return + + # Add to the data we're going to learn from: + # Using two variables, the interpolated temperature value, and the hour of the day. + self._samples.append_x(model_value, actual.weather_date) + self._samples.append_y(actual_value, actual.weather_date) + + +class RegressionModelsV2: + """ Class for storing regression models. + TODO: migrate other models to this once wind direction is verified + """ + + def __init__(self): + self._model_keys: List[str] = list(model_2_actual_keys.keys()) + self._models: List[RegressionModelProto] = [ + RegressionModel(model_key=self._model_keys[0]) + ] + + def add_samples(self, prediction: ModelRunPrediction, actual: HourlyActual): + for model in self._models: + model.add_sample(prediction, actual) + + def collect_data(self, query): + # We need to keep track of previous so that we can do interpolation for the global model. + prev_actual = None + prev_prediction = None + for actual, prediction in query: + if prev_actual != actual and prediction is not None: + if (prev_actual is not None + and prev_prediction is not None + and prev_actual.weather_date.hour == 20 + and prediction.prediction_timestamp.hour == 21 + and prev_prediction.prediction_timestamp.hour == 18): + # If there's a gap in the data (like with the GLOBAL model) - then make up + # a noon prediction using interpolation, and add it as a sample. + noon_prediction = construct_interpolated_noon_prediction( + prev_prediction, prediction, self._model_keys) + self.add_samples(noon_prediction, prev_actual) + + self.add_samples(prediction, actual) + prev_prediction = prediction + prev_actual = actual + + def train(self): + # iterate through the data, creating a regression model for each variable + # and each hour. + for model in self._models: + model.train() diff --git a/api/app/weather_models/sample.py b/api/app/weather_models/sample.py new file mode 100644 index 000000000..1407639ad --- /dev/null +++ b/api/app/weather_models/sample.py @@ -0,0 +1,52 @@ +import logging +import numpy as np +from datetime import datetime +from collections import defaultdict + +logger = logging.getLogger(__name__) + + +class Samples: + """ Class for storing samples in buckets of hours. + e.g. a temperature sample consists of an x axis (predicted values) and a y axis (observed values) put + together in hour buckets. + """ + + def __init__(self): + self._x = defaultdict(list) + self._y = defaultdict(list) + + def hours(self): + """ Return all the hours used to bucket samples together. """ + return self._x.keys() + + def append_x(self, value, timestamp: datetime): + """ Append another predicted value. """ + self._x[timestamp.hour].append(value) + + def append_y(self, value, timestamp: datetime): + """ Append another observered values. """ + self._y[timestamp.hour].append(value) + + def np_x(self, hour): + """ Return numpy array of the predicted values, reshaped appropriately. """ + return np.array(self._x[hour]).reshape((-1, 1)) + + def np_y(self, hour): + """ Return a numpy array of the observed values """ + return np.array(self._y[hour]) + + def add_sample(self, + model_value: float, + actual_value: float, + timestamp: datetime, + model_key: str, + sample_key: str): + """ Add a sample, interpolating the model values spatially """ + # Additional logging to assist with finding errors: + logger.info('adding sample for %s->%s with: model_values %s, actual_value: %s', + model_key, sample_key, model_value, actual_value) + # Add to the data we're going to learn from: + # Using two variables, the interpolated temperature value, and the hour of the day. + self.append_x(model_value, timestamp) + self.append_y(actual_value, timestamp) diff --git a/web/src/features/moreCast2/components/MoreCast2Column.tsx b/web/src/features/moreCast2/components/MoreCast2Column.tsx index 5483e0cfc..9ee8ac2a8 100644 --- a/web/src/features/moreCast2/components/MoreCast2Column.tsx +++ b/web/src/features/moreCast2/components/MoreCast2Column.tsx @@ -109,7 +109,7 @@ export class IndeterminateField implements ColDefGenerator, ForecastColDefGenera export const tempForecastField = new IndeterminateField('temp', 'Temp', 'number', 1, true) export const rhForecastField = new IndeterminateField('rh', 'RH', 'number', 0, true) -export const windDirForecastField = new IndeterminateField('windDirection', 'Wind Dir', 'number', 0, false) +export const windDirForecastField = new IndeterminateField('windDirection', 'Wind Dir', 'number', 0, true) export const windSpeedForecastField = new IndeterminateField('windSpeed', 'Wind Speed', 'number', 1, true) export const precipForecastField = new IndeterminateField('precip', 'Precip', 'number', 1, false) export const buiField = new IndeterminateField('bui', 'BUI', 'number', 0, false)