diff --git a/.gitignore b/.gitignore index 13579d3..cca22b2 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,7 @@ node_modules/ .serverless/ # Env files -*.env \ No newline at end of file +*.env + +# Python +venv/ diff --git a/jobs/ml-ops/README.md b/jobs/ml-ops/README.md index 4667276..36e52a6 100644 --- a/jobs/ml-ops/README.md +++ b/jobs/ml-ops/README.md @@ -1,67 +1,96 @@ # Serverless MLOps -In this example, we train and deploy a binary classification inference API using serverless computing resources (job+container). We use object storage resources to store data and training artifacts. We use container registry to store docker images. +In this example, we train and deploy a binary classification inference model using Scaleway Serverless. To do this, we use the following resources: -## Use case: Bank Telemarketing +1. Serverless Job for training +2. Serverless Job to populate data in S3 +3. Serverless Container for inference -### Context +We use object storage to share data between them. + +## Context + +In this example we use a bank telemarketing dataset to predict if a client would engage in a term deposit subscription. + +This dataset records marketing phone calls made to clients. The outcome of the phone call is shown in the `y` column: -We use a bank telemarketing dataset to predict if a client would engage in a term deposit subscription. This dataset records marketing phone calls made to clients. The outcome of the phone call is in shown in the `y` column: * `0` : no subscription * `1` : subscription -### Data Source +## Data Source The dataset has many versions and is open-sourced and published [here](http://archive.ics.uci.edu/dataset/222/bank+marketing) on the UCI Machine Leaning repository and is close to the one analyzed in the following research work: * [Moro et al., 2014] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing.
Decision Support Systems, Elsevier, 62:22-31, June 2014 -We use the dataset labelled in the source as `bank-additional-full.csv`. You can download, extract this file, rename it to `bank_telemarketing.csv` then put it under this [directory](./s3/data-store/data/). +## Running the example -## How to deploy your MLOps pipeline on Scaleway Cloud? - -### Step A: Create cloud resources for the ML pipeline +### Step 1. Provision resources with Terraform -Create `.env` file in `jobs/data-loader-job` and `jobs/ml-job` directories and fill them as it follows: +Set your Scaleway access key, secret key and project ID in environment variables: -```text -SCW_ACCESS_KEY= -SCW_SECRET_KEY= +```console +export TF_VAR_access_key= +export TF_VAR_secret_key= +export TF_VAR_project_id= + +cd terraform +terraform init +terraform plan +terraform apply ``` -Create `.tfvars` file in `/terraform` directory and put variable values in it: +### Step 2. Run the data and training Jobs + +*At the time of writing, the Scaleway CLI does not support Jobs, so we use a Python script* ``` -region = "fr-par" -access_key = "" -secret_key = "" -project_id = "" -data_file = "bank_telemarketing.csv" -model_object = "classifier.pkl" -image_version = "v1" +cd scripts + +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +python3 run upload +python3 run training ``` -Then perform: +You can then check your Job runs in the [Jobs Console](https://console.scaleway.com/serverless-jobs/jobs). + +### Step 3. Use the inference API -```bash -cd terraform -terraform init -terraform plan -var-file=testing.tfvars -terraform apply -var-file=testing.tfvars ``` +export INFERENCE_URL=$(terraform output -raw endpoint) -### Step B: Define and run a job to ship data from public source to s3 +curl -X POST \ + -H "Content-Type: application/json" \ + -d @inference/example.json \ + $INFERENCE_URL/inference +``` -Use the console to define and run the data loader job using image pushed to Scaleway registry.
+## Local testing -cf. this [readme](./jobs/data-loader-job/README.md) +To test the example locally you can use [Docker Compose](https://docs.docker.com/compose/install/). -### Step C: Define and run the ML job to train classifier +``` +# Build the containers locally +docker compose build -Use the console to define and the ML job using image pushed to Scaleway registry. +# Run the data job +docker compose run data -cf. this [readme](./jobs/ml-job/README.md) +# Run the training +docker compose run training -### Step D: Call your serverless container to (re)load model and to get inference results +# Start the inference server +docker compose up inference +``` + +Access the inference API locally: -cf. this [readme](./containers/inference-api/README.md) +``` +curl -X POST \ + -H "Content-Type: application/json" \ + -d @inference/example.json \ + http://localhost:8080/inference +``` diff --git a/jobs/ml-ops/containers/inference-api/README.md b/jobs/ml-ops/containers/inference-api/README.md deleted file mode 100644 index fb162da..0000000 --- a/jobs/ml-ops/containers/inference-api/README.md +++ /dev/null @@ -1,15 +0,0 @@ -# Deploy an inference API on Scaleway Serverless Containers - -## Test the inference API using HTTP calls - -### Step 1: (Re)Load the classifier after model training - -```bash -curl -H "X-Auth-Token: $CONTAINER_TOKEN" "/load_classifier" -``` - -### Step 2: Call inference endpoint - -```bash -curl -H "X-Auth-Token: $CONTAINER_TOKEN" -X POST "/inference" -H "Content-Type: application/json" -d '{"age": 44, "job": "blue-collar", "marital": "married", "education": "basic.4y", "default": "unknown", "housing": "yes", "loan": "no", "contact": "cellular", "month": "aug", "day_of_week": "thu", "duration": 210, "campaign": 1, "pdays": 999, "previous": "0", "poutcome": "nonexistent", "emp_var_rate": 1.4, "cons_price_idx": 93.444, "cons_conf_idx": -36.1, "euribor3m": 4.963, "nr_employed": 5228.1}' -``` diff --git a/jobs/ml-ops/containers/inference-api/main.py
b/jobs/ml-ops/containers/inference-api/main.py deleted file mode 100644 index ed566a3..0000000 --- a/jobs/ml-ops/containers/inference-api/main.py +++ /dev/null @@ -1,43 +0,0 @@ -from fastapi import FastAPI -from sklearn.ensemble import RandomForestClassifier -import data_processing as process -import pickle, boto3, pandas, os - -classifier = RandomForestClassifier() - -app = FastAPI() - - -@app.get("/load_classifier") -def load_classifier(): - """(Re)loads classifier from model registry bucket""" - - s3 = boto3.resource( - "s3", - region_name=os.environ["MAIN_REGION"], - use_ssl=True, - endpoint_url=f'https://s3.{os.environ["MAIN_REGION"]}.scw.cloud', - aws_access_key_id=os.environ["SCW_ACCESS_KEY"], - aws_secret_access_key=os.environ["SCW_SECRET_KEY"], - ) - - bucket = s3.Bucket(name=os.environ["MODEL_REGISTRY"]) # type: ignore - bucket.download_file(os.environ["MODEL_FILE"], os.environ["MODEL_FILE"]) - - global classifier - classifier = pickle.load(open(os.environ["MODEL_FILE"], "rb")) - - return {"message": "model loaded successfully"} - - -@app.post("/inference") -def classify(data: process.ClientProfile): - """Predicts class given client profile""" - - data_point_json = data.model_dump() - data_point_pd = pandas.DataFrame(index=[0], data=data_point_json) - data_point_processed = process.transform_data(process.clean_data(data_point_pd)) - global classifier - prediction = classifier.predict(data_point_processed) - - return {"predicted_class": int(prediction)} diff --git a/jobs/ml-ops/data/.gitignore b/jobs/ml-ops/data/.gitignore new file mode 100644 index 0000000..93db21b --- /dev/null +++ b/jobs/ml-ops/data/.gitignore @@ -0,0 +1 @@ +dataset/ diff --git a/jobs/ml-ops/data/Dockerfile b/jobs/ml-ops/data/Dockerfile new file mode 100644 index 0000000..cdef988 --- /dev/null +++ b/jobs/ml-ops/data/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim-bookworm + +WORKDIR /app + +RUN apt-get update +RUN apt-get install -y \ + curl \ + unzip + +RUN pip install --upgrade pip 
+COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY . . + +CMD ["python", "main.py"] diff --git a/jobs/ml-ops/data/main.py b/jobs/ml-ops/data/main.py new file mode 100644 index 0000000..add6fb3 --- /dev/null +++ b/jobs/ml-ops/data/main.py @@ -0,0 +1,57 @@ +import boto3 +import os +import urllib.request +import zipfile + +DATA_DIR = "dataset" + +ZIP_URL = "http://archive.ics.uci.edu/static/public/222/bank+marketing.zip" +ZIP_DOWNLOAD_PATH = os.path.join(DATA_DIR, "downloaded.zip") +NESTED_ZIP_PATH = os.path.join(DATA_DIR, "bank-additional.zip") + +DATA_FILE = "bank-additional.csv" +DATA_CSV_PATH = os.path.join(DATA_DIR, "bank-additional", DATA_FILE) + + +def main(): + """Pulls file from source, and uploads to a target S3 bucket""" + + # Download the zip + os.makedirs(DATA_DIR, exist_ok=True) + urllib.request.urlretrieve(ZIP_URL, ZIP_DOWNLOAD_PATH) + + # Extract + with zipfile.ZipFile(ZIP_DOWNLOAD_PATH, "r") as fh: + fh.extractall(DATA_DIR) + + # Remove original zip + os.remove(ZIP_DOWNLOAD_PATH) + + # Extract zips within the zip + with zipfile.ZipFile(NESTED_ZIP_PATH) as fh: + fh.extractall(DATA_DIR) + + access_key = os.environ["SCW_ACCESS_KEY"] + secret_key = os.environ["SCW_SECRET_KEY"] + bucket_name = os.environ["S3_BUCKET_NAME"] + region_name = os.environ["SCW_REGION"] + s3_url = f"https://s3.{region_name}.scw.cloud" + + s3 = boto3.resource( + "s3", + region_name=region_name, + use_ssl=True, + endpoint_url=s3_url, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + bucket = s3.Bucket(name=bucket_name) + bucket.upload_file( + Filename=DATA_CSV_PATH, + Key=DATA_FILE, + ) + + +if __name__ == "__main__": + main() diff --git a/jobs/ml-ops/data/requirements.txt b/jobs/ml-ops/data/requirements.txt new file mode 100644 index 0000000..edc92cc --- /dev/null +++ b/jobs/ml-ops/data/requirements.txt @@ -0,0 +1,2 @@ +boto3==1.33.2 +requests==2.31.0 diff --git a/jobs/ml-ops/docker-compose.yml b/jobs/ml-ops/docker-compose.yml new file 
mode 100644 index 0000000..80ecbaf --- /dev/null +++ b/jobs/ml-ops/docker-compose.yml @@ -0,0 +1,37 @@ +version: "3" + +services: + data: + build: + context: ./data + depends_on: + - minio + + training: + build: + context: ./training + depends_on: + - minio + + inference: + build: + context: ./inference + ports: + - 8080:80 + depends_on: + - minio + + minio: + image: minio/minio + ports: + - "9000:9000" + - "9001:9001" + volumes: + - minio_storage:/data + environment: + MINIO_ROOT_USER: example + MINIO_ROOT_PASSWORD: example + command: server --console-address ":9001" /data + +volumes: + minio_storage: {} diff --git a/jobs/ml-ops/containers/inference-api/Dockerfile b/jobs/ml-ops/inference/Dockerfile similarity index 80% rename from jobs/ml-ops/containers/inference-api/Dockerfile rename to jobs/ml-ops/inference/Dockerfile index 137c85a..bd779e8 100644 --- a/jobs/ml-ops/containers/inference-api/Dockerfile +++ b/jobs/ml-ops/inference/Dockerfile @@ -2,9 +2,9 @@ FROM python:3.12-slim-bookworm WORKDIR /app -COPY . . - RUN pip install --upgrade pip +COPY requirements.txt . RUN pip install -r requirements.txt -CMD ["uvicorn", "main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] \ No newline at end of file +COPY . . 
+CMD ["uvicorn", "main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] diff --git a/jobs/ml-ops/containers/inference-api/data_processing.py b/jobs/ml-ops/inference/data.py similarity index 91% rename from jobs/ml-ops/containers/inference-api/data_processing.py rename to jobs/ml-ops/inference/data.py index 0f9a94e..d570cb8 100644 --- a/jobs/ml-ops/containers/inference-api/data_processing.py +++ b/jobs/ml-ops/inference/data.py @@ -28,17 +28,21 @@ class ClientProfile(BaseModel): nr_employed: float -def clean_data(data: pd.DataFrame) -> pd.DataFrame: +def clean_profile(profile: ClientProfile) -> pd.DataFrame: """Removes rows with missing value(s)""" - data = data.dropna() - return data + profile_json = profile.model_dump() + + cleaned = pd.DataFrame(index=[0], data=profile_json) + cleaned = cleaned.dropna() + + return cleaned def transform_data(data: pd.DataFrame) -> pd.DataFrame: """ - This method handles the transformation of categorical variables of the dataset into 0/1 indicators. - It also adds missing categorical variables that are by default false (0). + Transforms categorical variables of the dataset into 0/1 indicators. + Adds missing categorical variables that are by default false (0). 
""" # # use the same category for basic education sub-categories diff --git a/jobs/ml-ops/inference/example.json b/jobs/ml-ops/inference/example.json new file mode 100644 index 0000000..d973ab8 --- /dev/null +++ b/jobs/ml-ops/inference/example.json @@ -0,0 +1,22 @@ +{ + "age": 44, + "job": "blue-collar", + "marital": "married", + "education": "basic.4y", + "default": "unknown", + "housing": "yes", + "loan": "no", + "contact": "cellular", + "month": "aug", + "day_of_week": "thu", + "duration": 210, + "campaign": 1, + "pdays": 999, + "previous": "0", + "poutcome": "nonexistent", + "emp_var_rate": 1.4, + "cons_price_idx": 93.444, + "cons_conf_idx": -36.1, + "euribor3m": 4.963, + "nr_employed": 5228.1 +} diff --git a/jobs/ml-ops/inference/main.py b/jobs/ml-ops/inference/main.py new file mode 100644 index 0000000..995fad4 --- /dev/null +++ b/jobs/ml-ops/inference/main.py @@ -0,0 +1,66 @@ +from fastapi import FastAPI +from sklearn.ensemble import RandomForestClassifier +from sklearn.metrics import RocCurveDisplay +import pickle +import boto3 +import pandas +import os + +import data + +classifier = RandomForestClassifier() + +app = FastAPI() + +MODEL_FILE = "classifier.pkl" + + +class ClassifierLoader(object): + _classifier = None + + @classmethod + def load(cls, force=False): + if force or cls._classifier is None: + access_key = os.environ["SCW_ACCESS_KEY"] + secret_key = os.environ["SCW_SECRET_KEY"] + region = os.environ["SCW_REGION"] + url = f"https://s3.{region}.scw.cloud" + s3_bucket = os.environ["S3_BUCKET_NAME"] + + s3 = boto3.resource( + "s3", + region_name=region, + use_ssl=True, + endpoint_url=url, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + bucket = s3.Bucket(name=s3_bucket) + bucket.download_file(MODEL_FILE, MODEL_FILE) + + with open(MODEL_FILE, "rb") as fh: + cls._classifier = pickle.load(fh) + + return cls._classifier + + +@app.get("/load") +def load(): + """(Re)loads classifier from model registry bucket""" + 
ClassifierLoader.load(force=True) + + return {"message": "model loaded successfully"} + + +@app.post("/inference") +def classify(profile: data.ClientProfile): + """Predicts class given client profile""" + + cleaned_data = data.clean_profile(profile) + data_point_processed = data.transform_data(cleaned_data) + + classifier = ClassifierLoader.load() + prediction = classifier.predict(data_point_processed) + + return {"predicted_class": int(prediction)} diff --git a/jobs/ml-ops/containers/inference-api/requirements.txt b/jobs/ml-ops/inference/requirements.txt similarity index 83% rename from jobs/ml-ops/containers/inference-api/requirements.txt rename to jobs/ml-ops/inference/requirements.txt index f03fe61..188415c 100644 --- a/jobs/ml-ops/containers/inference-api/requirements.txt +++ b/jobs/ml-ops/inference/requirements.txt @@ -4,4 +4,3 @@ uvicorn==0.24.0.post1 pandas==2.1.2 numpy==1.26.2 scikit-learn==1.3.2 -python-dotenv==1.0.0 \ No newline at end of file diff --git a/jobs/ml-ops/jobs/data-loader-job/Dockerfile b/jobs/ml-ops/jobs/data-loader-job/Dockerfile deleted file mode 100644 index dd0d05a..0000000 --- a/jobs/ml-ops/jobs/data-loader-job/Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM python:3.12-slim-bookworm - -WORKDIR /data-loader-job - -COPY . . 
- -# Get data from public source -RUN apt-get update -RUN apt-get install curl unzip --assume-yes -RUN curl http://archive.ics.uci.edu/static/public/222/bank+marketing.zip -o bank-telemarketing.zip -RUN unzip bank-telemarketing.zip -RUN unzip bank-additional.zip -RUN cp ./bank-additional/bank-additional-full.csv ./data/ -RUN mv data/bank-additional-full.csv data/bank_telemarketing.csv - -# Push data to data store -RUN pip install --upgrade pip -RUN pip install -r requirements.txt - -CMD ["python", "main.py"] \ No newline at end of file diff --git a/jobs/ml-ops/jobs/data-loader-job/README.md b/jobs/ml-ops/jobs/data-loader-job/README.md deleted file mode 100644 index 029ef09..0000000 --- a/jobs/ml-ops/jobs/data-loader-job/README.md +++ /dev/null @@ -1,20 +0,0 @@ -# Data Loader Job - -## Before pushing docker image to private registry - -Create and fill `.env` file with these variables with appropriate values: - -```bash -SCW_ACCESS_KEY=my_access_key -SCW_SECRET_KEY=my_secret_key -``` - -## Define and run data loader job on the console - -Use these environment variables for your job: - -```text -SCW_S3_BUCKET= -SCW_REGION=fr-par -SOURCE_FILE_NAME=bank_telemarketing.csv -``` \ No newline at end of file diff --git a/jobs/ml-ops/jobs/data-loader-job/data/bank_telemarketing_dummy.csv b/jobs/ml-ops/jobs/data-loader-job/data/bank_telemarketing_dummy.csv deleted file mode 100644 index 10bea55..0000000 --- a/jobs/ml-ops/jobs/data-loader-job/data/bank_telemarketing_dummy.csv +++ /dev/null @@ -1 +0,0 @@ -# Download the csv file from source (cf. 
example's main README) \ No newline at end of file diff --git a/jobs/ml-ops/jobs/data-loader-job/main.py b/jobs/ml-ops/jobs/data-loader-job/main.py deleted file mode 100644 index 3790231..0000000 --- a/jobs/ml-ops/jobs/data-loader-job/main.py +++ /dev/null @@ -1,29 +0,0 @@ -from dotenv import load_dotenv -import os, sys, boto3 - - -def main() -> int: - """Uploads a local CSV file to a target S3 bucket""" - - load_dotenv(dotenv_path="./.env") - - s3 = boto3.resource( - "s3", - region_name=os.environ["SCW_REGION"], - use_ssl=True, - endpoint_url=f'https://s3.{os.environ["SCW_REGION"]}.scw.cloud', - aws_access_key_id=os.environ["SCW_ACCESS_KEY"], - aws_secret_access_key=os.environ["SCW_SECRET_KEY"], - ) - - bucket = s3.Bucket(name=os.environ["SCW_DATA_STORE"]) # type: ignore - bucket.upload_file( - Filename="./data/" + os.environ["DATA_FILE_NAME"], - Key=os.environ["DATA_FILE_NAME"], - ) - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/jobs/ml-ops/jobs/data-loader-job/requirements.txt b/jobs/ml-ops/jobs/data-loader-job/requirements.txt deleted file mode 100644 index 6d6870c..0000000 --- a/jobs/ml-ops/jobs/data-loader-job/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -boto3==1.33.2 -python-dotenv==1.0.0 \ No newline at end of file diff --git a/jobs/ml-ops/jobs/ml-job/README.md b/jobs/ml-ops/jobs/ml-job/README.md deleted file mode 100644 index 7076276..0000000 --- a/jobs/ml-ops/jobs/ml-job/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# ML job for binary classification - -## Before pushing docker image to private registry - -Create and fill `.env` file with these variables with appropriate values: - -```bash -SCW_ACCESS_KEY=my_access_key -SCW_SECRET_KEY=my_secret_key -``` - -## Define and run an ML job - -You can create a job definition on the console using the private registry image. Run the job and check that training artifacts are uploaded to object storage buckets. 
- -Define these environment variables during job run: - -```text -SCW_S3_BUCKET_DATA= -DATA_FILE_NAME=bank_telemarketing.csv -SCW_S3_BUCKET_MODEL= -MODEL_FILE_NAME=classifier.pkl -SCW_S3_BUCKET_PERF= -SCW_REGION=fr-par -``` \ No newline at end of file diff --git a/jobs/ml-ops/jobs/ml-job/main.py b/jobs/ml-ops/jobs/ml-job/main.py deleted file mode 100644 index 4b18f88..0000000 --- a/jobs/ml-ops/jobs/ml-job/main.py +++ /dev/null @@ -1,70 +0,0 @@ -import sys, os, pickle, boto3 -import ml_training as ml -from dotenv import load_dotenv - - -def main() -> int: - """ - Trains a classifier on data pulled from a data store. - Uploads training/test artifacts into artifact data stores. - """ - - load_dotenv(dotenv_path="./.env") - - s3 = boto3.resource( - "s3", - region_name=os.environ["SCW_REGION"], - use_ssl=True, - endpoint_url=f'https://s3.{os.environ["SCW_REGION"]}.scw.cloud', - aws_access_key_id=os.environ["SCW_ACCESS_KEY"], - aws_secret_access_key=os.environ["SCW_SECRET_KEY"], - ) - - # download data locally from data store - data_store = s3.Bucket(name=os.environ["SCW_DATA_STORE"]) # type: ignore - data_store.download_file( - os.environ["DATA_FILE_NAME"], "./data/" + os.environ["DATA_FILE_NAME"] - ) - data = ml.load_data("./data/" + os.environ["DATA_FILE_NAME"]) - cleaned_data = ml.clean_data(data) - transformed_data = ml.transform_data(cleaned_data) - - X_train, X_test, y_train, y_test = ml.split_to_train_test_data(transformed_data) - X_train, y_train = ml.over_sample_target_class(X_train, y_train) - - # train and upload classifier to model registry - classifier, _ = ml.tune_classifier(X_train, y_train) - pickle.dump(classifier, open(os.environ["MODEL_FILE_NAME"], "wb")) - model_registry = s3.Bucket(name=os.environ["SCW_MODEL_REGISTRY"]) # type: ignore - model_registry.upload_file( - Filename="/ml-job/" + os.environ["MODEL_FILE_NAME"], - Key=os.environ["MODEL_FILE_NAME"], - ) - - # compute performance on test data - y_pred = ml.predict_on_test_data(classifier, 
X_test) - y_pred_prob = ml.predict_prob_on_test_data(classifier, X_test) - test_metrics = ml.compute_performance_metrics(y_test, y_pred, y_pred_prob) - pickle.dump(test_metrics, open("performance_metrics.pkl", "wb")) - performance_monitor = s3.Bucket(name=os.environ["SCW_PERF_MONITOR"]) # type: ignore - performance_monitor.upload_file( - Filename="/ml-job/performance_metrics.pkl", Key="performance_metrics.pkl" - ) - - # save roc_auc plot - ml.save_roc_plot(classifier, X_test, y_test) - performance_monitor.upload_file( - Filename="/ml-job/roc_auc_curve.png", Key="roc_auc_curve.png" - ) - - # save confusion matrix - ml.save_confusion_matrix_plot(classifier, X_test, y_test) - performance_monitor.upload_file( - Filename="/ml-job/confusion_matrix.png", Key="confusion_matrix.png" - ) - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/jobs/ml-ops/terraform/container.tf b/jobs/ml-ops/terraform/container.tf index c62f29a..7e8f6aa 100644 --- a/jobs/ml-ops/terraform/container.tf +++ b/jobs/ml-ops/terraform/container.tf @@ -1,65 +1,29 @@ -provider "docker" { - host = "unix:///var/run/docker.sock" - - registry_auth { - address = scaleway_registry_namespace.inference_api_image_registry.endpoint - username = "nologin" - password = var.secret_key - } - - registry_auth { - address = scaleway_registry_namespace.data_loader_image_registry.endpoint - username = "nologin" - password = var.secret_key - } - - registry_auth { - address = scaleway_registry_namespace.ml_job_image_registry.endpoint - username = "nologin" - password = var.secret_key - } -} - -resource "docker_image" "inference_api_image" { - name = "${scaleway_registry_namespace.inference_api_image_registry.endpoint}/inference-api:${var.image_version}" - build { - context = "${path.cwd}/../container/inference-api" - } - - provisioner "local-exec" { - command = "docker push ${docker_image.inference_api_image.name}" - } -} - -resource "scaleway_container_namespace" "inference_api_namespace" { - name = 
"ml-inference-${random_string.random_suffix.result}" - description = "Serving inference models deployed as serverless containers" +resource "scaleway_container_namespace" "main" { + name = "ml-ops-example-${random_string.random_suffix.result}" + description = "MLOps example" } -resource "scaleway_container" "inference_api_container" { - name = "inference-api-${random_string.random_suffix.result}" - description = "Serving an inference API" - namespace_id = scaleway_container_namespace.inference_api_namespace.id - registry_image = docker_image.inference_api_image.name +resource "scaleway_container" "inference" { + name = "inference" + description = "Inference serving API" + namespace_id = scaleway_container_namespace.main.id + registry_image = docker_image.inference.name port = 80 - cpu_limit = 1120 + cpu_limit = 2000 memory_limit = 2048 min_scale = 1 max_scale = 5 environment_variables = { - "MODEL_REGISTRY" = scaleway_object_bucket.model_registry.name - "MAIN_REGION" = var.region - "MODEL_FILE" = "classifier.pkl" + "S3_BUCKET_NAME" = scaleway_object_bucket.main.name + "SCW_REGION" = var.region } secret_environment_variables = { "SCW_ACCESS_KEY" = var.access_key "SCW_SECRET_KEY" = var.secret_key } - privacy = "private" - protocol = "http1" deploy = true } -resource "scaleway_container_token" "inference_api_token" { - container_id = scaleway_container.inference_api_container.id +output "endpoint" { + value = scaleway_container.inference.domain_name } diff --git a/jobs/ml-ops/terraform/images.tf b/jobs/ml-ops/terraform/images.tf new file mode 100644 index 0000000..bd567ee --- /dev/null +++ b/jobs/ml-ops/terraform/images.tf @@ -0,0 +1,38 @@ +resource "scaleway_registry_namespace" "main" { + name = "ml-ops-example-${random_string.random_suffix.result}" + region = var.region + project_id = var.project_id +} + +resource "docker_image" "inference" { + name = "${scaleway_registry_namespace.main.endpoint}/inference:0.0.1" + build { + context = "${path.cwd}/../inference" + } 
+ + provisioner "local-exec" { + command = "docker push ${docker_image.inference.name}" + } +} + +resource "docker_image" "data" { + name = "${scaleway_registry_namespace.main.endpoint}/data:${var.image_version}" + build { + context = "${path.cwd}/../data" + } + + provisioner "local-exec" { + command = "docker push ${docker_image.data.name}" + } +} + +resource "docker_image" "training" { + name = "${scaleway_registry_namespace.main.endpoint}/training:${var.image_version}" + build { + context = "${path.cwd}/../training" + } + + provisioner "local-exec" { + command = "docker push ${docker_image.training.name}" + } +} diff --git a/jobs/ml-ops/terraform/jobs.tf b/jobs/ml-ops/terraform/jobs.tf index 205ffdf..2d24077 100644 --- a/jobs/ml-ops/terraform/jobs.tf +++ b/jobs/ml-ops/terraform/jobs.tf @@ -1,21 +1,29 @@ -resource "docker_image" "data_loader_image" { - name = "${scaleway_registry_namespace.data_loader_image_registry.endpoint}/data-loader:${var.image_version}" - build { - context = "${path.cwd}/../jobs/data-loader-job" - } +resource scaleway_job_definition data { + name = "data" + cpu_limit = 1000 + memory_limit = 1024 + image_uri = docker_image.data.name + timeout = "10m" - provisioner "local-exec" { - command = "docker push ${docker_image.data_loader_image.name}" + env = { + "S3_BUCKET_NAME": scaleway_object_bucket.main.name, + "SCW_ACCESS_KEY": var.access_key, + "SCW_SECRET_KEY": var.secret_key, + "SCW_REGION" = var.region } } -resource "docker_image" "ml_job_image" { - name = "${scaleway_registry_namespace.ml_job_image_registry.endpoint}/ml-job:${var.image_version}" - build { - context = "${path.cwd}/../jobs/ml-job" - } +resource scaleway_job_definition training { + name = "training" + cpu_limit = 2000 + memory_limit = 2048 + image_uri = docker_image.training.name + timeout = "10m" - provisioner "local-exec" { - command = "docker push ${docker_image.ml_job_image.name}" + env = { + "S3_BUCKET_NAME": scaleway_object_bucket.main.name, + "SCW_ACCESS_KEY": 
var.access_key, + "SCW_SECRET_KEY": var.secret_key, + "SCW_REGION" = var.region } } diff --git a/jobs/ml-ops/terraform/outputs.tf b/jobs/ml-ops/terraform/outputs.tf deleted file mode 100644 index eaa0902..0000000 --- a/jobs/ml-ops/terraform/outputs.tf +++ /dev/null @@ -1,8 +0,0 @@ -output "inference_api_endpoint" { - value = scaleway_container.inference_api_container.domain_name -} - -output "inference_api_token" { - value = scaleway_container_token.inference_api_token.token - sensitive = true -} diff --git a/jobs/ml-ops/terraform/providers.tf b/jobs/ml-ops/terraform/providers.tf index 57433fc..439df4d 100644 --- a/jobs/ml-ops/terraform/providers.tf +++ b/jobs/ml-ops/terraform/providers.tf @@ -4,3 +4,13 @@ provider "scaleway" { secret_key = var.secret_key project_id = var.project_id } + +provider "docker" { + host = "unix:///var/run/docker.sock" + + registry_auth { + address = scaleway_registry_namespace.main.endpoint + username = "nologin" + password = var.secret_key + } +} diff --git a/jobs/ml-ops/terraform/registry.tf b/jobs/ml-ops/terraform/registry.tf deleted file mode 100644 index c1b9f7b..0000000 --- a/jobs/ml-ops/terraform/registry.tf +++ /dev/null @@ -1,18 +0,0 @@ -resource "scaleway_registry_namespace" "inference_api_image_registry" { - name = "inference-api-images-${random_string.random_suffix.result}" - region = var.region - project_id = var.project_id -} - -resource "scaleway_registry_namespace" "data_loader_image_registry" { - name = "data-loder-images-${random_string.random_suffix.result}" - region = var.region - project_id = var.project_id -} - -resource "scaleway_registry_namespace" "ml_job_image_registry" { - name = "ml-job-images-${random_string.random_suffix.result}" - region = var.region - project_id = var.project_id -} - diff --git a/jobs/ml-ops/terraform/s3.tf b/jobs/ml-ops/terraform/s3.tf index e4c9e9f..da21579 100644 --- a/jobs/ml-ops/terraform/s3.tf +++ b/jobs/ml-ops/terraform/s3.tf @@ -1,11 +1,3 @@ -resource "scaleway_object_bucket" 
"data_store" { - name = "data-store-${random_string.random_suffix.result}" -} - -resource "scaleway_object_bucket" "model_registry" { - name = "model-registry-${random_string.random_suffix.result}" -} - -resource "scaleway_object_bucket" "performance_monitoring_record" { - name = "performance-monitoring-${random_string.random_suffix.result}" +resource "scaleway_object_bucket" "main" { + name = "ml-ops-${random_string.random_suffix.result}" } diff --git a/jobs/ml-ops/terraform/variables.tf b/jobs/ml-ops/terraform/variables.tf index 1ad7055..82f056e 100644 --- a/jobs/ml-ops/terraform/variables.tf +++ b/jobs/ml-ops/terraform/variables.tf @@ -1,7 +1,3 @@ -variable "region" { - type = string -} - variable "access_key" { type = string } @@ -14,17 +10,25 @@ variable "project_id" { type = string } +variable "region" { + type = string + default = "fr-par" +} + variable "data_file" { type = string description = "name data file in data store" + default = "bank_telemarketing.csv" } variable "model_object" { type = string description = "name of model object stored in model registry" + default = "classifier.pkl" } variable "image_version" { type = string + default = "0.0.1" } diff --git a/jobs/ml-ops/jobs/ml-job/Dockerfile b/jobs/ml-ops/training/Dockerfile similarity index 78% rename from jobs/ml-ops/jobs/ml-job/Dockerfile rename to jobs/ml-ops/training/Dockerfile index 854ede2..d459054 100644 --- a/jobs/ml-ops/jobs/ml-job/Dockerfile +++ b/jobs/ml-ops/training/Dockerfile @@ -1,10 +1,10 @@ FROM python:3.12-slim-bookworm -WORKDIR /ml-job/ - -COPY . . +WORKDIR /app RUN pip install --upgrade pip +COPY requirements.txt . RUN pip install -r requirements.txt +COPY . . 
CMD [ "python", "main.py" ] diff --git a/jobs/ml-ops/training/main.py b/jobs/ml-ops/training/main.py new file mode 100644 index 0000000..5cc2fb8 --- /dev/null +++ b/jobs/ml-ops/training/main.py @@ -0,0 +1,84 @@ +import pandas as pd +import os +import pickle +import boto3 +import training as ml +from sklearn.metrics import RocCurveDisplay +from sklearn.metrics import ConfusionMatrixDisplay + +DATA_FILE_NAME = "bank-additional.csv" +LOCAL_DATA_FILE_NAME = f"./data/{DATA_FILE_NAME}" + +MODEL_FILE = "classifier.pkl" +PERF_FILE = "performance.pkl" +ROC_AUC_FILE = "roc_auc.png" +CONFUSION_MATRIX_FILE = "confusion_matrix.png" + + +def main() -> None: + """ + Trains a classifier on data pulled from a data store. + Uploads training/test artifacts into artifact data stores. + """ + + region = os.environ["SCW_REGION"] + access_key = os.environ["SCW_ACCESS_KEY"] + secret_key = os.environ["SCW_SECRET_KEY"] + bucket_name = os.environ["S3_BUCKET_NAME"] + s3_url = f"https://s3.{region}.scw.cloud" + + s3 = boto3.resource( + "s3", + region_name=region, + use_ssl=True, + endpoint_url=s3_url, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + # Download data + s3_bucket = s3.Bucket(bucket_name) + s3_bucket.download_file(DATA_FILE_NAME, LOCAL_DATA_FILE_NAME) + data = pd.read_csv(LOCAL_DATA_FILE_NAME, sep=";") + + # Clean and transform data + cleaned_data = data.dropna() + transformed_data = ml.transform_data(cleaned_data) + + # Split train and test + x_train, x_test, y_train, y_test = ml.split_to_train_test_data(transformed_data) + x_train, y_train = ml.over_sample_target_class(x_train, y_train) + + # Train and upload classifier to s3 + classifier, _ = ml.tune_classifier(x_train, y_train) + + with open(MODEL_FILE, "wb") as fh: + pickle.dump(classifier, fh) + + s3_bucket.upload_file( + Filename=MODEL_FILE, + Key=MODEL_FILE, + ) + + # Compute performance on test data + y_pred = classifier.predict(x_test) + y_pred_prob = classifier.predict_proba(x_test) + test_metrics =
ml.compute_performance_metrics(y_test, y_pred, y_pred_prob) + + with open(PERF_FILE, "wb") as fh: + pickle.dump(test_metrics, fh) + s3_bucket.upload_file(Filename=PERF_FILE, Key=PERF_FILE) + + # save roc_auc plot + display = RocCurveDisplay.from_estimator(classifier, x_test, y_test) + display.figure_.savefig(ROC_AUC_FILE) + s3_bucket.upload_file(Filename=ROC_AUC_FILE, Key=ROC_AUC_FILE) + + # save confusion matrix + display = ConfusionMatrixDisplay.from_estimator(classifier, x_test, y_test) + display.figure_.savefig(CONFUSION_MATRIX_FILE) + s3_bucket.upload_file(Filename=CONFUSION_MATRIX_FILE, Key=CONFUSION_MATRIX_FILE) + + +if __name__ == "__main__": + main() diff --git a/jobs/ml-ops/jobs/ml-job/requirements.txt b/jobs/ml-ops/training/requirements.txt similarity index 82% rename from jobs/ml-ops/jobs/ml-job/requirements.txt rename to jobs/ml-ops/training/requirements.txt index d7942ba..b8fb282 100644 --- a/jobs/ml-ops/jobs/ml-job/requirements.txt +++ b/jobs/ml-ops/training/requirements.txt @@ -4,4 +4,3 @@ scikit-learn==1.3.2 imblearn==0.0 matplotlib==3.8.2 boto3==1.33.2 -python-dotenv==1.0.0 \ No newline at end of file diff --git a/jobs/ml-ops/jobs/ml-job/ml_training.py b/jobs/ml-ops/training/training.py similarity index 61% rename from jobs/ml-ops/jobs/ml-job/ml_training.py rename to jobs/ml-ops/training/training.py index 627a12e..7642570 100644 --- a/jobs/ml-ops/jobs/ml-job/ml_training.py +++ b/jobs/ml-ops/training/training.py @@ -3,28 +3,14 @@ from imblearn.over_sampling import SMOTE from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, precision_score, recall_score, log_loss -from sklearn.metrics import RocCurveDisplay -from sklearn.metrics import ConfusionMatrixDisplay from sklearn.model_selection import RandomizedSearchCV from sklearn.ensemble import RandomForestClassifier -def load_data(path: str) -> pd.DataFrame: - data = pd.read_csv(path, sep=";") - return data - - -def clean_data(data: pd.DataFrame) ->
pd.DataFrame: - """Removes rows with missing value(s)""" - - data = data.dropna() - return data - - def transform_data(data: pd.DataFrame) -> pd.DataFrame: """Handles the transformation of categorical variables of the dataset into 0/1 indicators""" - # use the same category for basic education sub-categories + # Use the same category for basic education sub-categories data["education"] = np.where( data["education"] == "basic.9y", "Basic", data["education"] ) @@ -35,7 +21,7 @@ def transform_data(data: pd.DataFrame) -> pd.DataFrame: data["education"] == "basic.4y", "Basic", data["education"] ) - # transform all categorical variables into 0/1 indicators and remove columns with string categories + # Transform categorical variables into 0/1 indicators and remove columns with string categories cat_vars = [ "job", "marital", @@ -57,11 +43,11 @@ def transform_data(data: pd.DataFrame) -> pd.DataFrame: to_keep = [i for i in data_vars if i not in cat_vars] data = data[to_keep] - # normalize column naming + # Normalize column naming data.columns = data.columns.str.replace(".", "_") data.columns = data.columns.str.replace(" ", "_") - # replace yes/no by 1/0 for target variable y + # Replace yes/no by 1/0 for target variable y data["y"] = data["y"].replace(to_replace=["yes", "no"], value=[1, 0]) return data @@ -72,48 +58,30 @@ def split_to_train_test_data( ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: """Extracts target (predicted) variable and splits data into training/test data""" - # extract target (predicted) variable - X = data.loc[:, data.columns != "y"] # type: ignore - y = data.loc[:, data.columns == "y"] # type: ignore + # Extract target (predicted) variable + x = data.loc[:, data.columns != "y"] + y = data.loc[:, data.columns == "y"] - X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=0.2, stratify=y, random_state=50 + x_train, x_test, y_train, y_test = train_test_split( + x, y, test_size=0.2, stratify=y, random_state=50 ) - 
return X_train, X_test, y_train, y_test + return x_train, x_test, y_train, y_test def over_sample_target_class( - X_train: pd.DataFrame, y_train: pd.DataFrame + x_train: pd.DataFrame, y_train: pd.DataFrame ) -> tuple[pd.DataFrame, pd.DataFrame]: """Resamples training data""" over_sampler = SMOTE(random_state=0) - sampled_data_X, sampled_data_y = over_sampler.fit_resample(X_train, y_train) # type: ignore + sampled_data_x, sampled_data_y = over_sampler.fit_resample(x_train, y_train) - return pd.DataFrame(data=sampled_data_X, columns=X_train.columns), pd.DataFrame( + return pd.DataFrame(data=sampled_data_x, columns=x_train.columns), pd.DataFrame( data=sampled_data_y, columns=["y"] ) -def predict_on_test_data( - classifier: RandomForestClassifier, X_test: pd.DataFrame -) -> np.ndarray: - """Predicts class of each row on the test data input""" - - y_pred = classifier.predict(X_test) - return y_pred - - -def predict_prob_on_test_data( - classifier: RandomForestClassifier, X_test: pd.DataFrame -) -> np.ndarray: - """Predicts likelihood probability of each predicted category""" - - y_pred = classifier.predict_proba(X_test) - return y_pred - - def compute_performance_metrics( y_true: pd.DataFrame, y_pred: np.ndarray, y_pred_prob: np.ndarray ) -> dict: @@ -126,32 +94,14 @@ def compute_performance_metrics( return { "accuracy": round(acc, 2), - "precision": round(prec, 2), # type: ignore - "recall": round(recall, 2), # type: ignore + "precision": round(prec, 2), + "recall": round(recall, 2), "entropy": round(entropy, 2), } -def save_roc_plot( - clf: RandomForestClassifier, X_test: pd.DataFrame, y_test: pd.DataFrame -) -> None: - """Saves ROC curve locally""" - - display = RocCurveDisplay.from_estimator(clf, X_test, y_test) - display.figure_.savefig("roc_auc_curve.png") - - -def save_confusion_matrix_plot( - clf: RandomForestClassifier, X_test: pd.DataFrame, y_test: pd.DataFrame -) -> None: - """Saves the confusion matrix locally""" - - display = 
ConfusionMatrixDisplay.from_estimator(clf, X_test, y_test) - display.figure_.savefig("confusion_matrix.png") - - def tune_classifier( - X_train: pd.DataFrame, y_train: pd.DataFrame + x_train: pd.DataFrame, y_train: pd.DataFrame ) -> tuple[RandomForestClassifier, dict]: """Looks for optimal classifier hyperparameters then use them to fit a classifier""" @@ -174,7 +124,7 @@ def tune_classifier( random_state=40, n_jobs=-1, ) - random_search.fit(X_train, y_train) + random_search.fit(x_train, y_train) optimal_params = random_search.best_params_ n_estimators = optimal_params["n_estimators"] @@ -192,6 +142,6 @@ def tune_classifier( max_depth=max_depth, bootstrap=bootstrap, ) - opt_classifier.fit(X_train, y_train) + opt_classifier.fit(x_train, y_train) return opt_classifier, optimal_params