
feat(jobs): improve MLOps example #82

Merged: 15 commits, Jun 28, 2024
23 changes: 21 additions & 2 deletions jobs/ml-ops/README.md
@@ -37,14 +37,28 @@ Set your Scaleway access key, secret key and project ID in environment variables
export TF_VAR_access_key=<your-access-key>
export TF_VAR_secret_key=<your-secret-key>
export TF_VAR_project_id=<your-project-id> # you can create a separate project for this example
```

You can optionally configure non-default CRON schedules to fetch data, train the model, and then reload the new model in the inference server, in that order. To do so, set the following Terraform environment variables:

```console
export TF_VAR_data_fetch_cron_schedule=<cron-schedule-expression>
export TF_VAR_training_cron_schedule=<cron-schedule-expression>
export TF_VAR_inference_cron_schedule=<cron-schedule-expression>
```
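
For example, you could fetch new data daily at 02:00, retrain at 03:00, and reload the model at 04:00 (illustrative values only, not the project defaults; pick schedules that fit your own pipeline cadence):

```console
export TF_VAR_data_fetch_cron_schedule="0 2 * * *"
export TF_VAR_training_cron_schedule="0 3 * * *"
export TF_VAR_inference_cron_schedule="0 4 * * *"
```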

Then deploy the MLOps infrastructure with:

```console
cd terraform
terraform init
terraform plan
terraform apply
```
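
Once the apply completes, you can list the generated outputs, including the inference endpoint used in Step 3:

```console
terraform output
```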

### Step 2. Run the data and training Jobs
### Step 2. Optional: trigger jobs manually

The pipeline is fully automated: each job runs at its scheduled time. You can skip this step unless you are debugging or testing.

To run the data and training jobs manually, use the Scaleway CLI:

@@ -60,12 +74,17 @@ You can also trigger the jobs from the [Jobs section](https://console.scaleway.c

### Step 3. Use the inference API

Load the latest model version using:

```
cd terraform
export INFERENCE_URL=$(terraform output -raw endpoint)
curl -X POST ${INFERENCE_URL}
```

curl -X POST ${INFERENCE_URL}/load
Then post data to infer the class:

```
curl -X POST \
-H "Content-Type: application/json" \
-d @../inference/example.json
3 changes: 2 additions & 1 deletion jobs/ml-ops/data/main.py
@@ -1,8 +1,9 @@
import boto3
import os
import urllib.request
import zipfile

import boto3

DATA_DIR = "dataset"

ZIP_URL = "http://archive.ics.uci.edu/static/public/222/bank+marketing.zip"
2 changes: 1 addition & 1 deletion jobs/ml-ops/inference/data.py
@@ -1,5 +1,5 @@
import pandas as pd
import numpy as np
import pandas as pd
from pydantic import BaseModel


60 changes: 60 additions & 0 deletions jobs/ml-ops/inference/loader.py
@@ -0,0 +1,60 @@
import os
import pickle

import boto3


class ClassifierLoader:
    _classifier = None
    _classifier_version = ""

    @classmethod
    def load(cls, force=False):
        if force or cls._classifier is None:
            access_key = os.environ["ACCESS_KEY"]
            secret_key = os.environ["SECRET_KEY"]
            region_name = os.environ["REGION"]

            bucket_name = os.environ["S3_BUCKET_NAME"]
            s3_url = os.environ["S3_URL"]

            s3 = boto3.client(
                "s3",
                region_name=region_name,
                endpoint_url=s3_url,
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
            )

            # get the model file with the latest version (most recent LastModified)
            bucket_objects = s3.list_objects(Bucket=bucket_name)
            get_last_modified = lambda object: int(
                object["LastModified"].strftime("%s")
            )
            model_objects = [
                model_object
                for model_object in bucket_objects["Contents"]
                if "classifier" in model_object["Key"]
            ]
            # sort ascending by modification time and take the last entry,
            # i.e. the newest classifier object
            latest_model_file = [
                object["Key"] for object in sorted(model_objects, key=get_last_modified)
            ][-1]

            s3.download_file(bucket_name, latest_model_file, latest_model_file)

            with open(latest_model_file, "rb") as fh:
                cls._classifier = pickle.load(fh)

            # "classifier_<version>.pkl" -> "<version>"
            cls._classifier_version = latest_model_file[11:-4]

            print(
                "Successfully loaded model file: {latest_model_file}".format(
                    latest_model_file=latest_model_file
                ),
                flush=True,
            )

        return cls._classifier

    @classmethod
    def model_version(cls):
        return cls._classifier_version
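
As an illustration (an editor's sketch, not part of the diff), the loader is intended to be used along these lines by the inference service shown below, with the S3-related environment variables provided by the container:

```python
# Minimal usage sketch. Assumes ACCESS_KEY, SECRET_KEY, REGION, S3_BUCKET_NAME
# and S3_URL are set, and the bucket already holds at least one
# classifier_<version>.pkl object produced by the training job.
from loader import ClassifierLoader

# Lazy load: downloads and caches the newest classifier object on first call.
classifier = ClassifierLoader.load()
print(ClassifierLoader.model_version())  # e.g. "202406281200"

# Forced reload, as performed by the cron-triggered POST / endpoint.
ClassifierLoader.load(force=True)
```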
54 changes: 19 additions & 35 deletions jobs/ml-ops/inference/main.py
@@ -1,54 +1,38 @@
import data
from fastapi import FastAPI
from loader import ClassifierLoader
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import RocCurveDisplay
import pickle
import boto3
import pandas
import os

import data

classifier = RandomForestClassifier()

app = FastAPI()

MODEL_FILE = "classifier.pkl"


class ClassifierLoader:
_classifier = None
@app.get("/")
def hello():
"""Get Model Version"""

@classmethod
def load(cls, force=False):
if force or cls._classifier is None:
access_key = os.environ["ACCESS_KEY"]
secret_key = os.environ["SECRET_KEY"]
region_name = os.environ["REGION"]
model_version = ClassifierLoader.model_version()

bucket_name = os.environ["S3_BUCKET_NAME"]
s3_url = os.environ["S3_URL"]
if model_version == "":
return {
"message": "Hello, this is the inference server! No classifier loaded in memory."
}

s3 = boto3.client(
"s3",
region_name=region_name,
endpoint_url=s3_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
)
return {
"message": "Hello, this is the inference server! Serving classifier with version {model_version}".format(
model_version=model_version
)
}

s3.download_file(bucket_name, MODEL_FILE, MODEL_FILE)

with open(MODEL_FILE, "rb") as fh:
cls._classifier = pickle.load(fh)

return cls._classifier


@app.post("/load")
# this endpoint is used by cron trigger to load model from S3
@app.post("/")
def load():
"""Reloads classifier from model registry bucket"""

ClassifierLoader.load(force=True)

return {"message": "model loaded successfully"}


@@ -59,7 +43,7 @@ def classify(profile: data.ClientProfile):
cleaned_data = data.clean_profile(profile)
data_point_processed = data.transform_data(cleaned_data)

# Lazy-loads classifer from S3
# Lazy-loads classifier from S3
classifier = ClassifierLoader.load()
prediction = classifier.predict(data_point_processed)

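For reference (editor's sketch, not part of the diff), the version endpoint (GET /) and the reload endpoint (POST /) added above can be exercised as follows once `INFERENCE_URL` points at the deployed container:

```console
# ask which model version is currently being served
curl ${INFERENCE_URL}

# force a reload of the latest model from the bucket
# (the same call the container cron trigger makes)
curl -X POST ${INFERENCE_URL}
```
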
8 changes: 7 additions & 1 deletion jobs/ml-ops/terraform/container.tf
@@ -12,7 +12,7 @@ resource "scaleway_container" "inference" {
cpu_limit = 2000
memory_limit = 2048
min_scale = 1
max_scale = 5
max_scale = 1
@redanrd (Contributor, Author) commented on May 7, 2024:
We need to preserve the container state because the model is loaded in memory. With multiple instances, some could end up without a model loaded and would fail when asked for an inference.

environment_variables = {
"S3_BUCKET_NAME" = scaleway_object_bucket.main.name
"S3_URL" = var.s3_url
@@ -24,3 +24,9 @@ resource "scaleway_container" "inference" {
}
deploy = true
}

resource "scaleway_container_cron" "inference_cron" {
container_id = scaleway_container.inference.id
schedule = var.inference_cron_schedule
args = jsonencode({})
}
3 changes: 3 additions & 0 deletions jobs/ml-ops/terraform/images.tf
@@ -8,6 +8,7 @@ resource "docker_image" "inference" {
name = "${scaleway_registry_namespace.main.endpoint}/inference:${var.image_version}"
build {
context = "${path.cwd}/../inference"
no_cache = true
}

provisioner "local-exec" {
@@ -19,6 +20,7 @@ resource "docker_image" "data" {
name = "${scaleway_registry_namespace.main.endpoint}/data:${var.image_version}"
build {
context = "${path.cwd}/../data"
no_cache = true
}

provisioner "local-exec" {
@@ -30,6 +32,7 @@ resource "docker_image" "training" {
name = "${scaleway_registry_namespace.main.endpoint}/training:${var.image_version}"
build {
context = "${path.cwd}/../training"
no_cache = true
}

provisioner "local-exec" {
10 changes: 8 additions & 2 deletions jobs/ml-ops/terraform/jobs.tf
@@ -4,7 +4,10 @@ resource "scaleway_job_definition" "fetch_data" {
memory_limit = 1024
image_uri = docker_image.data.name
timeout = "10m"

cron {
schedule = var.data_fetch_cron_schedule
timezone = "Europe/Paris"
}
env = {
"S3_BUCKET_NAME" : scaleway_object_bucket.main.name,
"S3_URL" : var.s3_url,
@@ -20,7 +23,10 @@ resource "scaleway_job_definition" "training" {
memory_limit = 4096
image_uri = docker_image.training.name
timeout = "10m"

cron {
schedule = var.training_cron_schedule
timezone = "Europe/Paris"
}
env = {
"S3_BUCKET_NAME" : scaleway_object_bucket.main.name,
"S3_URL" : var.s3_url,
18 changes: 10 additions & 8 deletions jobs/ml-ops/terraform/variables.tf
@@ -25,15 +25,17 @@ variable "s3_url" {
default = "https://s3.fr-par.scw.cloud"
}

variable "data_file" {
type = string
description = "name data file in data store"
default = "bank_telemarketing.csv"
variable "data_fetch_cron_schedule" {
type = string
default = "* */10 * * *"
}

variable "model_object" {
type = string
description = "name of model object stored in model registry"
default = "classifier.pkl"
variable "training_cron_schedule" {
type = string
default = "* */11 * * *"
}

variable "inference_cron_schedule" {
type = string
default = "* */12 * * *"
}
1 change: 1 addition & 0 deletions jobs/ml-ops/terraform/versions.tf
@@ -2,6 +2,7 @@ terraform {
required_providers {
scaleway = {
source = "scaleway/scaleway"
version = ">= 2.39"
}
docker = {
source = "kreuzwerker/docker"
18 changes: 10 additions & 8 deletions jobs/ml-ops/training/main.py
@@ -1,17 +1,19 @@
import pandas as pd
import os
import pickle
from datetime import datetime

import boto3
import pandas as pd
import training as ml
from sklearn.metrics import RocCurveDisplay
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay

DATA_FILE_NAME = "bank-additional-full.csv"
VERSION = datetime.now().strftime("%Y%m%d%H%M")

MODEL_FILE = "classifier.pkl"
PERF_FILE = "performance.pkl"
ROC_AUC_FILE = "roc_auc.png"
CONFUSION_MATRIX_FILE = "confusion_matrix.png"
DATA_FILE_NAME = "bank-additional-full.csv"
MODEL_FILE = "classifier_" + VERSION + ".pkl"
PERF_FILE = "performance_" + VERSION + ".pkl"
ROC_AUC_FILE = "roc_auc_" + VERSION + ".png"
CONFUSION_MATRIX_FILE = "confusion_matrix_" + VERSION + ".png"


def main() -> int:
7 changes: 3 additions & 4 deletions jobs/ml-ops/training/training.py
@@ -1,10 +1,9 @@
import pandas as pd
import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, log_loss
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, log_loss, precision_score, recall_score
from sklearn.model_selection import RandomizedSearchCV, train_test_split


def transform_data(data: pd.DataFrame) -> pd.DataFrame: