Commit a78d2f6

Merge pull request #50 from teamdatatonic/fix/enable-model-monitoring

Fix: enable model monitoring for XGBoost pipeline

Linchin authored Jun 2, 2023
2 parents 31396ab + 034c440 commit a78d2f6
Showing 6 changed files with 57 additions and 25 deletions.
16 changes: 10 additions & 6 deletions pipelines/src/pipelines/tensorflow/prediction/pipeline.py
@@ -108,12 +108,16 @@ def tensorflow_pipeline(
     ).set_display_name("Ingest data")

     # lookup champion model
-    champion_model = lookup_model(
-        model_name=model_name,
-        project_location=project_location,
-        project_id=project_id,
-        fail_on_model_not_found=True,
-    ).set_display_name("Look up champion model")
+    champion_model = (
+        lookup_model(
+            model_name=model_name,
+            project_location=project_location,
+            project_id=project_id,
+            fail_on_model_not_found=True,
+        )
+        .set_display_name("Look up champion model")
+        .set_caching_options(False)
+    )

     # batch predict from BigQuery to BigQuery
     bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}"
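Note on this hunk: with Vertex Pipelines' execution caching enabled, a re-run could reuse the cached output of the lookup step and keep predicting with a stale champion model even after a newer model has been promoted; disabling caching forces a fresh lookup. A minimal, hypothetical KFP v2 sketch of the pattern (the stub component and resource names are illustrative, not this repository's lookup_model):

from kfp import dsl

@dsl.component
def lookup_model_stub(model_name: str) -> str:
    # Stand-in for this repo's lookup_model component (illustrative only).
    return f"projects/example-project/locations/europe-west2/models/{model_name}"

@dsl.pipeline(name="caching-demo")
def caching_demo(model_name: str = "example-model"):
    task = lookup_model_stub(model_name=model_name)
    task.set_display_name("Look up champion model")
    # Force a fresh lookup on every run; a cached result could keep
    # serving a stale champion model after a newer one is promoted.
    task.set_caching_options(False)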
13 changes: 9 additions & 4 deletions pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py
@@ -219,6 +219,9 @@ def _get_temp_dir(dirpath, task_id):
 parser.add_argument("--hparams", default={}, type=json.loads)
 args = parser.parse_args()

+if args.model.startswith("gs://"):
+    args.model = Path("/gcs/" + args.model[5:])
+
 # merge dictionaries by overwriting default_model_params if provided in model_params
 hparams = {**DEFAULT_HPARAMS, **args.hparams}
 logging.info(f"Using model hyper-parameters: {hparams}")
@@ -261,9 +264,9 @@ def _get_temp_dir(dirpath, task_id):
     logging.info("not chief node, exiting now")
     sys.exit()

-os.makedirs(args.model, exist_ok=True)
 logging.info(f"Save model to: {args.model}")
-tf_model.save(args.model, save_format="tf")
+args.model.mkdir(parents=True)
+tf_model.save(str(args.model), save_format="tf")

 logging.info(f"Save metrics to: {args.metrics}")
 eval_metrics = dict(zip(tf_model.metrics_names, tf_model.evaluate(test_ds)))
@@ -281,11 +284,13 @@ def _get_temp_dir(dirpath, task_id):
     json.dump(metrics, fp)

 # Persist URIs of training file(s) for model monitoring in batch predictions
-path = Path(args.model) / TRAINING_DATASET_INFO
+# See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501
+# for the expected schema.
+path = args.model / TRAINING_DATASET_INFO
 training_dataset_for_monitoring = {
     "gcsSource": {"uris": [args.train_data]},
     "dataFormat": "csv",
-    "targetField": hparams["label"],
+    "targetField": label,
 }
 logging.info(f"Save training dataset info for model monitoring: {path}")
 logging.info(f"Training dataset: {training_dataset_for_monitoring}")
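For context: Vertex AI custom training containers mount Cloud Storage buckets via Cloud Storage FUSE under /gcs/, so a gs://bucket/prefix URI can be rewritten to the local path /gcs/bucket/prefix and used with ordinary file APIs such as pathlib. A minimal sketch of the rewrite applied above (the helper name is ours, not the repo's):

from pathlib import Path

def to_fuse_path(uri: str) -> Path:
    """Map gs://<bucket>/<prefix> to its /gcs/<bucket>/<prefix> FUSE mount point."""
    if uri.startswith("gs://"):
        return Path("/gcs/" + uri[5:])  # len("gs://") == 5
    return Path(uri)

assert to_fuse_path("gs://my-bucket/models/run-1") == Path("/gcs/my-bucket/models/run-1")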
1 change: 1 addition & 0 deletions pipelines/src/pipelines/tensorflow/training/pipeline.py
@@ -215,6 +215,7 @@ def tensorflow_pipeline(
             fail_on_model_not_found=False,
         )
         .set_display_name("Lookup past model")
+        .set_caching_options(False)
         .outputs["model_resource_name"]
     )

16 changes: 10 additions & 6 deletions pipelines/src/pipelines/xgboost/prediction/pipeline.py
@@ -102,12 +102,16 @@ def xgboost_pipeline(
     ).set_display_name("Ingest data")

     # lookup champion model
-    champion_model = lookup_model(
-        model_name=model_name,
-        project_location=project_location,
-        project_id=project_id,
-        fail_on_model_not_found=True,
-    ).set_display_name("Look up champion model")
+    champion_model = (
+        lookup_model(
+            model_name=model_name,
+            project_location=project_location,
+            project_id=project_id,
+            fail_on_model_not_found=True,
+        )
+        .set_display_name("Look up champion model")
+        .set_caching_options(False)
+    )

     # batch predict from BigQuery to BigQuery
     bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}"
35 changes: 26 additions & 9 deletions pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py
@@ -1,4 +1,6 @@
 import argparse
+from pathlib import Path
+
 import joblib
 import json
 import os
@@ -14,7 +16,9 @@

 logging.basicConfig(level=logging.DEBUG)

-
+# used for monitoring during prediction time
+TRAINING_DATASET_INFO = "training_dataset.json"
+# numeric/categorical features in Chicago trips dataset to be preprocessed
 NUM_COLS = ["dayofweek", "hourofday", "trip_distance", "trip_miles", "trip_seconds"]
 ORD_COLS = ["company"]
 OHE_COLS = ["payment_type"]
@@ -39,6 +43,9 @@ def indices_in_list(elements: list, base_list: list) -> list:
 parser.add_argument("--hparams", default={}, type=json.loads)
 args = parser.parse_args()

+if args.model.startswith("gs://"):
+    args.model = Path("/gcs/" + args.model[5:])
+
 logging.info("Read csv files into dataframes")
 df_train = pd.read_csv(args.train_data)
 df_valid = pd.read_csv(args.valid_data)
@@ -111,15 +118,25 @@ def indices_in_list(elements: list, base_list: list) -> list:
     "rootMeanSquaredLogError": np.sqrt(metrics.mean_squared_log_error(y_test, y_pred)),
 }

-try:
-    model_path = args.model.replace("gs://", "/gcs/")
-    logging.info(f"Save model to: {model_path}")
-    os.makedirs(model_path, exist_ok=True)
-    joblib.dump(pipeline, model_path + "model.joblib")
-except Exception as e:
-    print(e)
-    raise e
+logging.info(f"Save model to: {args.model}")
+args.model.mkdir(parents=True)
+joblib.dump(pipeline, str(args.model / "model.joblib"))

 logging.info(f"Metrics: {metrics}")
 with open(args.metrics, "w") as fp:
     json.dump(metrics, fp)
+
+# Persist URIs of training file(s) for model monitoring in batch predictions
+# See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501
+# for the expected schema.
+path = args.model / TRAINING_DATASET_INFO
+training_dataset_for_monitoring = {
+    "gcsSource": {"uris": [args.train_data]},
+    "dataFormat": "csv",
+    "targetField": label,
+}
+logging.info(f"Training dataset info: {training_dataset_for_monitoring}")
+
+with open(path, "w") as fp:
+    logging.info(f"Save training dataset info for model monitoring: {path}")
+    json.dump(training_dataset_for_monitoring, fp)
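The training_dataset.json written above is what the batch prediction pipeline later hands to Vertex AI Model Monitoring as the training baseline for drift/skew detection. Assuming a hypothetical run where --train_data is gs://my-bucket/data/train.csv and the label column is tip_amount (both values illustrative, not from this repo), the file would contain:

{
  "gcsSource": {"uris": ["gs://my-bucket/data/train.csv"]},
  "dataFormat": "csv",
  "targetField": "tip_amount"
}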
1 change: 1 addition & 0 deletions pipelines/src/pipelines/xgboost/training/pipeline.py
@@ -212,6 +212,7 @@ def xgboost_pipeline(
             fail_on_model_not_found=False,
         )
         .set_display_name("Lookup past model")
+        .set_caching_options(False)
         .outputs["model_resource_name"]
     )

