From 1d7b4f6b56b1e935244729f697d31f37c3e56d27 Mon Sep 17 00:00:00 2001 From: Jineet Desai Date: Tue, 19 Sep 2023 00:43:24 -0400 Subject: [PATCH 1/4] Adding support for Sklearn linear regression in EvaDB --- evadb/binder/statement_binder.py | 12 +++++ evadb/executor/create_function_executor.py | 51 +++++++++++++++++++ evadb/functions/sklearn.py | 39 ++++++++++++++ evadb/utils/generic_utils.py | 11 ++++ .../long/test_model_train.py | 17 +++++++ 5 files changed, 130 insertions(+) create mode 100644 evadb/functions/sklearn.py diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index 4df16bc72f..c9b493dcd5 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -106,6 +106,18 @@ def _bind_create_function_statement(self, node: CreateFunctionStatement): outputs.append(column) else: inputs.append(column) + elif string_comparison_case_insensitive(node.function_type, "sklearn"): + assert ( + "predict" in arg_map + ), f"Creating {node.function_type} functions expects 'predict' metadata." 
+ # We only support a single predict column for now + predict_columns = set([arg_map["predict"]]) + for column in all_column_list: + if column.name in predict_columns: + column.name = column.name + "_predictions" + outputs.append(column) + else: + inputs.append(column) elif string_comparison_case_insensitive(node.function_type, "forecasting"): # Forecasting models have only one input column which is horizon inputs = [ColumnDefinition("horizon", ColumnType.INTEGER, None, None)] diff --git a/evadb/executor/create_function_executor.py b/evadb/executor/create_function_executor.py index 004f784a3b..939984460d 100644 --- a/evadb/executor/create_function_executor.py +++ b/evadb/executor/create_function_executor.py @@ -40,6 +40,7 @@ string_comparison_case_insensitive, try_to_import_forecast, try_to_import_ludwig, + try_to_import_sklearn, try_to_import_torch, try_to_import_ultralytics, ) @@ -117,6 +118,48 @@ def handle_ludwig_function(self): self.node.metadata, ) + def handle_sklearn_function(self): + """Handle sklearn functions + + Use Sklearn's regression to train models. 
+ """ + try_to_import_sklearn() + from sklearn.linear_model import LinearRegression + + assert ( + len(self.children) == 1 + ), "Create sklearn function expects 1 child, finds {}.".format( + len(self.children) + ) + + aggregated_batch_list = [] + child = self.children[0] + for batch in child.exec(): + aggregated_batch_list.append(batch) + aggregated_batch = Batch.concat(aggregated_batch_list, copy=False) + aggregated_batch.drop_column_alias() + + arg_map = {arg.key: arg.value for arg in self.node.metadata} + model = LinearRegression() + model.fit(X=aggregated_batch.frames, y=arg_map["predict"]) + model_path = os.path.join( + self.db.config.get_value("storage", "model_dir"), self.node.name + ) + pickle.dump(model, open(model_path, "rb")) + self.node.metadata.append( + FunctionMetadataCatalogEntry("model_path", model_path) + ) + + impl_path = Path(f"{self.function_dir}/sklearn.py").absolute().as_posix() + io_list = self._resolve_function_io(None) + return ( + self.node.name, + impl_path, + self.node.function_type, + io_list, + self.node.metadata, + ) + def handle_ultralytics_function(self): """Handle Ultralytics functions""" try_to_import_ultralytics() @@ -332,6 +375,14 @@ def exec(self, *args, **kwargs): io_list, metadata, ) = self.handle_ludwig_function() + elif string_comparison_case_insensitive(self.node.function_type, "Sklearn"): + ( + name, + impl_path, + function_type, + io_list, + metadata, + ) = self.handle_sklearn_function() elif string_comparison_case_insensitive(self.node.function_type, "Forecasting"): ( name, diff --git a/evadb/functions/sklearn.py b/evadb/functions/sklearn.py new file mode 100644 index 0000000000..d88fbe9c52 --- /dev/null +++ b/evadb/functions/sklearn.py @@ -0,0 +1,39 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pickle + +import pandas as pd + +from evadb.functions.abstract.abstract_function import AbstractFunction +from evadb.utils.generic_utils import try_to_import_sklearn + + +class GenericSklearnModel(AbstractFunction): + @property + def name(self) -> str: + return "GenericSklearnModel" + + def setup(self, model_path: str, **kwargs): + try_to_import_sklearn() + + self.model = pickle.load(open(model_path, "wb")) + + def forward(self, frames: pd.DataFrame) -> pd.DataFrame: + predictions, _ = self.model.predict(frames, return_type=pd.DataFrame) + return predictions + + def to_device(self, device: str): + # TODO figure out how to control the GPU for ludwig models + return self diff --git a/evadb/utils/generic_utils.py b/evadb/utils/generic_utils.py index e7836f1319..ff6a99f0f2 100644 --- a/evadb/utils/generic_utils.py +++ b/evadb/utils/generic_utils.py @@ -324,6 +324,17 @@ def is_forecast_available() -> bool: return False +def try_to_import_sklearn(): + try: + import sklearn # noqa: F401 + from sklearn.linear_model import LinearRegression # noqa: F401 + except ImportError: + raise ValueError( + """Could not import sklearn. 
+ Please install it with `pip install scikit-learn`.""" + ) + + ############################## ## VISION ############################## diff --git a/test/integration_tests/long/test_model_train.py b/test/integration_tests/long/test_model_train.py index 55ae6da9c1..f9739028c0 100644 --- a/test/integration_tests/long/test_model_train.py +++ b/test/integration_tests/long/test_model_train.py @@ -72,6 +72,23 @@ def test_ludwig_automl(self): self.assertEqual(len(result.columns), 1) self.assertEqual(len(result), 10) + def test_sklearn_regression(self): + create_predict_function = """ + CREATE FUNCTION IF NOT EXISTS PredictHouseRent FROM + ( SELECT number_of_rooms, number_of_bathrooms, days_on_market, rental_price FROM HomeRentals ) + TYPE Sklearn + PREDICT 'rental_price' + TIME_LIMIT 120; + """ + execute_query_fetch_all(self.evadb, create_predict_function) + + predict_query = """ + SELECT PredictHouseRent(*) FROM HomeRentals LIMIT 10; + """ + result = execute_query_fetch_all(self.evadb, predict_query) + self.assertEqual(len(result.columns), 1) + self.assertEqual(len(result), 10) + if __name__ == "__main__": unittest.main() From b6cd4736058d35ffc63ea5e514b17f743ec3847d Mon Sep 17 00:00:00 2001 From: Jineet Desai Date: Thu, 21 Sep 2023 01:59:29 -0400 Subject: [PATCH 2/4] Fixing the part related to renaming of the columns. 
--- evadb/binder/statement_binder.py | 1 - evadb/executor/create_function_executor.py | 7 +++++-- evadb/functions/sklearn.py | 10 +++++++--- test/integration_tests/long/test_model_train.py | 2 +- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index c9b493dcd5..fb9dfc40c4 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -114,7 +114,6 @@ def _bind_create_function_statement(self, node: CreateFunctionStatement): predict_columns = set([arg_map["predict"]]) for column in all_column_list: if column.name in predict_columns: - column.name = column.name + "_predictions" outputs.append(column) else: inputs.append(column) diff --git a/evadb/executor/create_function_executor.py b/evadb/executor/create_function_executor.py index 939984460d..9da2a2d844 100644 --- a/evadb/executor/create_function_executor.py +++ b/evadb/executor/create_function_executor.py @@ -141,11 +141,14 @@ def handle_sklearn_function(self): arg_map = {arg.key: arg.value for arg in self.node.metadata} model = LinearRegression() - model.fit(X=aggregated_batch.frames, y=arg_map["predict"]) + print(aggregated_batch.frames) + Y = aggregated_batch.frames[arg_map["predict"]] + aggregated_batch.frames.drop([arg_map["predict"]], axis=1, inplace=True) + model.fit(X=aggregated_batch.frames, y=Y) model_path = os.path.join( self.db.config.get_value("storage", "model_dir"), self.node.name ) - pickle.dump(model, open(model_path, "rb")) + pickle.dump(model, open(model_path, "wb")) self.node.metadata.append( FunctionMetadataCatalogEntry("model_path", model_path) ) diff --git a/evadb/functions/sklearn.py b/evadb/functions/sklearn.py index d88fbe9c52..2c25e3c3f0 100644 --- a/evadb/functions/sklearn.py +++ b/evadb/functions/sklearn.py @@ -28,11 +28,15 @@ def name(self) -> str: def setup(self, model_path: str, **kwargs): try_to_import_sklearn() - self.model = pickle.load(open(model_path, "wb")) + self.model = 
pickle.load(open(model_path, "rb")) def forward(self, frames: pd.DataFrame) -> pd.DataFrame: - predictions, _ = self.model.predict(frames, return_type=pd.DataFrame) - return predictions + predictions = self.model.predict(frames.iloc[:, : -1]) + predict_df = pd.DataFrame(predictions) + # Assign the prediction column name same as that for data provided. + predict_df.rename(columns={0 : frames.columns[-1]}, inplace=True) + print(predict_df) + return predict_df def to_device(self, device: str): # TODO figure out how to control the GPU for ludwig models diff --git a/test/integration_tests/long/test_model_train.py b/test/integration_tests/long/test_model_train.py index f9739028c0..d4f7b63af8 100644 --- a/test/integration_tests/long/test_model_train.py +++ b/test/integration_tests/long/test_model_train.py @@ -83,7 +83,7 @@ def test_sklearn_regression(self): execute_query_fetch_all(self.evadb, create_predict_function) predict_query = """ - SELECT PredictHouseRent(*) FROM HomeRentals LIMIT 10; + SELECT PredictHouseRent(number_of_rooms, number_of_bathrooms, days_on_market, rental_price) FROM HomeRentals LIMIT 10; """ result = execute_query_fetch_all(self.evadb, predict_query) self.assertEqual(len(result.columns), 1) From 10981a3999bdb7b4b9e60da2c2d466df37b0bf47 Mon Sep 17 00:00:00 2001 From: Jineet Desai Date: Thu, 21 Sep 2023 14:14:11 -0400 Subject: [PATCH 3/4] Adding some basic lint fixes. 
--- evadb/executor/create_function_executor.py | 1 - evadb/functions/sklearn.py | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/evadb/executor/create_function_executor.py b/evadb/executor/create_function_executor.py index 9da2a2d844..682ba2f23b 100644 --- a/evadb/executor/create_function_executor.py +++ b/evadb/executor/create_function_executor.py @@ -141,7 +141,6 @@ def handle_sklearn_function(self): arg_map = {arg.key: arg.value for arg in self.node.metadata} model = LinearRegression() - print(aggregated_batch.frames) Y = aggregated_batch.frames[arg_map["predict"]] aggregated_batch.frames.drop([arg_map["predict"]], axis=1, inplace=True) model.fit(X=aggregated_batch.frames, y=Y) diff --git a/evadb/functions/sklearn.py b/evadb/functions/sklearn.py index 2c25e3c3f0..04f1c8928f 100644 --- a/evadb/functions/sklearn.py +++ b/evadb/functions/sklearn.py @@ -31,11 +31,10 @@ def setup(self, model_path: str, **kwargs): self.model = pickle.load(open(model_path, "rb")) def forward(self, frames: pd.DataFrame) -> pd.DataFrame: - predictions = self.model.predict(frames.iloc[:, : -1]) + predictions = self.model.predict(frames.iloc[:, :-1]) predict_df = pd.DataFrame(predictions) # Assign the prediction column name same as that for data provided. - predict_df.rename(columns={0 : frames.columns[-1]}, inplace=True) - print(predict_df) + predict_df.rename(columns={0: frames.columns[-1]}, inplace=True) return predict_df def to_device(self, device: str): From 934b5e1d2492c7a32ef5b4982b66b94d42f3a03d Mon Sep 17 00:00:00 2001 From: Jineet Desai Date: Fri, 22 Sep 2023 18:09:20 -0400 Subject: [PATCH 4/4] Add appropriate code comments. 
--- evadb/functions/sklearn.py | 7 ++++++- test/integration_tests/long/test_model_train.py | 3 +-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/evadb/functions/sklearn.py b/evadb/functions/sklearn.py index 04f1c8928f..ca3676f140 100644 --- a/evadb/functions/sklearn.py +++ b/evadb/functions/sklearn.py @@ -31,9 +31,14 @@ def setup(self, model_path: str, **kwargs): self.model = pickle.load(open(model_path, "rb")) def forward(self, frames: pd.DataFrame) -> pd.DataFrame: + # The last column is the target variable column. Hence we do not + # pass that column in the predict method for sklearn. predictions = self.model.predict(frames.iloc[:, :-1]) predict_df = pd.DataFrame(predictions) - # Assign the prediction column name same as that for data provided. + # We need to rename the column of the output dataframe. For this we + # shall rename it to the column name same as that of the last column of + # frames. This is because the last column of frames corresponds to the + # variable we want to predict. predict_df.rename(columns={0: frames.columns[-1]}, inplace=True) return predict_df diff --git a/test/integration_tests/long/test_model_train.py b/test/integration_tests/long/test_model_train.py index d4f7b63af8..8b0843c82a 100644 --- a/test/integration_tests/long/test_model_train.py +++ b/test/integration_tests/long/test_model_train.py @@ -77,8 +77,7 @@ def test_sklearn_regression(self): CREATE FUNCTION IF NOT EXISTS PredictHouseRent FROM ( SELECT number_of_rooms, number_of_bathrooms, days_on_market, rental_price FROM HomeRentals ) TYPE Sklearn - PREDICT 'rental_price' - TIME_LIMIT 120; + PREDICT 'rental_price'; """ execute_query_fetch_all(self.evadb, create_predict_function)