From 8911df7608f1329e3f23fbe6961862995c8a2201 Mon Sep 17 00:00:00 2001 From: amesar Date: Wed, 11 Oct 2023 01:25:30 +0000 Subject: [PATCH] Issue #17: First pass commit for Mini_MLOps_Pipeline UC notebooks --- .../Mini_MLOps_Pipeline_UC/01_Train_Model.py | 151 +++++++++++++ .../02_Register_Model.py | 176 +++++++++++++++ .../03a_Batch_Scoring.py | 89 ++++++++ .../03b_Inference-Autogenerated.py | 114 ++++++++++ .../04a_RT_Serving_Start.py | 102 +++++++++ .../04b_RT_Serving_Score.py | 59 +++++ .../04c_RT_Serving_Stop.py | 30 +++ .../Mini_MLOps_Pipeline_UC/_README.py | 41 ++++ .../Mini_MLOps_Pipeline_UC/includes/Common.py | 213 ++++++++++++++++++ .../includes/HttpClient.py | 89 ++++++++ .../includes/ModelServingClient.py | 56 +++++ 11 files changed, 1120 insertions(+) create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/01_Train_Model.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/02_Register_Model.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/03a_Batch_Scoring.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/03b_Inference-Autogenerated.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/04a_RT_Serving_Start.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/04b_RT_Serving_Score.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/04c_RT_Serving_Stop.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/_README.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/Common.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/HttpClient.py create mode 100644 databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/ModelServingClient.py diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/01_Train_Model.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/01_Train_Model.py new file mode 100644 index 0000000..0156170 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/01_Train_Model.py @@ -0,0 +1,151 @@ +# Databricks notebook source +# MAGIC %md # Train Sklearn Model +# MAGIC +# MAGIC **Overview** +# MAGIC * Option to use [MLflow Autologging](https://docs.databricks.com/applications/mlflow/databricks-autologging.html). +# MAGIC * Train a Sklearn model several times with different `maxDepth` hyperparameter values. +# MAGIC * Runs will be in the notebook experiment. +# MAGIC * Algorithm: DecisionTreeRegressor. +# MAGIC +# MAGIC **Widgets** +# MAGIC * Autologging - yes or no. +# MAGIC * Delete existing runs - Deletes existing experiment runs before executing training runs. +# MAGIC * Delta table - If specified will read data from a Delta table. The table will be created if it doesn't already exist. Otherwise read wine quality data via HTTP download. + +# COMMAND ---------- + +# MAGIC %md ### Setup + +# COMMAND ---------- + +# MAGIC %run ./includes/Common + +# COMMAND ---------- + +dbutils.widgets.dropdown("1. Autologging","no",["yes","no"]) +autologging = dbutils.widgets.get("1. Autologging") == "yes" + +dbutils.widgets.dropdown("2. Delete existing runs","yes",["yes","no"]) +do_delete_runs = dbutils.widgets.get("2. Delete existing runs") == "yes" + +dbutils.widgets.text("3. Delta table", "") +delta_table = dbutils.widgets.get("3. Delta table") + +autologging, do_delete_runs +print("autologging:", autologging) +print("do_delete_runs:", do_delete_runs) +print("delta_table:", delta_table) + +# COMMAND ---------- + +experiment = init() + +# COMMAND ---------- + +# MAGIC %md ### Delete any existing runs + +# COMMAND ---------- + +if do_delete_runs: + delete_runs(experiment.experiment_id) + +# COMMAND ---------- + +# MAGIC %md ### Prepare data + +# COMMAND ---------- + +data = get_wine_quality_data(delta_table) +display(data) + +# COMMAND ---------- + +data.describe() + +# COMMAND ---------- + +from sklearn.model_selection import train_test_split + +train, test = train_test_split(data, test_size=0.30, random_state=42) +X_train = train.drop([_col_label], axis=1) +X_test = test.drop([_col_label], axis=1) +y_train = train[[_col_label]] +y_test = test[[_col_label]] + +# COMMAND ---------- + +# MAGIC %md ### Training Pipeline + +# COMMAND ---------- + +import numpy as np +from sklearn.tree import DecisionTreeRegressor +from sklearn.metrics import mean_squared_error + +# COMMAND ---------- + +# MAGIC %md #### Train with explicit calls to MLflow API + +# COMMAND ---------- + +from mlflow.models.signature import infer_signature + +# COMMAND ---------- + + +def train_no_autologging(max_depth): + import time + now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time.time())) + with mlflow.start_run(run_name=f"No_autolog") as run: + mlflow.set_tag("mlflow_version", mlflow.__version__) + mlflow.set_tag("experiment_name", experiment.name) + mlflow.log_param("max_depth", max_depth) + model = DecisionTreeRegressor(max_depth=max_depth) + model.fit(X_train, y_train) + predictions = model.predict(X_test) + rmse = np.sqrt(mean_squared_error(y_test, predictions)) + mlflow.log_metric("training_rmse", rmse) + print(f"rmse={rmse:5.3f} max_depth={max_depth:02d} run_id={run.info.run_id}") + signature = infer_signature(X_train, predictions) + mlflow.sklearn.log_model(model, "model", signature=signature) + +# COMMAND ---------- + +# MAGIC %md #### Train with MLflow Autologging + +# COMMAND ---------- + +def train_with_autologging(max_depth): + model = DecisionTreeRegressor(max_depth=max_depth) + model.fit(X_train, y_train) + predictions = model.predict(X_test) + rmse = np.sqrt(mean_squared_error(y_test, predictions)) + print(f"rmse={rmse:5.3f} max_depth={max_depth:02d}") + +# COMMAND ---------- + +# MAGIC %md ### Train model with different hyperparameters +# MAGIC * In a realistic scenario you would use hyperopt or similar. See: +# MAGIC * [Hyperopt concepts](https://docs.databricks.com/machine-learning/automl-hyperparam-tuning/hyperopt-concepts.html) +# MAGIC * [Hyperparameter tuning](https://docs.databricks.com/machine-learning/automl-hyperparam-tuning/index.html) + +# COMMAND ---------- + +max_depths = [1, 2, 4, 8, 16] +#max_depths = [1] + +for max_depth in max_depths: + if autologging: + train_with_autologging(max_depth) + else: + train_no_autologging(max_depth) + +# COMMAND ---------- + +display_experiment_uri(experiment) + +# COMMAND ---------- + +# MAGIC %md ### Next notebook +# MAGIC +# MAGIC Next go to the **[02_Register_Model]($02_Register_Model)** notebook. diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/02_Register_Model.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/02_Register_Model.py new file mode 100644 index 0000000..634dde4 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/02_Register_Model.py @@ -0,0 +1,176 @@ +# Databricks notebook source +# MAGIC %md # Register Best Model in Model Registry +# MAGIC * Creates a new registered model if it doesn't already exist. +# MAGIC * Deletes all current model versions (optional). +# MAGIC * Finds the best model (lowest RMSE metric) generated from [01_Train_Model]($01_Train_Model) notebook experiment. +# MAGIC * Adds the best run as a registered model version and promotes it to the `production` stage. + +# COMMAND ---------- + +# MAGIC %md ### Setup + +# COMMAND ---------- + +# MAGIC %run ./includes/Common + +# COMMAND ---------- + +dbutils.widgets.text("1. Registered model","") +_model_name = dbutils.widgets.get("1. Registered model") +_write_model_name() + +dbutils.widgets.dropdown("2. Delete existing versions","yes",["yes","no"]) +delete_existing_versions = dbutils.widgets.get("2. Delete existing versions") == "yes" + +print("_model_name:", _model_name) +print("delete_existing_versions:", delete_existing_versions) + +# COMMAND ---------- + +assert_widget(_model_name, "1. Registered model") + +# COMMAND ---------- + +# MAGIC %md ### Display experiment + +# COMMAND ---------- + +experiment = mlflow_client.get_experiment_by_name(_experiment_name) +experiment_id = experiment.experiment_id +display_experiment_uri(experiment) + +# COMMAND ---------- + +# MAGIC %md ### Show all the RMSE metrics and max_depth params of all runs + +# COMMAND ---------- + +df = mlflow.search_runs(experiment_id, order_by=["metrics.rmse ASC"]) +df = df[["run_id","metrics.training_rmse","params.max_depth"]] +df = df.round({"metrics.training_rmse": 3}) +display(df) + +# COMMAND ---------- + +# MAGIC %md ### Find best run +# MAGIC +# MAGIC Search for the run with lowest RMSE metric. + +# COMMAND ---------- + +runs = mlflow_client.search_runs(experiment_id, order_by=["metrics.training_rmse ASC"], max_results=1) +best_run = runs[0] +best_run.info.run_id, round(best_run.data.metrics["training_rmse"],3), best_run.data.params + +# COMMAND ---------- + +# MAGIC %md ##### If you used a Delta table to train your model, its value is displayed here as in: +# MAGIC +# MAGIC `path=dbfs:/user/hive/warehouse/andre.db/wine_quality,version=0,format=delta` + +# COMMAND ---------- + +best_run.data.tags.get("sparkDatasourceInfo") + +# COMMAND ---------- + +# MAGIC %md ### Create registered model +# MAGIC +# MAGIC If model already exists remove all existing versions. + +# COMMAND ---------- + +from mlflow.exceptions import MlflowException, RestException +from mlflow.exceptions import MlflowException, RestException + +try: + registered_model = mlflow_client.get_registered_model(_model_name) + print(f"Found existing {_model_name}") + versions = mlflow_client.search_model_versions(f"name='{_model_name}'") + print(f"Found {len(versions)} versions") + if delete_existing_versions: + for vr in versions: + print(f" version={vr.version} status={vr.status} stage={vr.current_stage} run_id={vr.run_id}") + mlflow_client.delete_model_version(_model_name, vr.version) +except RestException as e: + print("INFO:",e) + if e.error_code == "RESOURCE_DOES_NOT_EXIST": + print(f"Creating {_model_name}") + registered_model = mlflow_client.create_registered_model(_model_name) + else: + raise RuntimeError(e) + +# COMMAND ---------- + +type(registered_model),registered_model.__dict__ + +# COMMAND ---------- + +display_registered_model_uri(_model_name) + +# COMMAND ---------- + +# MAGIC %md **Create the version** + +# COMMAND ---------- + +source = f"{best_run.info.artifact_uri}/model" +source + +# COMMAND ---------- + + mlflow_client.tracking_uri, mlflow_client._registry_uri, _model_name + +# COMMAND ---------- + +version = mlflow_client.create_model_version(_model_name, source, best_run.info.run_id) +type(version), version.__dict__ + +# COMMAND ---------- + +# MAGIC %md ### Promote best run version to Production stage + +# COMMAND ---------- + +# MAGIC %md **Wait until version is in READY status** + +# COMMAND ---------- + +_model_name + +# COMMAND ---------- + +wait_until_version_ready(_model_name, version, sleep_time=2) + +# COMMAND ---------- + +version = mlflow_client.get_model_version(_model_name, version.version) +version_id = version.version + +print("version.version:", version.version) +print("version.status:", version.status) +print("version.current_stage:", version.current_stage) +print("version.aliases:", version.aliases) +print("version.run_id:", version.run_id) + +# COMMAND ---------- + +# MAGIC %md **Transition to production stage** + +# COMMAND ---------- + +##mlflow_client.transition_model_version_stage(_model_name, version_id, "Production") +mlflow_client.set_registered_model_alias(_model_name, "production", version.version) + +# COMMAND ---------- + +version = mlflow_client.get_model_version(_model_name, version_id) +type(version), version.__dict__ + +# COMMAND ---------- + +# MAGIC %md ### Next notebook +# MAGIC +# MAGIC Now either go to: +# MAGIC * **[03a_Batch_Scoring]($03a_Batch_Scoring)** notebook for batch scoring +# MAGIC * **[04a_RT_Serving_Start ]($04a_RT_Serving_Start )** notebook for real-time model serving diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/03a_Batch_Scoring.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/03a_Batch_Scoring.py new file mode 100644 index 0000000..b8a8371 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/03a_Batch_Scoring.py @@ -0,0 +1,89 @@ +# Databricks notebook source +# MAGIC %md #Batch Scoring +# MAGIC * Scores the best model run from the [01_Train_Model]($01_Train_Model) notebook. +# MAGIC * Uses the model URI: `models:/mini_mlops_pipeline/production`. +# MAGIC * Scores with native Sklearn, Pyfunc and UDF flavors. +# MAGIC * Sklearn and Pyfunc scoring is executed only on the driver node, whereas UDF scoring leverages all nodes of the cluster. + +# COMMAND ---------- + +# MAGIC %md ### Setup + +# COMMAND ---------- + +# MAGIC %run ./includes/Common + +# COMMAND ---------- + +# MAGIC %md ### Prepare scoring data +# MAGIC * Drop the label column. + +# COMMAND ---------- + +data = get_wine_quality_data() +data = data.drop([_col_label], axis=1) + +# COMMAND ---------- + +display(data) + +# COMMAND ---------- + +# MAGIC %md ### Build model URI using model alias `@a` + +# COMMAND ---------- + +_model_name + +# COMMAND ---------- + +model_uri = f"models:/{_model_name}@production" +model_uri + +# COMMAND ---------- + +# MAGIC %md ### Score with native Sklearn flavor +# MAGIC * Executes only on the driver node of the cluster. + +# COMMAND ---------- + +import pandas as pd +import mlflow.sklearn + +model = mlflow.sklearn.load_model(model_uri) +predictions = model.predict(data) +display(pd.DataFrame(predictions, columns=[_col_prediction])) + +# COMMAND ---------- + +# MAGIC %md ### Score with Pyfunc flavor +# MAGIC * Executes only on the driver node of the cluster. + +# COMMAND ---------- + +import mlflow.pyfunc + +model = mlflow.pyfunc.load_model(model_uri) +predictions = model.predict(data) +display(pd.DataFrame(predictions, columns=[_col_prediction])) + +# COMMAND ---------- + +# MAGIC %md ### Distributed scoring with UDF +# MAGIC * Executes on all worker nodes of the cluster. +# MAGIC * UDF wraps the Sklearn model. +# MAGIC * Pass a Spark dataframe to the UDF. +# MAGIC * The dataframe is split into multiple pieces and sent to each worker in the cluster for scoring. + +# COMMAND ---------- + +df = spark.createDataFrame(data) +udf = mlflow.pyfunc.spark_udf(spark, model_uri) +predictions = df.withColumn(_col_prediction, udf(*df.columns)) +display(predictions.select(_col_prediction)) + +# COMMAND ---------- + +# MAGIC %md ### Next notebook +# MAGIC +# MAGIC Optionally, go to the **[04a_RT_Serving_Start]($04a_RT_Serving_Start)** notebook for real-time scoring. diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/03b_Inference-Autogenerated.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/03b_Inference-Autogenerated.py new file mode 100644 index 0000000..882d4db --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/03b_Inference-Autogenerated.py @@ -0,0 +1,114 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC This is an auto-generated notebook to perform batch inference on a Spark DataFrame using a selected model from the model registry. This feature is in preview, and we would greatly appreciate any feedback through this form: https://databricks.sjc1.qualtrics.com/jfe/form/SV_1H6Ovx38zgCKAR0. +# MAGIC +# MAGIC ## Instructions: +# MAGIC 1. Run the notebook against a cluster with Databricks ML Runtime version 13.2.x-cpu, to best re-create the training environment. +# MAGIC 2. Add additional data processing on your loaded table to match the model schema if necessary (see the "Define input and output" section below). +# MAGIC 3. "Run All" the notebook. +# MAGIC 4. Note: If the `%pip` does not work for your model (i.e. it does not have a `requirements.txt` file logged), modify to use `%conda` if possible. + +# COMMAND ---------- + +model_name = "mini_mlops_pipeline" + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Environment Recreation +# MAGIC Run the notebook against a cluster with Databricks ML Runtime version 13.2.x-cpu, to best re-create the training environment.. The cell below downloads the model artifacts associated with your model in the remote registry, which include `conda.yaml` and `requirements.txt` files. In this notebook, `pip` is used to reinstall dependencies by default. +# MAGIC +# MAGIC ### (Optional) Conda Instructions +# MAGIC Models logged with an MLflow client version earlier than 1.18.0 do not have a `requirements.txt` file. If you are using a Databricks ML runtime (versions 7.4-8.x), you can replace the `pip install` command below with the following lines to recreate your environment using `%conda` instead of `%pip`. +# MAGIC ``` +# MAGIC conda_yml = os.path.join(local_path, "conda.yaml") +# MAGIC %conda env update -f $conda_yml +# MAGIC ``` + +# COMMAND ---------- + +from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository +import os + +model_uri = f"models:/{model_name}/Production" +local_path = ModelsArtifactRepository(model_uri).download_artifacts("") # download model from remote registry + +requirements_path = os.path.join(local_path, "requirements.txt") +if not os.path.exists(requirements_path): + dbutils.fs.put("file:" + requirements_path, "", True) + +# COMMAND ---------- + +# MAGIC %pip install -r $requirements_path + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Define input and output +# MAGIC The table path assigned to`input_table_name` will be used for batch inference and the predictions will be saved to `output_table_path`. After the table has been loaded, you can perform additional data processing, such as renaming or removing columns, to ensure the model and table schema matches. + +# COMMAND ---------- + +# redefining key variables here because %pip and %conda restarts the Python interpreter +model_name = "mini_mlops_pipeline" +input_table_name = "andre_catalog.default.white_wine" +output_table_path = "/FileStore/batch-inference/mini_mlops_pipeline" + +# COMMAND ---------- + +# load table as a Spark DataFrame +table = spark.table(input_table_name) + +# optionally, perform additional data processing (may be necessary to conform the schema) + + +# COMMAND ---------- + +if "quality" in table.columns: ## Note: added + table = table.drop("quality") + +# COMMAND ---------- + +# MAGIC %md ## Load model and run inference +# MAGIC **Note**: If the model does not return double values, override `result_type` to the desired type. + +# COMMAND ---------- + +import mlflow +from pyspark.sql.functions import struct + +model_uri = f"models:/{model_name}/Production" + +# create spark user-defined function for model prediction +predict = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double") + +# COMMAND ---------- + +output_df = table.withColumn("prediction", predict(struct(*table.columns))) + +# COMMAND ---------- + +# MAGIC %md ## Save predictions +# MAGIC **The default output path on DBFS is accessible to everyone in this Workspace. If you want to limit access to the output you must change the path to a protected location.** +# MAGIC The cell below will save the output table to the specified FileStore path. `datetime.now()` is appended to the path to prevent overwriting the table in the event that this notebook is run in a batch inference job. To overwrite existing tables at the path, replace the cell below with: +# MAGIC ```python +# MAGIC output_df.write.mode("overwrite").save(output_table_path) +# MAGIC ``` +# MAGIC +# MAGIC ### (Optional) Write predictions to Unity Catalog +# MAGIC If you have access to any UC catalogs, you can also save predictions to UC by specifying a table in the format `..`. +# MAGIC ```python +# MAGIC output_table = "" # Example: "ml.batch-inference.mini_mlops_pipeline" +# MAGIC output_df.write.saveAsTable(output_table) +# MAGIC ``` + +# COMMAND ---------- + +from datetime import datetime + +# To write to a unity catalog table, see instructions above +output_df.write.save(f"{output_table_path}_{datetime.now().isoformat()}".replace(":", ".")) + +# COMMAND ---------- + +output_df.display() diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/04a_RT_Serving_Start.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/04a_RT_Serving_Start.py new file mode 100644 index 0000000..e8ddfb3 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/04a_RT_Serving_Start.py @@ -0,0 +1,102 @@ +# Databricks notebook source +# MAGIC %md ## Start a model serving endpoint + +# COMMAND ---------- + +# MAGIC %md ### Setup includes + +# COMMAND ---------- + +# MAGIC %run ./includes/Common + +# COMMAND ---------- + +# MAGIC %md ### Setup widgets + +# COMMAND ---------- + +dbutils.widgets.text("Registered Model Version", "1") +registered_model_version = dbutils.widgets.get("Registered Model Version") +print("registered_model_version:", registered_model_version) + +# COMMAND ---------- + +# MAGIC %md #### List all endpoints + +# COMMAND ---------- + +endpoints = model_serving_client.list_endpoints() +for e in endpoints: + print(f" {e['name']} - {e['creator']}") + +# COMMAND ---------- + +# MAGIC %md #### See if our endpoint is running + +# COMMAND ---------- + +endpoint = model_serving_client.get_endpoint(_endpoint_name) +if endpoint: + print(f"Endpoint '{_endpoint_name}' is running") + print(endpoint) +else: + print(f"Endpoint '{_endpoint_name}' is not running") + +# COMMAND ---------- + +# MAGIC %md #### If our endpoint is running then exit notebook + +# COMMAND ---------- + +if endpoint: + dbutils.notebook.exit(0) +print(f"About to launch endpoint '{_endpoint_name}'") + +# COMMAND ---------- + +# MAGIC %md #### Define endpoint spec + +# COMMAND ---------- + +served_model = "my-model" +spec = { + "name": f"{_endpoint_name}", + "config": { + "served_models": [ + { + "name": f"{served_model}", + "model_name": f"{_model_name}", + f"model_version": f"{registered_model_version}", + "workload_size": "Small", + "scale_to_zero_enabled": False + } + ] + } +} +spec + +# COMMAND ---------- + +# MAGIC %md #### Start the endpoint + +# COMMAND ---------- + +model_serving_client.start_endpoint(spec) + +# COMMAND ---------- + +# MAGIC %md #### Wait until endpoint is in READY state + +# COMMAND ---------- + +model_serving_client.wait_until(_endpoint_name, max=50, sleep_time=4) + +# COMMAND ---------- + +model_serving_client.get_endpoint(_endpoint_name) + +# COMMAND ---------- + +# MAGIC %md ### Next notebook +# MAGIC +# MAGIC Once the serving endpoint has started, go to the **[04b_RT_Serving_Score]($04b_RT_Serving_Score)** notebook to submit scoring requests. diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/04b_RT_Serving_Score.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/04b_RT_Serving_Score.py new file mode 100644 index 0000000..c119c86 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/04b_RT_Serving_Score.py @@ -0,0 +1,59 @@ +# Databricks notebook source +# MAGIC %md ## Score a model with RT Model Serving + +# COMMAND ---------- + +# MAGIC %run ./includes/Common + +# COMMAND ---------- + +# Example: https://e2-erie.mist.databricks.com/serving-endpoints/mini_mlops_wine_quality/invocations + +endpoint_uri = f"https://{_host_name}/serving-endpoints/{_endpoint_name}/invocations" +import os +os.environ["ENDPOINT_URI"] = endpoint_uri +os.environ["TOKEN"] = token +endpoint_uri + +# COMMAND ---------- + +# MAGIC %md ### Score with curL + +# COMMAND ---------- + +# MAGIC %sh +# MAGIC curl -s $ENDPOINT_URI \ +# MAGIC -H "Authorization: Bearer $TOKEN" \ +# MAGIC -H 'Content-Type: application/json' \ +# MAGIC -d '{"dataframe_split": { "columns": [ "fixed acidity", "volatile acidity", "citric acid", "residual sugar", "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density", "pH", "sulphates", "alcohol" ], "data": [ [ 7, 0.27, 0.36, 20.7, 0.045, 45, 170, 1.001, 3, 0.45, 8.8 ], [ 6.3, 0.3, 0.34, 1.6, 0.049, 14, 132, 0.994, 3.3, 0.49, 9.5 ], [ 8.1, 0.28, 0.4, 6.9, 0.05, 30, 97, 0.9951, 3.26, 0.44, 10.1 ] ] } } +# MAGIC ' + +# COMMAND ---------- + +# MAGIC %md ### Score with Python requests + +# COMMAND ---------- + +import requests +data = { "dataframe_split": { + "columns": + [ "fixed acidity", "volatile acidity", "citric acid", "residual sugar", "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density", "pH", "sulphates", "alcohol" ], + "data": [ + [ 7, 0.27, 0.36, 20.7, 0.045, 45, 170, 1.001, 3, 0.45, 8.8 ], + [ 6.3, 0.3, 0.34, 1.6, 0.049, 14, 132, 0.994, 3.3, 0.49, 9.5 ], + [ 8.1, 0.28, 0.4, 6.9, 0.05, 30, 97, 0.9951, 3.26, 0.44, 10.1 ] + ] + } +} +import json +data = json.dumps(data) + +headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } +rsp = requests.post(endpoint_uri, headers=headers, data=data, timeout=15) +rsp.text + +# COMMAND ---------- + +# MAGIC %md ### Next notebook +# MAGIC +# MAGIC When finished scoring, go to the **[04c_RT_Serving_Stop]($04c_RT_Serving_Stop)** notebook to shut down the serving endpoint. diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/04c_RT_Serving_Stop.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/04c_RT_Serving_Stop.py new file mode 100644 index 0000000..331187e --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/04c_RT_Serving_Stop.py @@ -0,0 +1,30 @@ +# Databricks notebook source +# MAGIC %md ## Stop a model serving endpoint + +# COMMAND ---------- + +# MAGIC %run ./includes/Common + +# COMMAND ---------- + +model_serving_client.get_endpoint(_endpoint_name) + +# COMMAND ---------- + +model_serving_client.stop_endpoint(_endpoint_name) + +# COMMAND ---------- + +model_serving_client.get_endpoint(_endpoint_name) + +# COMMAND ---------- + +endpoints = model_serving_client.list_endpoints() +for e in endpoints: + print(f" {e['name']} - {e['creator']}") + +# COMMAND ---------- + +# MAGIC %md ### Next notebook +# MAGIC +# MAGIC Congratulations! You have finished your Mini MLOps exercise. There is no next notebook. diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/_README.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/_README.py new file mode 100644 index 0000000..c67eb44 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/_README.py @@ -0,0 +1,41 @@ +# Databricks notebook source +# MAGIC %md # Mini MLOps pipeline - UC - WIP +# MAGIC +# MAGIC ##### Overview +# MAGIC +# MAGIC * Trains several model runs with different hyperparameters. +# MAGIC * Registers the best run's model in the model registry as a production stage. +# MAGIC * Batch scoring with Spark. +# MAGIC * Real-time model scoring with Serverless Model Serving ([AWS](https://docs.databricks.com/machine-learning/model-inference/serverless/serverless-real-time-inference.html) - [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/)). +# MAGIC +# MAGIC ##### Notebooks +# MAGIC +# MAGIC * [01_Train_Model]($01_Train_Model) - Run several Sklearn training runs with different hyperparameters. +# MAGIC * Option to use [MLflow Autologging](https://docs.databricks.com/applications/mlflow/databricks-autologging.html). +# MAGIC * [02_Register_Model]($02_Register_Model) - Find the best run and register as model `models:/mini_mlops_pipeline/production`. +# MAGIC * [03a_Batch_Scoring]($03a_Batch_Scoring) - Batch score with Sklearn, Pyfunc and UDF flavors. +# MAGIC * [03b_Inference-Autogenerated]($03b_Inference-Autogenerated) - Autogenerated from model version UI button `Use model for inference`. +# MAGIC * Real-time Model Serving endpoint - using [Serverless Model Serving](https://docs.databricks.com/machine-learning/model-inference/serverless/serverless-real-time-inference.html). +# MAGIC * [04a_RT_Serving_Start]($04a_RT_Serving_Start) - Start endpoint. +# MAGIC * [04b_RT_Serving_Score]($04b_RT_Serving_Score) - Score endpoint. +# MAGIC * [04c_RT_Serving_Stop]($04c_RT_Serving_Stop) - Stop endpoint. +# MAGIC * Helper notebooks: +# MAGIC * [Common]($includes/Common) - Helper functions. +# MAGIC * [ModelServingClient]($includes/ModelServingClient) - Python HTTP client to invoke Databricks Model Serving API. +# MAGIC * [HttpClient]($includes/HttpClient) - Python HTTP client to invoke Databricks API. +# MAGIC +# MAGIC ##### Github +# MAGIC * [github.com/amesar/mlflow-examples/databricks/notebooks/Mini_MLOps_Pipeline](https://github.com/amesar/mlflow-examples/tree/master/databricks/notebooks/Mini_MLOps_Pipeline) +# MAGIC +# MAGIC Last updated: _2023-10-10_ + +# COMMAND ---------- + +# MAGIC %md +# MAGIC **Batch Scoring Pipeline** +# MAGIC +# MAGIC +# MAGIC +# MAGIC **Real-time Scoring Pipeline** +# MAGIC +# MAGIC diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/Common.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/Common.py new file mode 100644 index 0000000..9c98bc0 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/Common.py @@ -0,0 +1,213 @@ +# Databricks notebook source +# Common helper functions. +# Global variables: +# * model_uri - URI of the registered model. +# * experiment_name - name of the 01_Train_Model notebook experiment. +# * endpoint_name - mini_mlops_wine_quality + +# COMMAND ---------- + +import os +import mlflow + +print("Versions:") +print(" MLflow Version:", mlflow.__version__) +print(" DATABRICKS_RUNTIME_VERSION:", os.environ.get('DATABRICKS_RUNTIME_VERSION', None)) + +# COMMAND ---------- + +mlflow.set_registry_uri("databricks-uc") +mlflow_client = mlflow.MlflowClient(registry_uri="databricks-uc") + + +# COMMAND ---------- + +def show_mlflow_uris(msg): + print(f"{msg}:") + print(" mlflow.get_tracking_uri:", mlflow.get_tracking_uri()) + print(" mlflow.get_registry_uri:", mlflow.get_registry_uri()) + print(" mlflowClient.tracking_uri:", mlflow_client.tracking_uri) + print(" mlflowClient.registry_uri:", mlflow_client._registry_uri) +show_mlflow_uris("After UC settings") + +# COMMAND ---------- + +def show_status(): + print("Status:") + print(" _model_name: ", _model_name) + print(" _model_uri: ", _model_uri) + print(" _endpoint_name:", _endpoint_name) + +# COMMAND ---------- + +_model_name_path = "/tmp/Mini_MLOps_Pipeline.txt" +print("_model_name_path:", _model_name_path) +_model_name = None + +def _write_model_name(): + with open(_model_name_path, "w") as f: + f.write(_model_name) + +def _read_model_name(): + if os.path.exists(_model_name_path): + with open(_model_name_path, "r") as f: + return f.read() + else: + raise RuntimeError(f"ERROR: '{_model_name_path}' missing. Run notebook 02_Register_Model first") + +_model_name = _read_model_name() +_model_uri = f"models:/{_model_name}/production" +_endpoint_name = "mini_mlops_wine_quality" + +show_status() + +# COMMAND ---------- + +# Experiment name is the same as the 01_Train_Model notebook. +_notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() + +_notebook = _notebook_context.notebookPath().get() +_dir = os.path.dirname(_notebook) +_experiment_name = os.path.join(_dir, "01_Train_Model") + +print("_experiment_name:", _experiment_name) + +# COMMAND ---------- + +def _get_notebook_tag(tag): + tag = _notebook_context.tags().get(tag) + return None if tag.isEmpty() else tag.get() + +# COMMAND ---------- + +def _create_experiment_for_repos(): + print("Creating Repos scratch experiment") + with mlflow.start_run() as run: + pass # mlflow.set_tag("info","hi there") + return mlflow_client.get_experiment(run.info.experiment_id) + +# COMMAND ---------- + +# Gets the current notebook's experiment information + +def init(): + experiment_name = _notebook + print("Experiment name:", experiment_name) + experiment = mlflow_client.get_experiment_by_name(experiment_name) + # Is running in as Repos, cannot need to create its default "notebook" experiment + if not experiment: + experiment = _create_experiment_for_repos() + print(f"Running as Repos - created Repos experiment:", experiment.name) + + print("Experiment ID:", experiment.experiment_id) + print("Experiment name:", experiment.name) + return experiment + +# COMMAND ---------- + +def delete_runs(experiment_id): + runs = mlflow_client.search_runs(experiment_id) + print(f"Found {len(runs)} runs for experiment_id {experiment_id}") + for run in runs: + mlflow_client.delete_run(run.info.run_id) + +# COMMAND ---------- + +def _read_data(data_path): + import pandas as pd + print(f"Reading data from '{data_path}'") + pdf = pd.read_csv(data_path) + pdf.columns = pdf.columns.str.replace(" ","_") # make Spark legal column names + return pdf + +def get_wine_quality_data(table_name=""): + """ Read CSV data from internet or from Delta table """ + data_path = "https://raw.githubusercontent.com/mlflow/mlflow/master/examples/sklearn_elasticnet_wine/wine-quality.csv" + if table_name == "": + return _read_data(data_path) + else: + if not spark.catalog._jcatalog.tableExists(table_name): # Create table if it does not exist + print(f"Creating table '{table_name}'") + pdf = _read_data(data_path) + df = spark.createDataFrame(pdf) + df.write.mode("overwrite").saveAsTable(table_name) + print(f"Reading from table '{table_name}'") + return spark.table(table_name).toPandas() # Read from Delta table + +# COMMAND ---------- + +# Columns +_col_label = "quality" +_col_prediction = "prediction" + +# COMMAND ---------- + +_host_name = _get_notebook_tag("browserHostName") +print("_host_name:", _host_name) + +# COMMAND ---------- + +token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() +dbutils.fs.put("file:///root/.databrickscfg",f"[DEFAULT]\nhost=https://{_host_name}\ntoken = "+token,overwrite=True) + +# COMMAND ---------- + +def display_experiment_uri(experiment): + host_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get("browserHostName").get() + uri = f"https://{host_name}/#mlflow/experiments/{experiment.experiment_id}" + displayHTML(f""" +
+ + + + +
Experiment
UI link{uri}
Name{experiment.name}
ID{experiment.experiment_id}
+ """) + +# COMMAND ---------- + +def display_registered_model_uri(model_name): + uri = f"https://{_host_name}/#mlflow/models/{model_name}" + displayHTML("""Registered Model URI: {}""".format(uri,uri)) + +# COMMAND ---------- + +""" +Wait function due to cloud eventual consistency. +Waits until a version is in the READY status. +""" +import time +from mlflow.entities.model_registry.model_version_status import ModelVersionStatus + +def wait_until_version_ready(model_name, model_version, sleep_time=1, iterations=100): + start = time.time() + for _ in range(iterations): + version = mlflow_client.get_model_version(model_name, model_version.version) + status = ModelVersionStatus.from_string(version.status) + dt = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(round(time.time()))) + print(f"{dt}: Version {version.version} status: {ModelVersionStatus.to_string(status)}") + if status == ModelVersionStatus.READY: + break + elif status == ModelVersionStatus.FAILED_REGISTRATION: + raise Exception(f"ERROR: status={ModelVersionStatus.to_string(status)}") + time.sleep(sleep_time) + end = time.time() + print(f"Waited {round(end-start,2)} seconds") + +# COMMAND ---------- + +def assert_widget(value, name): + if len(value.rstrip())==0: + raise RuntimeError(f"ERROR: '{name}' widget is required") + +# COMMAND ---------- + +# MAGIC %run ./HttpClient + +# COMMAND ---------- + +databricks_client = DatabricksHttpClient() + +# COMMAND ---------- + +# MAGIC %run ./ModelServingClient diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/HttpClient.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/HttpClient.py new file mode 100644 index 0000000..77e56de --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/HttpClient.py @@ -0,0 +1,89 @@ +# Databricks notebook source +# HTTP client for MLflow and Databricks APIs + +# COMMAND ---------- + +class HttpException(Exception): + def __init__(self, ex, http_status_code): + self.http_status_code = http_status_code + +# COMMAND ---------- + +import os +import json +import requests + +def get_token(): + return dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() + +def get_host(): + host = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get("browserHostName").get() + return f"https://{host}" + +TIMEOUT = 15 + +""" Wrapper for get and post methods for Databricks REST APIs. """ +class HttpClient(object): + def __init__(self, api_prefix, host=None, token=None): + if not host: host = get_host() + if not token: token = get_token() + self.api_uri = os.path.join(host, api_prefix) + self.token = token + + def _get(self, resource): + """ Executes an HTTP GET call + :param resource: Relative path name of resource such as cluster/list + """ + uri = self._mk_uri(resource) + rsp = requests.get(uri, headers=self._mk_headers()) + self._check_response(rsp, uri) + return rsp + + def get(self, resource): + return json.loads(self._get(resource).text) + + def post(self, resource, data): + """ Executes an HTTP POST call + :param resource: Relative path name of resource such as runs/search + :param data: Post request payload + """ + uri = self._mk_uri(resource) + data = json.dumps(data) + rsp = requests.post(uri, headers=self._mk_headers(), data=data) + self._check_response(rsp,uri) + return json.loads(rsp.text) + + def _delete(self, resource, data=None): + """ Executes an HTTP POST call + :param resource: Relative path name of resource such as runs/search + :param data: Post request payload + """ + uri = self._mk_uri(resource) + data = json.dumps(data) if data else None + rsp = requests.delete(uri, headers=self._mk_headers(), data=data, timeout=TIMEOUT) + self._check_response(rsp, uri) + return rsp + + def delete(self, resource, data=None): + return json.loads(self._delete(resource, data).text) + + def _mk_headers(self): + return {} if self.token is None else { "Authorization": f"Bearer {self.token}" } + + def _mk_uri(self, resource): + return f"{self.api_uri}/{resource}" + + def _check_response(self, rsp, uri): + if rsp.status_code < 200 or rsp.status_code > 299: + raise HttpException(f"HTTP status code: {rsp.status_code}. Reason: {rsp.reason} URL: {uri}", rsp.status_code) + + def __repr__(self): + return self.api_uri + +class DatabricksHttpClient(HttpClient): + def __init__(self, host=None, token=None): + super().__init__("api/2.0", host, token) + +class MlflowHttpClient(HttpClient): + def __init__(self, host=None, token=None): + super().__init__("api/2.0/mlflow", host, token) diff --git a/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/ModelServingClient.py b/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/ModelServingClient.py new file mode 100644 index 0000000..3365583 --- /dev/null +++ b/databricks/notebooks/Mini_MLOps_Pipeline_UC/includes/ModelServingClient.py @@ -0,0 +1,56 @@ +# Databricks notebook source +class ModelServingClient: + + def __init__(self): + self.databricks_client = DatabricksHttpClient() + + def get_endpoint(self, endpoint_name): + try: + endpoint = databricks_client.get(f"serving-endpoints/{endpoint_name}") + return endpoint + except HttpException as e: + if e.http_status_code != 404: + raise e + return None + + def start_endpoint(self, spec): + return self.databricks_client.post("serving-endpoints", spec) + + def stop_endpoint(self, endpoint_name): + try: + self.databricks_client.delete(f"serving-endpoints/{endpoint_name}") + return True + except HttpException as e: + if e.http_status_code != 404: + raise e + return False + + def list_endpoints(self): + endpoints = databricks_client.get("serving-endpoints") + if len(endpoints) > 0: + endpoints = endpoints["endpoints"] + return endpoints + + def wait_until(self, endpoint_name, max=20, sleep_time=2): + import time + for i in range(0,max): + endpoint = self.get_endpoint(endpoint_name) + if not endpoint: + return {} + # 'state': {'ready': 'READY', 'config_update': 'NOT_UPDATING'}, + state = endpoint.get("state",None) + now = time.strftime("%Y-%m-%d_%H:%M:%S", time.gmtime(time.time())) + print(f"{now}: Waiting {i+1}/{max}: {state}") + if state["ready"] == "READY" or state["config_update"] == "UPDATE_FAILED": + return state + import time + time.sleep(sleep_time) + return {} + +# COMMAND ---------- + +model_serving_client = ModelServingClient() + +# COMMAND ---------- + +