Skip to content

Commit

Permalink
Issue #17: First pass commit for Mini_MLOps_Pipeline UC notebooks
Browse files Browse the repository at this point in the history
  • Loading branch information
amesar committed Oct 11, 2023
1 parent eb59f6b commit 8911df7
Show file tree
Hide file tree
Showing 11 changed files with 1,120 additions and 0 deletions.
151 changes: 151 additions & 0 deletions databricks/notebooks/Mini_MLOps_Pipeline_UC/01_Train_Model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Databricks notebook source
# MAGIC %md # Train Sklearn Model
# MAGIC
# MAGIC **Overview**
# MAGIC * Option to use [MLflow Autologging](https://docs.databricks.com/applications/mlflow/databricks-autologging.html).
# MAGIC * Train a Sklearn model several times with different `max_depth` hyperparameter values.
# MAGIC * Runs will be in the notebook experiment.
# MAGIC * Algorithm: DecisionTreeRegressor.
# MAGIC
# MAGIC **Widgets**
# MAGIC * Autologging - yes or no.
# MAGIC * Delete existing runs - Deletes existing experiment runs before executing training runs.
# MAGIC * Delta table - If specified will read data from a Delta table. The table will be created if it doesn't already exist. Otherwise read wine quality data via HTTP download.

# COMMAND ----------

# MAGIC %md ### Setup

# COMMAND ----------

# MAGIC %run ./includes/Common

# COMMAND ----------

# Widget 1: choose between MLflow autologging ("yes") and explicit MLflow API calls ("no").
dbutils.widgets.dropdown("1. Autologging","no",["yes","no"])
autologging = dbutils.widgets.get("1. Autologging") == "yes"

# Widget 2: whether to delete the experiment's existing runs before training.
dbutils.widgets.dropdown("2. Delete existing runs","yes",["yes","no"])
do_delete_runs = dbutils.widgets.get("2. Delete existing runs") == "yes"

# Widget 3: optional Delta table name; the empty string is the default.
dbutils.widgets.text("3. Delta table", "")
delta_table = dbutils.widgets.get("3. Delta table")

# Echo the resolved widget values so they are visible in the cell output.
print("autologging:", autologging)
print("do_delete_runs:", do_delete_runs)
print("delta_table:", delta_table)

# COMMAND ----------

# init() is not defined in this notebook (presumably provided by ./includes/Common).
# The returned object's experiment_id and name attributes are used by later cells.
experiment = init()

# COMMAND ----------

# MAGIC %md ### Delete any existing runs

# COMMAND ----------

# Clear out the experiment's previous runs only when the
# "2. Delete existing runs" widget is set to "yes".
if do_delete_runs:
    delete_runs(experiment.experiment_id)

# COMMAND ----------

# MAGIC %md ### Prepare data

# COMMAND ----------

# Load the wine quality data; delta_table is the "3. Delta table" widget value
# (an empty string when the widget is left blank).
data = get_wine_quality_data(delta_table)
# Render the loaded data in the notebook output.
display(data)

# COMMAND ----------

# Show describe() for the loaded data as the cell result
# (presumably pandas summary statistics - confirm the type returned by the loader).
data.describe()

# COMMAND ----------

from sklearn.model_selection import train_test_split

# Hold out 30% of the rows for evaluation; the fixed seed keeps the split reproducible.
train, test = train_test_split(data, test_size=0.30, random_state=42)

# Features are every column except the label column.
X_train, X_test = (part.drop([_col_label], axis=1) for part in (train, test))

# Labels are kept as single-column frames (double-bracket selection).
y_train, y_test = (part[[_col_label]] for part in (train, test))

# COMMAND ----------

# MAGIC %md ### Training Pipeline

# COMMAND ----------

import numpy as np
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error

# COMMAND ----------

# MAGIC %md #### Train with explicit calls to MLflow API

# COMMAND ----------

from mlflow.models.signature import infer_signature

# COMMAND ----------


def train_no_autologging(max_depth):
    """Train one DecisionTreeRegressor and record it with explicit MLflow calls.

    Opens a run named "No_autolog", tags it with the MLflow version and the
    experiment name, logs ``max_depth`` as a parameter, fits the model on the
    training split, and logs the RMSE and the model (with an inferred signature).

    Note: the metric is logged under the name "training_rmse" but is computed
    from predictions on the held-out test split (X_test / y_test).

    Args:
        max_depth: Integer maximum tree depth passed to DecisionTreeRegressor.
    """
    with mlflow.start_run(run_name="No_autolog") as run:
        mlflow.set_tag("mlflow_version", mlflow.__version__)
        mlflow.set_tag("experiment_name", experiment.name)
        mlflow.log_param("max_depth", max_depth)

        model = DecisionTreeRegressor(max_depth=max_depth)
        model.fit(X_train, y_train)

        predictions = model.predict(X_test)
        rmse = np.sqrt(mean_squared_error(y_test, predictions))
        mlflow.log_metric("training_rmse", rmse)
        print(f"rmse={rmse:5.3f} max_depth={max_depth:02d} run_id={run.info.run_id}")

        # The signature records the input schema (feature columns) and the prediction output.
        signature = infer_signature(X_train, predictions)
        # Logged inside the run context so the artifact is attached to this run.
        mlflow.sklearn.log_model(model, "model", signature=signature)

# COMMAND ----------

# MAGIC %md #### Train with MLflow Autologging

# COMMAND ----------

def train_with_autologging(max_depth):
    """Fit and evaluate one DecisionTreeRegressor without any explicit MLflow calls.

    Nothing is logged by this function itself; any tracking is presumably
    left to MLflow autologging being active in the environment (confirm).

    Args:
        max_depth: Integer maximum tree depth passed to DecisionTreeRegressor.
    """
    regressor = DecisionTreeRegressor(max_depth=max_depth)
    regressor.fit(X_train, y_train)

    # Score on the held-out split and report the root mean squared error.
    y_pred = regressor.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    print(f"rmse={rmse:5.3f} max_depth={max_depth:02d}")

# COMMAND ----------

# MAGIC %md ### Train model with different hyperparameters
# MAGIC * In a realistic scenario you would use hyperopt or similar. See:
# MAGIC * [Hyperopt concepts](https://docs.databricks.com/machine-learning/automl-hyperparam-tuning/hyperopt-concepts.html)
# MAGIC * [Hyperparameter tuning](https://docs.databricks.com/machine-learning/automl-hyperparam-tuning/index.html)

# COMMAND ----------

# Tree depths to try, one training run per value.
max_depths = [1, 2, 4, 8, 16]
#max_depths = [1]

# The autologging widget value is fixed for the whole cell, so pick the trainer once.
train_fn = train_with_autologging if autologging else train_no_autologging
for max_depth in max_depths:
    train_fn(max_depth)

# COMMAND ----------

# Show a link/URI for the experiment; display_experiment_uri is defined outside this
# notebook (presumably in ./includes/Common).
display_experiment_uri(experiment)

# COMMAND ----------

# MAGIC %md ### Next notebook
# MAGIC
# MAGIC Next go to the **[02_Register_Model]($02_Register_Model)** notebook.
176 changes: 176 additions & 0 deletions databricks/notebooks/Mini_MLOps_Pipeline_UC/02_Register_Model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Databricks notebook source
# MAGIC %md # Register Best Model in Model Registry
# MAGIC * Creates a new registered model if it doesn't already exist.
# MAGIC * Deletes all current model versions (optional).
# MAGIC * Finds the best model (lowest RMSE metric) generated from [01_Train_Model]($01_Train_Model) notebook experiment.
# MAGIC * Adds the best run as a registered model version and assigns it the `production` alias.

# COMMAND ----------

# MAGIC %md ### Setup

# COMMAND ----------

# MAGIC %run ./includes/Common

# COMMAND ----------

# Widget 1: name of the registered model to create or update (empty by default).
dbutils.widgets.text("1. Registered model","")
_model_name = dbutils.widgets.get("1. Registered model")
# NOTE(review): _write_model_name() is not defined in this notebook (presumably from
# ./includes/Common) and runs before the widget value is checked - confirm intent.
_write_model_name()

# Widget 2: whether to delete the model's existing versions before registering a new one.
dbutils.widgets.dropdown("2. Delete existing versions","yes",["yes","no"])
delete_existing_versions = dbutils.widgets.get("2. Delete existing versions") == "yes"

# Echo the resolved widget values in the cell output.
print("_model_name:", _model_name)
print("delete_existing_versions:", delete_existing_versions)

# COMMAND ----------

# Check the "1. Registered model" widget value. assert_widget is defined outside this
# notebook; presumably it fails when the value is empty - confirm.
assert_widget(_model_name, "1. Registered model")

# COMMAND ----------

# MAGIC %md ### Display experiment

# COMMAND ----------

# Look up the experiment by name. get_experiment_by_name returns None when no such
# experiment exists, so fail with a clear message instead of an AttributeError below.
experiment = mlflow_client.get_experiment_by_name(_experiment_name)
if experiment is None:
    raise RuntimeError(
        f"Experiment '{_experiment_name}' not found; run the 01_Train_Model notebook first"
    )
experiment_id = experiment.experiment_id
display_experiment_uri(experiment)

# COMMAND ----------

# MAGIC %md ### Show all the RMSE metrics and max_depth params of all runs

# COMMAND ----------

# List every run of the experiment, best (lowest RMSE) first. The metric is logged as
# "training_rmse", so that is the name to sort on; experiment ids are passed as a list.
df = mlflow.search_runs([experiment_id], order_by=["metrics.training_rmse ASC"])
# Keep only the columns of interest and round the metric for display.
df = df[["run_id","metrics.training_rmse","params.max_depth"]]
df = df.round({"metrics.training_rmse": 3})
display(df)

# COMMAND ----------

# MAGIC %md ### Find best run
# MAGIC
# MAGIC Search for the run with lowest RMSE metric.

# COMMAND ----------

# Ask the tracking server for just the single run with the lowest training_rmse.
runs = mlflow_client.search_runs(experiment_id, order_by=["metrics.training_rmse ASC"], max_results=1)
# An empty result would otherwise surface as a bare IndexError on runs[0].
if not runs:
    raise RuntimeError(
        f"No runs found in experiment {experiment_id}; run the 01_Train_Model notebook first"
    )
best_run = runs[0]
# Display the best run's id, rounded RMSE and parameters as the cell result.
best_run.info.run_id, round(best_run.data.metrics["training_rmse"],3), best_run.data.params

# COMMAND ----------

# MAGIC %md ##### If you trained the model from a Delta table, the run's `sparkDatasourceInfo` tag is displayed below, for example:
# MAGIC
# MAGIC `path=dbfs:/user/hive/warehouse/andre.db/wine_quality,version=0,format=delta`

# COMMAND ----------

# Show the best run's "sparkDatasourceInfo" tag; .get() yields None when the tag is absent.
best_run.data.tags.get("sparkDatasourceInfo")

# COMMAND ----------

# MAGIC %md ### Create registered model
# MAGIC
# MAGIC If the model already exists, remove its existing versions when the "Delete existing versions" widget is set to `yes`.

# COMMAND ----------

from mlflow.exceptions import MlflowException, RestException

# Fetch the registered model, creating it if it does not exist yet.
# Only the lookup sits inside the try: a RESOURCE_DOES_NOT_EXIST raised while
# searching or deleting versions must not be mistaken for "model is missing".
try:
    registered_model = mlflow_client.get_registered_model(_model_name)
except RestException as e:
    print("INFO:",e)
    if e.error_code != "RESOURCE_DOES_NOT_EXIST":
        # Any other REST failure is unexpected; keep the original error as the cause.
        raise RuntimeError(e) from e
    print(f"Creating {_model_name}")
    registered_model = mlflow_client.create_registered_model(_model_name)
else:
    print(f"Found existing {_model_name}")
    versions = mlflow_client.search_model_versions(f"name='{_model_name}'")
    print(f"Found {len(versions)} versions")
    # Existing versions are removed only when the widget asks for it.
    if delete_existing_versions:
        for vr in versions:
            print(f"  version={vr.version} status={vr.status} stage={vr.current_stage} run_id={vr.run_id}")
            mlflow_client.delete_model_version(_model_name, vr.version)

# COMMAND ----------

# Inspect the registered model object: its type and its raw attribute dict.
type(registered_model),registered_model.__dict__

# COMMAND ----------

# Show a link/URI for the registered model; the helper is defined outside this notebook
# (presumably in ./includes/Common).
display_registered_model_uri(_model_name)

# COMMAND ----------

# MAGIC %md **Create the version**

# COMMAND ----------

# Location of the best run's model artifact; "model" is the artifact path the training
# notebook passes to log_model.
source = f"{best_run.info.artifact_uri}/model"
# Display the resolved source URI as the cell result.
source

# COMMAND ----------

# Debug output: the client's tracking URI, registry URI and the target model name.
# NOTE(review): _registry_uri is a private attribute of the client and may change
# between MLflow versions.
mlflow_client.tracking_uri, mlflow_client._registry_uri, _model_name

# COMMAND ----------

# Register the best run's model artifact as a new version of _model_name, linked to
# that run's id.
version = mlflow_client.create_model_version(_model_name, source, best_run.info.run_id)
# Inspect the new version object: its type and raw attribute dict.
type(version), version.__dict__

# COMMAND ----------

# MAGIC %md ### Promote best run version with the `production` alias

# COMMAND ----------

# MAGIC %md **Wait until version is in READY status**

# COMMAND ----------

# Echo the registered model name as the cell result.
_model_name

# COMMAND ----------

# Wait for the new version to become ready. The helper is defined outside this notebook;
# sleep_time=2 is presumably the polling interval in seconds - confirm.
wait_until_version_ready(_model_name, version, sleep_time=2)

# COMMAND ----------

# Re-read the version from the registry so the printed fields are current.
version = mlflow_client.get_model_version(_model_name, version.version)
version_id = version.version

# Print the fields of interest, one "version.<field>: <value>" line each.
for field in ("version", "status", "current_stage", "aliases", "run_id"):
    print(f"version.{field}:", getattr(version, field))

# COMMAND ----------

# MAGIC %md **Assign the `production` alias**

# COMMAND ----------

# The stage transition call is kept commented out; the version is marked with a
# "production" alias instead (presumably because Unity Catalog models use aliases
# rather than stages - confirm).
##mlflow_client.transition_model_version_stage(_model_name, version_id, "Production")
mlflow_client.set_registered_model_alias(_model_name, "production", version.version)

# COMMAND ----------

# Fetch the version again and display its type and raw attribute dict as the cell result.
version = mlflow_client.get_model_version(_model_name, version_id)
type(version), version.__dict__

# COMMAND ----------

# MAGIC %md ### Next notebook
# MAGIC
# MAGIC Now either go to:
# MAGIC * **[03a_Batch_Scoring]($03a_Batch_Scoring)** notebook for batch scoring
# MAGIC * **[04a_RT_Serving_Start]($04a_RT_Serving_Start)** notebook for real-time model serving
Loading

0 comments on commit 8911df7

Please sign in to comment.