diff --git a/src/emhass/command_line.py b/src/emhass/command_line.py index e6940518..bafb84f2 100644 --- a/src/emhass/command_line.py +++ b/src/emhass/command_line.py @@ -240,9 +240,9 @@ def set_input_data_dict( return False df_input_data = rh.df_final.copy() - elif set_type == "regressor-model-fit": + elif set_type == "regressor-model-fit" or set_type == "regressor-model-predict": - df_input_data_dayahead = None + df_input_data, df_input_data_dayahead = None, None P_PV_forecast, P_load_forecast = None, None params = json.loads(params) days_list = None @@ -250,7 +250,13 @@ def set_input_data_dict( features = params["passed_data"]["features"] target = params["passed_data"]["target"] timestamp = params["passed_data"]["timestamp"] - filename_path = pathlib.Path(base_path) / csv_file + if get_data_from_file: + base_path = base_path + "/data" + filename_path = pathlib.Path(base_path) / csv_file + + else: + filename_path = pathlib.Path(base_path) / csv_file + if filename_path.is_file(): df_input_data = pd.read_csv(filename_path, parse_dates=True) @@ -266,13 +272,8 @@ def set_input_data_dict( if not set(required_columns).issubset(df_input_data.columns): logger.error("The cvs file does not contain the required columns.") raise ValueError( - f"CSV file should contain the following columns: {', '.join(required_columns)}" + f"CSV file should contain the following columns: {', '.join(required_columns)}", ) - elif set_type == "regressor-model-predict": - df_input_data, df_input_data_dayahead = None, None - P_PV_forecast, P_load_forecast = None, None - days_list = None - params = json.loads(params) elif set_type == "publish-data": df_input_data, df_input_data_dayahead = None, None @@ -280,7 +281,7 @@ def set_input_data_dict( days_list = None else: logger.error( - "The passed action argument and hence the set_type parameter for setup is not valid" + "The passed action argument and hence the set_type parameter for setup is not valid", ) df_input_data, df_input_data_dayahead = None, None P_PV_forecast, P_load_forecast = None, None @@ -541,7 +542,7 @@ def forecast_model_predict( mlf = pickle.load(inp) else: logger.error( - "The ML forecaster file was not found, please run a model fit method before this predict method" + "The ML forecaster file was not found, please run a model fit method before this predict method", ) return # Make predictions @@ -629,7 +630,7 @@ def forecast_model_tune( mlf = pickle.load(inp) else: logger.error( - "The ML forecaster file was not found, please run a model fit method before this tune method" + "The ML forecaster file was not found, please run a model fit method before this tune method", ) return None, None # Tune the model @@ -643,7 +644,9 @@ def forecast_model_tune( def regressor_model_fit( - input_data_dict: dict, logger: logging.Logger, debug: Optional[bool] = False + input_data_dict: dict, + logger: logging.Logger, + debug: Optional[bool] = False, ) -> None: """Perform a forecast model fit from training data retrieved from Home Assistant. @@ -662,9 +665,16 @@ def regressor_model_fit( timestamp = input_data_dict["params"]["passed_data"]["timestamp"] date_features = input_data_dict["params"]["passed_data"]["date_features"] root = input_data_dict["root"] + # The MLRegressor object mlr = MLRegressor( - data, model_type, regression_model, features, target, timestamp, logger + data, + model_type, + regression_model, + features, + target, + timestamp, + logger, ) # Fit the ML model mlr.fit(date_features=date_features) @@ -673,10 +683,14 @@ def regressor_model_fit( filename = model_type + "_mlr.pkl" with open(pathlib.Path(root) / filename, "wb") as outp: pickle.dump(mlr, outp, pickle.HIGHEST_PROTOCOL) + return mlr def regressor_model_predict( - input_data_dict: dict, logger: logging.Logger, debug: Optional[bool] = False + input_data_dict: dict, + logger: logging.Logger, + debug: Optional[bool] = False, + mlr: Optional[MLRegressor] = None, ) -> None: """Perform a prediction from csv file. @@ -697,7 +711,7 @@ def regressor_model_predict( mlr = pickle.load(inp) else: logger.error( - "The ML forecaster file was not found, please run a model fit method before this predict method" + "The ML forecaster file was not found, please run a model fit method before this predict method", ) return new_values = input_data_dict["params"]["passed_data"]["new_values"] @@ -715,14 +729,16 @@ def regressor_model_predict( ] # Publish prediction idx = 0 - input_data_dict["rh"].post_data( - prediction, - idx, - mlr_predict_entity_id, - mlr_predict_unit_of_measurement, - mlr_predict_friendly_name, - type_var="mlregressor", - ) + if not debug: + input_data_dict["rh"].post_data( + prediction, + idx, + mlr_predict_entity_id, + mlr_predict_unit_of_measurement, + mlr_predict_friendly_name, + type_var="mlregressor", + ) + return prediction def publish_data( @@ -813,7 +829,7 @@ def publish_data( if "P_deferrable{}".format(k) not in opt_res_latest.columns: logger.error( "P_deferrable{}".format(k) - + " was not found in results DataFrame. Optimization task may need to be relaunched or it did not converge to a solution." + + " was not found in results DataFrame. Optimization task may need to be relaunched or it did not converge to a solution.", ) else: input_data_dict["rh"].post_data( @@ -830,7 +846,7 @@ def publish_data( if input_data_dict["opt"].optim_conf["set_use_battery"]: if "P_batt" not in opt_res_latest.columns: logger.error( - "P_batt was not found in results DataFrame. Optimization task may need to be relaunched or it did not converge to a solution." + "P_batt was not found in results DataFrame. Optimization task may need to be relaunched or it did not converge to a solution.", ) else: custom_batt_forecast_id = params["passed_data"]["custom_batt_forecast_id"] @@ -886,7 +902,7 @@ def publish_data( if "optim_status" not in opt_res_latest: opt_res_latest["optim_status"] = "Optimal" logger.warning( - "no optim_status in opt_res_latest, run an optimization task first" + "no optim_status in opt_res_latest, run an optimization task first", ) input_data_dict["rh"].post_data( opt_res_latest["optim_status"], @@ -957,7 +973,9 @@ def main(): naive-mpc-optim, publish-data, forecast-model-fit, forecast-model-predict, forecast-model-tune", ) parser.add_argument( - "--config", type=str, help="Define path to the config.yaml file" + "--config", + type=str, + help="Define path to the config.yaml file", ) parser.add_argument( "--costfun", @@ -984,7 +1002,10 @@ def main(): help="Pass runtime optimization parameters as dictionnary", ) parser.add_argument( - "--debug", type=strtobool, default="False", help="Use True for testing purposes" + "--debug", + type=strtobool, + default="False", + help="Use True for testing purposes", ) args = parser.parse_args() # The path to the configuration files @@ -995,12 +1016,14 @@ def main(): # Additionnal argument try: parser.add_argument( - "--version", action="version", version="%(prog)s " + version("emhass") + "--version", + action="version", + version="%(prog)s " + version("emhass"), ) args = parser.parse_args() except Exception: logger.info( - "Version not found for emhass package. Or importlib exited with PackageNotFoundError." + "Version not found for emhass package. Or importlib exited with PackageNotFoundError.", ) # Setup parameters input_data_dict = set_input_data_dict( @@ -1040,7 +1063,25 @@ def main(): else: mlf = None df_pred_optim, mlf = forecast_model_tune( - input_data_dict, logger, debug=args.debug, mlf=mlf + input_data_dict, + logger, + debug=args.debug, + mlf=mlf, + ) + opt_res = None + elif args.action == "regressor-model-fit": + mlr = regressor_model_fit(input_data_dict, logger, debug=args.debug) + opt_res = None + elif args.action == "regressor-model-predict": + if args.debug: + mlr = regressor_model_fit(input_data_dict, logger, debug=args.debug) + else: + mlr = None + prediction = regressor_model_predict( + input_data_dict, + logger, + debug=args.debug, + mlr=mlr, ) opt_res = None elif args.action == "publish-data": @@ -1063,6 +1104,10 @@ def main(): return df_fit_pred, df_fit_pred_backtest, mlf elif args.action == "forecast-model-predict": return df_pred + elif args.action == "regressor-model-fit": + return mlr + elif args.action == "regressor-model-predict": + return prediction elif args.action == "forecast-model-tune": return df_pred_optim, mlf diff --git a/tests/test_command_line_utils.py b/tests/test_command_line_utils.py index d23aeb06..597b20e7 100644 --- a/tests/test_command_line_utils.py +++ b/tests/test_command_line_utils.py @@ -5,10 +5,21 @@ from unittest.mock import patch import pandas as pd import pathlib, json, yaml, copy +import numpy as np from emhass.command_line import set_input_data_dict -from emhass.command_line import perfect_forecast_optim, dayahead_forecast_optim, naive_mpc_optim -from emhass.command_line import forecast_model_fit, forecast_model_predict, forecast_model_tune +from emhass.command_line import ( + perfect_forecast_optim, + dayahead_forecast_optim, + naive_mpc_optim, +) +from emhass.command_line import ( + forecast_model_fit, + forecast_model_predict, + forecast_model_tune, + regressor_model_fit, + regressor_model_predict, +) from emhass.command_line import publish_data from emhass.command_line import main from emhass import utils @@ -316,46 +327,183 @@ def test_forecast_model_fit_predict_tune(self): self.assertIsInstance(df_pred, pd.Series) self.assertTrue(df_pred.isnull().sum().sum() == 0) # Test the tune method - df_pred_optim, mlf = forecast_model_tune(input_data_dict, logger, debug=True, mlf=mlf) + df_pred_optim, mlf = forecast_model_tune( + input_data_dict, logger, debug=True, mlf=mlf + ) self.assertIsInstance(df_pred_optim, pd.DataFrame) self.assertTrue(mlf.is_tuned == True) - # Test ijection_dict for tune method on webui + # Test injection_dict for tune method on webui injection_dict = utils.get_injection_dict_forecast_model_tune(df_fit_pred, mlf) self.assertIsInstance(injection_dict, dict) - self.assertIsInstance(injection_dict['figure_0'], str) - - @patch('sys.argv', ['main', '--action', 'test', '--config', str(pathlib.Path(root+'/config_emhass.yaml')), - '--debug', 'True']) + self.assertIsInstance(injection_dict["figure_0"], str) + + def test_regressor_model_fit_predict(self): + config_path = pathlib.Path(root + "/config_emhass.yaml") + base_path = str(config_path.parent) + costfun = "profit" + action = "regressor-model-fit" # fit and predict methods + params = TestCommandLineUtils.get_test_params() + runtimeparams = { + "csv_file": "prediction.csv", + "features": ["dd", "solar"], + "target": "hour", + "regression_model": "AdaBoostRegression", + "model_type": "heating_dd", + "timestamp": "timestamp", + "date_features": ["month", "day_of_week"], + "mlr_predict_entity_id": "sensor.predicted_hours_test", + "mlr_predict_unit_of_measurement": "h", + "mlr_predict_friendly_name": "Predicted hours", + "new_values": [12.79, 4.766, 1, 2], + } + runtimeparams_json = json.dumps(runtimeparams) + params_json = json.dumps(params) + input_data_dict = set_input_data_dict( + config_path, + base_path, + costfun, + params_json, + runtimeparams_json, + action, + logger, + get_data_from_file=True, + ) + self.assertTrue( + input_data_dict["params"]["passed_data"]["model_type"] == "heating_dd", + ) + self.assertTrue( + input_data_dict["params"]["passed_data"]["regression_model"] + == "AdaBoostRegression", + ) + self.assertTrue( + input_data_dict["params"]["passed_data"]["csv_file"] == "prediction.csv", + ) + mlr = regressor_model_fit(input_data_dict, logger, debug=True) + + # def test_regressor_model_predict(self): + config_path = pathlib.Path(root + "/config_emhass.yaml") + base_path = str(config_path.parent) # + "/data" + costfun = "profit" + action = "regressor-model-predict" # predict methods + params = TestCommandLineUtils.get_test_params() + runtimeparams = { + "csv_file": "prediction.csv", + "features": ["dd", "solar"], + "target": "hour", + "regression_model": "AdaBoostRegression", + "model_type": "heating_dd", + "timestamp": "timestamp", + "date_features": ["month", "day_of_week"], + "mlr_predict_entity_id": "sensor.predicted_hours_test", + "mlr_predict_unit_of_measurement": "h", + "mlr_predict_friendly_name": "Predicted hours", + "new_values": [12.79, 4.766, 1, 2], + } + runtimeparams_json = json.dumps(runtimeparams) + params["passed_data"] = runtimeparams + params_json = json.dumps(params) + + input_data_dict = set_input_data_dict( + config_path, + base_path, + costfun, + params_json, + runtimeparams_json, + action, + logger, + get_data_from_file=True, + ) + self.assertTrue( + input_data_dict["params"]["passed_data"]["model_type"] == "heating_dd", + ) + self.assertTrue( + input_data_dict["params"]["passed_data"]["mlr_predict_friendly_name"] + == "Predicted hours", + ) + + regressor_model_predict(input_data_dict, logger, debug=True, mlr=mlr) + + @patch( + "sys.argv", + [ + "main", + "--action", + "test", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--debug", + "True", + ], + ) def test_main_wrong_action(self): opt_res = main() self.assertEqual(opt_res, None) - - @patch('sys.argv', ['main', '--action', 'perfect-optim', '--config', str(pathlib.Path(root+'/config_emhass.yaml')), - '--debug', 'True']) + + @patch( + "sys.argv", + [ + "main", + "--action", + "perfect-optim", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--debug", + "True", + ], + ) def test_main_perfect_forecast_optim(self): opt_res = main() self.assertIsInstance(opt_res, pd.DataFrame) - self.assertTrue(opt_res.isnull().sum().sum()==0) + self.assertTrue(opt_res.isnull().sum().sum() == 0) self.assertIsInstance(opt_res.index, pd.core.indexes.datetimes.DatetimeIndex) - self.assertIsInstance(opt_res.index.dtype, pd.core.dtypes.dtypes.DatetimeTZDtype) - + self.assertIsInstance( + opt_res.index.dtype, + pd.core.dtypes.dtypes.DatetimeTZDtype, + ) + def test_main_dayahead_forecast_optim(self): - with patch('sys.argv', ['main', '--action', 'dayahead-optim', '--config', str(pathlib.Path(root+'/config_emhass.yaml')), - '--params', self.params_json, '--runtimeparams', self.runtimeparams_json, - '--debug', 'True']): + with patch( + "sys.argv", + [ + "main", + "--action", + "dayahead-optim", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--params", + self.params_json, + "--runtimeparams", + self.runtimeparams_json, + "--debug", + "True", + ], + ): opt_res = main() self.assertIsInstance(opt_res, pd.DataFrame) - self.assertTrue(opt_res.isnull().sum().sum()==0) - + self.assertTrue(opt_res.isnull().sum().sum() == 0) + def test_main_naive_mpc_optim(self): - with patch('sys.argv', ['main', '--action', 'naive-mpc-optim', '--config', str(pathlib.Path(root+'/config_emhass.yaml')), - '--params', self.params_json, '--runtimeparams', self.runtimeparams_json, - '--debug', 'True']): + with patch( + "sys.argv", + [ + "main", + "--action", + "naive-mpc-optim", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--params", + self.params_json, + "--runtimeparams", + self.runtimeparams_json, + "--debug", + "True", + ], + ): opt_res = main() self.assertIsInstance(opt_res, pd.DataFrame) - self.assertTrue(opt_res.isnull().sum().sum()==0) - self.assertTrue(len(opt_res)==10) - + self.assertTrue(opt_res.isnull().sum().sum() == 0) + self.assertTrue(len(opt_res) == 10) + def test_main_forecast_model_fit(self): params = copy.deepcopy(json.loads(self.params_json)) runtimeparams = { @@ -386,20 +534,33 @@ def test_main_forecast_model_predict(self): "var_model": "sensor.power_load_no_var_loads", "sklearn_model": "KNeighborsRegressor", "num_lags": 48, - "split_date_delta": '48h', - "perform_backtest": False + "split_date_delta": "48h", + "perform_backtest": False, } runtimeparams_json = json.dumps(runtimeparams) - params['passed_data'] = runtimeparams - params['optim_conf']['load_forecast_method'] = 'skforecast' + params["passed_data"] = runtimeparams + params["optim_conf"]["load_forecast_method"] = "skforecast" params_json = json.dumps(params) - with patch('sys.argv', ['main', '--action', 'forecast-model-predict', '--config', str(pathlib.Path(root+'/config_emhass.yaml')), - '--params', params_json, '--runtimeparams', runtimeparams_json, - '--debug', 'True']): + with patch( + "sys.argv", + [ + "main", + "--action", + "forecast-model-predict", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--params", + params_json, + "--runtimeparams", + runtimeparams_json, + "--debug", + "True", + ], + ): df_pred = main() self.assertIsInstance(df_pred, pd.Series) self.assertTrue(df_pred.isnull().sum().sum() == 0) - + def test_main_forecast_model_tune(self): params = copy.deepcopy(json.loads(self.params_json)) runtimeparams = { @@ -408,27 +569,118 @@ def test_main_forecast_model_tune(self): "var_model": "sensor.power_load_no_var_loads", "sklearn_model": "KNeighborsRegressor", "num_lags": 48, - "split_date_delta": '48h', - "perform_backtest": False + "split_date_delta": "48h", + "perform_backtest": False, } runtimeparams_json = json.dumps(runtimeparams) - params['passed_data'] = runtimeparams - params['optim_conf']['load_forecast_method'] = 'skforecast' + params["passed_data"] = runtimeparams + params["optim_conf"]["load_forecast_method"] = "skforecast" params_json = json.dumps(params) - with patch('sys.argv', ['main', '--action', 'forecast-model-tune', '--config', str(pathlib.Path(root+'/config_emhass.yaml')), - '--params', params_json, '--runtimeparams', runtimeparams_json, - '--debug', 'True']): + with patch( + "sys.argv", + [ + "main", + "--action", + "forecast-model-tune", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--params", + params_json, + "--runtimeparams", + runtimeparams_json, + "--debug", + "True", + ], + ): df_pred_optim, mlf = main() self.assertIsInstance(df_pred_optim, pd.DataFrame) self.assertTrue(mlf.is_tuned == True) - - @patch('sys.argv', ['main', '--action', 'publish-data', '--config', str(pathlib.Path(root+'/config_emhass.yaml')), - '--debug', 'True']) + + def test_main_regressor_model_fit(self): + params = copy.deepcopy(json.loads(self.params_json)) + runtimeparams = { + "csv_file": "prediction.csv", + "features": ["dd", "solar"], + "target": "hour", + "regression_model": "AdaBoostRegression", + "model_type": "heating_dd", + "timestamp": "timestamp", + "date_features": ["month", "day_of_week"], + } + runtimeparams_json = json.dumps(runtimeparams) + params["passed_data"] = runtimeparams + params_json = json.dumps(params) + with patch( + "sys.argv", + [ + "main", + "--action", + "regressor-model-fit", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--params", + params_json, + "--runtimeparams", + runtimeparams_json, + "--debug", + "True", + ], + ): + mlr = main() + + def test_main_regressor_model_predict(self): + params = copy.deepcopy(json.loads(self.params_json)) + runtimeparams = { + "csv_file": "prediction.csv", + "features": ["dd", "solar"], + "target": "hour", + "regression_model": "AdaBoostRegression", + "model_type": "heating_dd", + "timestamp": "timestamp", + "date_features": ["month", "day_of_week"], + "new_values": [12.79, 4.766, 1, 2], + } + runtimeparams_json = json.dumps(runtimeparams) + params["passed_data"] = runtimeparams + params["optim_conf"]["load_forecast_method"] = "skforecast" + params_json = json.dumps(params) + with patch( + "sys.argv", + [ + "main", + "--action", + "regressor-model-predict", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--params", + params_json, + "--runtimeparams", + runtimeparams_json, + "--debug", + "True", + ], + ): + prediction = main() + self.assertIsInstance(prediction, np.ndarray) + + @patch( + "sys.argv", + [ + "main", + "--action", + "publish-data", + "--config", + str(pathlib.Path(root + "/config_emhass.yaml")), + "--debug", + "True", + ], + ) def test_main_publish_data(self): opt_res = main() - self.assertTrue(opt_res==None) - -if __name__ == '__main__': + self.assertTrue(opt_res == None) + + +if __name__ == "__main__": unittest.main() ch.close() logger.removeHandler(ch) diff --git a/tests/test_machine_learning_regressor.py b/tests/test_machine_learning_regressor.py new file mode 100644 index 00000000..88137b0d --- /dev/null +++ b/tests/test_machine_learning_regressor.py @@ -0,0 +1,113 @@ +"""Machine learning regressor test module.""" + +import copy +import json +import pathlib +import unittest + +import numpy as np +import pandas as pd +from sklearn.pipeline import Pipeline +import yaml +from emhass import utils +from emhass.command_line import set_input_data_dict +from emhass.machine_learning_regressor import MLRegressor +from sklearn.ensemble import ( + AdaBoostRegressor, +) + +# the root folder +root = str(utils.get_root(__file__, num_parent=2)) +# create logger +logger, ch = utils.get_logger(__name__, root, save_to_file=False) + + +class TestMLRegressor(unittest.TestCase): + @staticmethod + def get_test_params(): + with open(root + "/config_emhass.yaml", "r") as file: + params = yaml.load(file, Loader=yaml.FullLoader) + params.update( + { + "params_secrets": { + "hass_url": "http://supervisor/core/api", + "long_lived_token": "${SUPERVISOR_TOKEN}", + "time_zone": "Europe/Paris", + "lat": 45.83, + "lon": 6.86, + "alt": 8000.0, + }, + }, + ) + return params + + def setUp(self): + params = TestMLRegressor.get_test_params() + params_json = json.dumps(params) + config_path = pathlib.Path(root + "/config_emhass.yaml") + base_path = str(config_path.parent) # + "/data" + costfun = "profit" + action = "regressor-model-fit" # fit and predict methods + params = copy.deepcopy(json.loads(params_json)) + runtimeparams = { + "csv_file": "prediction.csv", + "features": ["dd", "solar"], + "target": "hour", + "regression_model": "AdaBoostRegression", + "model_type": "heating_dd", + "timestamp": "timestamp", + "date_features": ["month", "day_of_week"], + "new_values": [12.79, 4.766, 1, 2], + } + runtimeparams_json = json.dumps(runtimeparams) + params["passed_data"] = runtimeparams + params["optim_conf"]["load_forecast_method"] = "skforecast" + params_json = json.dumps(params) + self.input_data_dict = set_input_data_dict( + config_path, + base_path, + costfun, + params_json, + runtimeparams_json, + action, + logger, + get_data_from_file=True, + ) + data = copy.deepcopy(self.input_data_dict["df_input_data"]) + self.assertIsInstance(data, pd.DataFrame) + self.csv_file = self.input_data_dict["params"]["passed_data"]["csv_file"] + features = self.input_data_dict["params"]["passed_data"]["features"] + target = self.input_data_dict["params"]["passed_data"]["target"] + regression_model = self.input_data_dict["params"]["passed_data"][ + "regression_model" + ] + model_type = self.input_data_dict["params"]["passed_data"]["model_type"] + timestamp = self.input_data_dict["params"]["passed_data"]["timestamp"] + self.date_features = self.input_data_dict["params"]["passed_data"][ + "date_features" + ] + self.new_values = self.input_data_dict["params"]["passed_data"]["new_values"] + self.mlr = MLRegressor( + data, + model_type, + regression_model, + features, + target, + timestamp, + logger, + ) + + def test_fit(self): + self.mlr.fit(self.date_features) + self.assertIsInstance(self.mlr.model, Pipeline) + + def test_predict(self): + self.mlr.fit(self.date_features) + predictions = self.mlr.predict(self.new_values) + self.assertIsInstance(predictions, np.ndarray) + + +if __name__ == "__main__": + unittest.main() + ch.close() + logger.removeHandler(ch)