diff --git a/Makefile b/Makefile index eeda7282..ff67f45b 100644 --- a/Makefile +++ b/Makefile @@ -15,17 +15,12 @@ -include env.sh export - -help: ## Display this help screen +help: ## Display this help screen. @grep -h -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' - -pre-commit: ## Runs the pre-commit checks over entire repo - cd pipelines && \ - poetry run pre-commit run --all-files env ?= dev AUTO_APPROVE_FLAG := -deploy: ## Deploy the Terraform infrastructure to your project. Requires VERTEX_PROJECT_ID and VERTEX_LOCATION env variables to be set in env.sh. Optionally specify env= (default = dev) +deploy: ## Deploy infrastructure to your project. Optionally set env= (default = dev). @if [ "$(auto-approve)" = "true" ]; then \ AUTO_APPROVE_FLAG="-auto-approve"; \ fi; \ @@ -33,7 +28,7 @@ deploy: ## Deploy the Terraform infrastructure to your project. Requires VERTEX_ terraform init -backend-config='bucket=${VERTEX_PROJECT_ID}-tfstate' && \ terraform apply -var 'project_id=${VERTEX_PROJECT_ID}' -var 'region=${VERTEX_LOCATION}' $$AUTO_APPROVE_FLAG -undeploy: ## DESTROY the Terraform infrastructure in your project. Requires VERTEX_PROJECT_ID and VERTEX_LOCATION env variables to be set in env.sh. Optionally specify env= (default = dev) +undeploy: ## DESTROY the infrastructure in your project. Optionally set env= (default = dev). @if [ "$(auto-approve)" = "true" ]; then \ AUTO_APPROVE_FLAG="-auto-approve"; \ fi; \ @@ -41,33 +36,35 @@ undeploy: ## DESTROY the Terraform infrastructure in your project. Requires VERT terraform init -backend-config='bucket=${VERTEX_PROJECT_ID}-tfstate' && \ terraform destroy -var 'project_id=${VERTEX_PROJECT_ID}' -var 'region=${VERTEX_LOCATION}' $$AUTO_APPROVE_FLAG -install: ## Set up local environment for Python development on pipelines +install: ## Set up local Python environment for development. @cd pipelines && \ poetry install --with dev && \ cd ../components && \ - poetry install --with dev + poetry install --with dev && \ + cd ../model && \ + poetry install -compile: ## Compile the pipeline to pipeline.yaml. Must specify pipeline= +compile: ## Compile pipeline. Must set pipeline=. @cd pipelines/src && \ - poetry run kfp dsl compile --py pipelines/${pipeline}/pipeline.py --output pipelines/${pipeline}/pipeline.yaml --function pipeline + echo "Compiling $$pipeline pipeline" && \ + poetry run kfp dsl compile --py pipelines/${pipeline}.py --output pipelines/${pipeline}.yaml --function pipeline -targets ?= training serving -build: ## Build and push training and/or serving container(s) image using Docker. Specify targets= e.g. targets=training or targets="training serving" (default) +images ?= training serving +build: ## Build and push container(s). Set images= e.g. images=training (default = training serving). @cd model && \ - for target in $$targets ; do \ - echo "Building $$target image" && \ + for image in $$images ; do \ + echo "Building $$image image" && \ gcloud builds submit . \ --region=${VERTEX_LOCATION} \ --project=${VERTEX_PROJECT_ID} \ --gcs-source-staging-dir=gs://${VERTEX_PROJECT_ID}-staging/source \ - --substitutions=_DOCKER_TARGET=$$target,_DESTINATION_IMAGE_URI=${CONTAINER_IMAGE_REGISTRY}/$$target:${RESOURCE_SUFFIX} ; \ + --substitutions=_DOCKER_TARGET=$$image,_DESTINATION_IMAGE_URI=${CONTAINER_IMAGE_REGISTRY}/$$image:${RESOURCE_SUFFIX} ; \ done - compile ?= true build ?= true wait ?= false -run: ## Run pipeline in sandbox environment. Must specify pipeline=. 
Optionally specify wait= (default = false). Set compile=false to skip recompiling the pipeline and set build=false to skip rebuilding container images +run: ## Run pipeline. Must set pipeline=. Optionally set wait= (default = false), compile= (default = true) to recompile pipeline, build= (default = true) to rebuild container image(s), images= (default = training serving) to set which images are rebuilt. @if [ $(compile) = "true" ]; then \ $(MAKE) compile ; \ elif [ $(compile) != "false" ]; then \ @@ -81,12 +78,19 @@ run: ## Run pipeline in sandbox environment. Must specify pipeline= to test scripts and optionally components +test: ## Run unit tests for pipelines. Optionally set components= (default = true) to test components package. @if [ $(components) = "true" ]; then \ - echo "Testing components" && \ + echo "Running unit tests in components" && \ cd components && \ poetry run pytest && \ cd .. ; \ @@ -94,6 +98,10 @@ test: ## Run unit tests. Specify components= to test scripts and opt echo "ValueError: components must be either true or false" ; \ exit ; \ fi && \ - echo "Testing scripts" && \ + echo "Running unit tests in pipelines" && \ cd pipelines && \ - poetry run python -m pytest tests/utils + poetry run python -m pytest + +pre-commit: ## Run pre-commit checks for pipelines. + cd pipelines && \ + poetry run pre-commit run --all-files diff --git a/README.md b/README.md index fbecd4ef..74744856 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. --> -# Vertex Pipelines End-to-end Samples +# Vertex Pipelines End-to-End Samples _AKA "Vertex AI Turbo Templates"_ @@ -71,19 +71,19 @@ Before your CI/CD pipelines can deploy the infrastructure, you will need to set ```bash export DEV_PROJECT_ID=my-dev-gcp-project export DEV_LOCATION=europe-west2 -gsutil mb -l DEV_LOCATION -p DEV_PROJECT_ID --pap=enforced gs://DEV_PROJECT_ID-tfstate && \ - gsutil ubla set on gs://DEV_PROJECT_ID-tfstate +gsutil mb -l $DEV_LOCATION -p $DEV_PROJECT_ID --pap=enforced gs://$DEV_PROJECT_ID-tfstate && \ + gsutil ubla set on gs://$DEV_PROJECT_ID-tfstate ``` Enable APIs in admin project: ```bash export ADMIN_PROJECT_ID=my-admin-gcp-project -gcloud services enable cloudresourcemanager.googleapis.com serviceusage.googleapis.com --project=ADMIN_PROJECT_ID +gcloud services enable cloudresourcemanager.googleapis.com serviceusage.googleapis.com --project=$ADMIN_PROJECT_ID ``` ```bash -make deploy env=dev VERTEX_PROJECT_ID= +make deploy env=dev ``` More details about infrastructure is explained in [this README](docs/INFRASTRUCTURE.md). @@ -117,10 +117,10 @@ You can modify this to suit your own use case. Build the training and serving container images and push them to Artifact Registry with: ```bash -make build [ targets=training serving ] +make build [ images=training serving ] ``` -Optionally specify the `targets` variable to only build one of the images. +Optionally specify the `images` variable to only build one of the images. 
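For example, to rebuild only the training image (a minimal illustration of the `images` variable described above):

```bash
make build images=training
```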
**Execute pipelines:** Vertex AI Pipelines uses KubeFlow to orchestrate your training steps, as such you'll need to: @@ -136,10 +136,17 @@ make run pipeline=training [ wait= ] [ build= ] [ compil The command has the following true/false flags: -- `build` - re-build containers for training & serving code (limit by setting targets=training to build only one of the containers) +- `build` - re-build containers for training & serving code (limit by setting images=training to build only one of the containers) - `compile` - re-compile the pipeline to YAML - `wait` - run the pipeline (a-)sync +**Shortcuts:** Use these commands which support the same options as `run` to run the training or prediction pipeline: + +```bash +make training +make prediction +``` + ## Test Unit tests are performed using [pytest](https://docs.pytest.org). diff --git a/cloudbuild/e2e-test.yaml b/cloudbuild/e2e-test.yaml index 8c5c51d0..99a38e22 100644 --- a/cloudbuild/e2e-test.yaml +++ b/cloudbuild/e2e-test.yaml @@ -51,6 +51,7 @@ steps: - ENABLE_PIPELINE_CACHING=${_TEST_ENABLE_PIPELINE_CACHING} - VERTEX_LOCATION=${_TEST_VERTEX_LOCATION} - VERTEX_PROJECT_ID=${_TEST_VERTEX_PROJECT_ID} + - BQ_LOCATION=${_TEST_BQ_LOCATION} - VERTEX_SA_EMAIL=${_TEST_VERTEX_SA_EMAIL} - VERTEX_CMEK_IDENTIFIER=${_TEST_VERTEX_CMEK_IDENTIFIER} - VERTEX_NETWORK=${_TEST_VERTEX_NETWORK} diff --git a/cloudbuild/release.yaml b/cloudbuild/release.yaml index fc20bc75..a99c2eba 100644 --- a/cloudbuild/release.yaml +++ b/cloudbuild/release.yaml @@ -47,12 +47,12 @@ steps: cd pipelines && \ poetry run python -m pipelines.utils.upload_pipeline \ --dest=https://${_VERTEX_LOCATION}-kfp.pkg.dev/$$proj/vertex-pipelines \ - --yaml=src/pipelines/training/pipeline.yaml \ + --yaml=src/pipelines/training.yaml \ --tag=latest \ --tag=${TAG_NAME} && \ poetry run python -m pipelines.utils.upload_pipeline \ --dest=https://${_VERTEX_LOCATION}-kfp.pkg.dev/$$proj/vertex-pipelines \ - --yaml=src/pipelines/prediction/pipeline.yaml \ + --yaml=src/pipelines/prediction.yaml \ --tag=latest \ --tag=${TAG_NAME}; \ done diff --git a/components/src/components/extract_table.py b/components/src/components/extract_table.py index 496e5377..5fe413f4 100644 --- a/components/src/components/extract_table.py +++ b/components/src/components/extract_table.py @@ -11,81 +11,32 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from kfp.dsl import Dataset, Output, ContainerSpec, container_component -from kfp.dsl import Dataset, Output, component - -@component( - base_image="python:3.9", - packages_to_install=["google-cloud-bigquery==2.30.0"], -) +@container_component def extract_table( - bq_client_project_id: str, - source_project_id: str, - dataset_id: str, - table_name: str, - dataset: Output[Dataset], - destination_gcs_uri: str = None, - dataset_location: str = "EU", - extract_job_config: dict = None, - skip_if_exists: bool = True, + project: str, + location: str, + table: str, + data: Output[Dataset], + destination_format: str = "CSV", + compression: str = "NONE", + field_delimiter: str = ",", + print_header: str = "true", ): - """ - Extract BQ table in GCS. 
- Args: - bq_client_project_id (str): project id that will be used by the bq client - source_project_id (str): project id from where BQ table will be extracted - dataset_id (str): dataset id from where BQ table will be extracted - table_name (str): table name (without project id and dataset id) - dataset (Output[Dataset]): output dataset artifact generated by the operation, - this parameter will be passed automatically by the orchestrator - dataset_location (str): bq dataset location. Defaults to "EU". - extract_job_config (dict): dict containing optional parameters - required by the bq extract operation. Defaults to None. - See available parameters here - https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.ExtractJobConfig.html # noqa - destination_gcs_uri (str): GCS URI to use for saving query results (optional). - - Returns: - Outputs (NamedTuple (str, list)): Output dataset directory and its GCS uri. - """ - - import logging - from pathlib import Path - from google.cloud.exceptions import GoogleCloudError - from google.cloud import bigquery - - # set uri of output dataset if destination_gcs_uri is provided - if destination_gcs_uri: - dataset.uri = destination_gcs_uri - - logging.info(f"Checking if destination exists: {dataset.path}") - if Path(dataset.path).exists() and skip_if_exists: - logging.info("Destination already exists, skipping table extraction!") - return - - full_table_id = f"{source_project_id}.{dataset_id}.{table_name}" - table = bigquery.table.Table(table_ref=full_table_id) - - if extract_job_config is None: - extract_job_config = {} - job_config = bigquery.job.ExtractJobConfig(**extract_job_config) - - logging.info(f"Extract table {table} to {dataset.uri}") - client = bigquery.client.Client( - project=bq_client_project_id, location=dataset_location + return ContainerSpec( + image="google/cloud-sdk:alpine", + command=["bq"], + args=[ + "extract", + f"--project_id={project}", + f"--location={location}", + f"--destination_format={destination_format}", + f"--compression={compression}", + f"--field_delimiter={field_delimiter}", + f"--print_header={print_header}", + table, + data.uri, + ], ) - extract_job = client.extract_table( - table, - dataset.uri, - job_config=job_config, - ) - - try: - result = extract_job.result() - logging.info("Table extracted, result: {}".format(result)) - except GoogleCloudError as e: - logging.error(e) - logging.error(extract_job.error_result) - logging.error(extract_job.errors) - raise e diff --git a/components/src/components/lookup_model.py b/components/src/components/lookup_model.py index 813208a7..1f00035b 100644 --- a/components/src/components/lookup_model.py +++ b/components/src/components/lookup_model.py @@ -22,10 +22,9 @@ ) def lookup_model( model_name: str, - project_location: str, - project_id: str, + location: str, + project: str, model: Output[Model], - order_models_by: str = "create_time desc", fail_on_model_not_found: bool = False, ) -> NamedTuple("Outputs", [("model_resource_name", str), ("training_dataset", dict)]): """ @@ -33,15 +32,9 @@ def lookup_model( Args: model_name (str): display name of the model - project_location (str): location of the Google Cloud project - project_id (str): project id of the Google Cloud project + location (str): location of the Google Cloud project + project (str): project id of the Google Cloud project model (Output[Model]): a Vertex AI model - order_models_by (str): if multiple models are found based on the display name, - use a filter clause: - A comma-separated list 
of fields to order by, sorted in - ascending order. Use "desc" after a field name for descending. - Supported fields: `display_name`, `create_time`, `update_time` - Defaults to "create_time desc". fail_on_model_not_found (bool): if set to True, raise runtime error if model is not found @@ -60,25 +53,23 @@ def lookup_model( logging.info(f"listing models with display name {model_name}") models = Model.list( filter=f'display_name="{model_name}"', - order_by=order_models_by, - location=project_location, - project=project_id, + location=location, + project=project, ) - logging.info(f"found {len(models)} models") + logging.info(f"found {len(models)} model(s)") training_dataset = {} model_resource_name = "" if len(models) == 0: logging.error( - f"No model found with name {model_name}" - + f"(project: {project_id} location: {project_location})" + f"No model found with name {model_name} " + + f"(project: {project} location: {location})" ) if fail_on_model_not_found: raise RuntimeError(f"Failed as model was not found") elif len(models) == 1: target_model = models[0] model_resource_name = target_model.resource_name - logging.info(f"choosing model by order ({order_models_by})") logging.info(f"model display name: {target_model.display_name}") logging.info(f"model resource name: {target_model.resource_name}") logging.info(f"model uri: {target_model.uri}") diff --git a/components/src/components/model_batch_predict.py b/components/src/components/model_batch_predict.py index da5b6b14..ab329181 100644 --- a/components/src/components/model_batch_predict.py +++ b/components/src/components/model_batch_predict.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kfp.dsl import Input, Model, component -from typing import List, NamedTuple +from kfp.dsl import Input, Model, component, OutputPath +from typing import List @component( @@ -25,9 +25,10 @@ ) def model_batch_predict( model: Input[Model], + gcp_resources: OutputPath(str), job_display_name: str, - project_location: str, - project_id: str, + location: str, + project: str, source_uri: str, destination_uri: str, source_format: str, @@ -39,15 +40,15 @@ def model_batch_predict( monitoring_alert_email_addresses: List[str] = None, monitoring_skew_config: dict = None, instance_config: dict = None, -) -> NamedTuple("Outputs", [("gcp_resources", str)]): +): """ Trigger a batch prediction job and enable monitoring. Args: model (Input[Model]): Input model to use for calculating predictions. job_display_name: Name of the batch prediction job. - project_location (str): location of the Google Cloud project. Defaults to None. - project_id (str): project id of the Google Cloud project. Defaults to None. + location (str): location of the Google Cloud project. Defaults to None. + project (str): project id of the Google Cloud project. Defaults to None. source_uri (str): bq:// URI or a list of gcs:// URIs to read input instances. destination_uri (str): bq:// or gs:// URI to store output predictions. source_format (str): E.g. "bigquery", "jsonl", "csv". See: @@ -67,7 +68,7 @@ def model_batch_predict( input instances to the instances that the Model accepts. See: https://cloud.google.com/vertex-ai/docs/reference/rest/v1beta1/projects.locations.batchPredictionJobs#instanceconfig Returns: - NamedTuple: gcp_resources for Vertex AI UI integration. + OutputPath: gcp_resources for Vertex AI UI integration. 
""" import logging @@ -118,7 +119,7 @@ def is_job_successful(job_state: JobState) -> bool: _POLLING_INTERVAL_IN_SECONDS = 20 _CONNECTION_ERROR_RETRY_LIMIT = 5 - api_endpoint = f"{project_location}-aiplatform.googleapis.com" + api_endpoint = f"{location}-aiplatform.googleapis.com" input_config = {"instancesFormat": source_format} output_config = {"predictionsFormat": destination_format} @@ -167,11 +168,19 @@ def is_job_successful(job_state: JobState) -> bool: logging.info(request) client = JobServiceClient(client_options={"api_endpoint": api_endpoint}) response = client.create_batch_prediction_job( - parent=f"projects/{project_id}/locations/{project_location}", + parent=f"projects/{project}/locations/{location}", batch_prediction_job=request, ) logging.info(f"Submitted batch prediction job: {response.name}") + # output GCP resource for Vertex AI UI integration + batch_job_resources = GcpResources() + dr = batch_job_resources.resources.add() + dr.resource_type = "BatchPredictionJob" + dr.resource_uri = response.name + with open(gcp_resources, "w") as f: + f.write(MessageToJson(batch_job_resources)) + with execution_context.ExecutionContext( on_cancel=partial( send_cancel_request, @@ -206,12 +215,3 @@ def is_job_successful(job_state: JobState) -> bool: f"Waiting for {_POLLING_INTERVAL_IN_SECONDS} seconds for next poll." ) time.sleep(_POLLING_INTERVAL_IN_SECONDS) - - # return GCP resource for Vertex AI UI integration - batch_job_resources = GcpResources() - dr = batch_job_resources.resources.add() - dr.resource_type = "BatchPredictionJob" - dr.resource_uri = response.name - gcp_resources = MessageToJson(batch_job_resources) - - return (gcp_resources,) diff --git a/components/src/components/upload_model.py b/components/src/components/upload_model.py index 2484ffec..d23609c6 100644 --- a/components/src/components/upload_model.py +++ b/components/src/components/upload_model.py @@ -25,26 +25,27 @@ ) def upload_model( model: Input[Model], - test_dataset: Input[Dataset], + test_data: Input[Dataset], model_evaluation: Input[Metrics], vertex_model: Output[VertexModel], - project_id: str, - project_location: str, + project: str, + location: str, model_name: str, eval_metric: str, eval_lower_is_better: bool, pipeline_job_id: str, serving_container_image: str, + model_description: str = None, evaluation_name: str = "Imported evaluation", ) -> None: """ Args: model (Model): Input challenger model. - test_dataset (Dataset): Test dataset used for evaluating challenger model. + test_data (Dataset): Test dataset used for evaluating challenger model. vertex_model (VertexModel): Output model uploaded to Vertex AI Model Registry. model_evaluation (Metrics): Evaluation metrics of challenger model. - project_id (str): project id of the Google Cloud project. - project_location (str): location of the Google Cloud project. + project (str): project id of the Google Cloud project. + location (str): location of the Google Cloud project. pipeline_job_id (str): model_name (str): Name of champion and challenger model in Vertex AI Model Registry. @@ -53,8 +54,9 @@ def upload_model( False for classification metrics. serving_container_image (str): Container URI for serving the challenger model. - evaluation_name (str): Name of evaluation results which are displayed in the - Vertex AI UI of the challenger model. + model_description (str): Optional. Description of model. + evaluation_name (str): Optional. Name of evaluation results which are + displayed in the Vertex AI UI of the challenger model. 
""" import json @@ -64,22 +66,20 @@ def upload_model( from google.cloud.aiplatform_v1 import ModelEvaluation, ModelServiceClient from google.protobuf.json_format import ParseDict - def lookup_model( - project_id: str, project_location: str, model_name: str - ) -> aip.Model: + def lookup_model(model_name: str) -> aip.Model: """Look up model in model registry.""" logging.info(f"listing models with display name {model_name}") models = aip.Model.list( filter=f'display_name="{model_name}"', - location=project_location, - project=project_id, + location=location, + project=project, ) logging.info(f"found {len(models)} models") if len(models) == 0: logging.info( f"No model found with name {model_name}" - + f"(project: {project_id} location: {project_location})" + + f"(project: {project} location: {location})" ) return None elif len(models) == 1: @@ -115,6 +115,7 @@ def upload_model_to_registry( logging.info(f"Uploading model {model_name} (default: {is_default_version}") uploaded_model = aip.Model.upload( display_name=model_name, + description=model_description, artifact_uri=model.uri, serving_container_image_uri=serving_container_image, serving_container_predict_route="/predict", @@ -126,7 +127,7 @@ def upload_model_to_registry( # Output google.VertexModel artifact vertex_model.uri = ( - f"https://{project_location}-aiplatform.googleapis.com/v1/" + f"https://{location}-aiplatform.googleapis.com/v1/" f"{uploaded_model.versioned_resource_name}" ) vertex_model.metadata["resourceName"] = uploaded_model.versioned_resource_name @@ -136,7 +137,6 @@ def upload_model_to_registry( def import_evaluation( parsed_metrics: dict, challenger_model: aip.Model, - project_location: str, evaluation_name: str, ) -> str: """Import model evaluation.""" @@ -153,7 +153,7 @@ def import_evaluation( "metadata": { "pipeline_job_id": pipeline_job_id, "evaluation_dataset_type": "gcs", - "evaluation_dataset_path": [test_dataset.uri], + "evaluation_dataset_path": [test_data.uri], }, } @@ -161,9 +161,7 @@ def import_evaluation( logging.debug(f"Request: {request}") challenger_name = challenger_model.versioned_resource_name client = ModelServiceClient( - client_options={ - "api_endpoint": project_location + "-aiplatform.googleapis.com" - } + client_options={"api_endpoint": location + "-aiplatform.googleapis.com"} ) logging.info(f"Uploading model evaluation for {challenger_name}") response = client.import_model_evaluation( @@ -177,9 +175,7 @@ def import_evaluation( with open(model_evaluation.path, "r") as f: challenger_metrics = json.load(f) - champion_model = lookup_model( - project_id=project_id, project_location=project_location, model_name=model_name - ) + champion_model = lookup_model(model_name=model_name) challenger_wins = True parent_model_uri = None @@ -206,6 +202,5 @@ def import_evaluation( import_evaluation( parsed_metrics=challenger_metrics, challenger_model=model, - project_location=project_location, evaluation_name=evaluation_name, ) diff --git a/components/tests/conftest.py b/components/tests/conftest.py index 89cdc9ba..eb9014b4 100644 --- a/components/tests/conftest.py +++ b/components/tests/conftest.py @@ -31,7 +31,7 @@ def mock_kfp_artifact(monkeypatch): Args: monkeypatch: Used to patch the decorator `@component` in `kfp.v2.dsl`. This prevents KFP from changing the Python functions when applying - pytests. + pytest. 
Returns: None diff --git a/components/tests/test_extract_table.py b/components/tests/test_extract_table.py deleted file mode 100644 index 2db07695..00000000 --- a/components/tests/test_extract_table.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import google.cloud.bigquery # noqa -from kfp.dsl import Dataset -from unittest import mock - -import components - -extract_bq_to_dataset = components.extract_table.python_func - - -@mock.patch("google.cloud.bigquery.client.Client") -@mock.patch("google.cloud.bigquery.table.Table") -@mock.patch("google.cloud.bigquery.job.ExtractJobConfig") -def test_extract_table(mock_job_config, mock_table, mock_client, tmpdir): - """ - Checks that the extract_bq_to_dataset is called correctly - """ - mock_path = tmpdir - mock_client.extract_table.return_value = "my-job" - mock_table.return_value.table_ref = "my-table" - mock_job_config.return_value = "mock-job-config" - - extract_bq_to_dataset( - bq_client_project_id="my-project-id", - source_project_id="source-project-id", - dataset_id="dataset-id", - table_name="table-name", - dataset=Dataset(uri=mock_path), - destination_gcs_uri="gs://mock_bucket", - dataset_location="EU", - extract_job_config=None, - skip_if_exists=False, - ) - - mock_client.return_value.extract_table.assert_called_once_with( - mock_table.return_value, "gs://mock_bucket", job_config="mock-job-config" - ) - - -@mock.patch("google.cloud.bigquery.client.Client") -@mock.patch("google.cloud.bigquery.table.Table") -@mock.patch("google.cloud.bigquery.job.ExtractJobConfig") -@mock.patch("pathlib.Path.exists") -def test_extract_table_skip_existing( - mock_path_exists, mock_job_config, mock_table, mock_client, tmpdir -): - """ - Checks that when the dataset exists the method is not called - """ - mock_path = tmpdir - mock_path_exists.return_value = True - - extract_bq_to_dataset( - bq_client_project_id="my-project-id", - source_project_id="source-project-id", - dataset_id="dataset-id", - table_name="table-name", - dataset=Dataset(uri=mock_path), - destination_gcs_uri="gs://mock_bucket", - dataset_location="EU", - extract_job_config=None, - skip_if_exists=True, - ) - - assert not mock_client.return_value.extract_table.called diff --git a/components/tests/test_lookup_model.py b/components/tests/test_lookup_model.py index a7868b66..58003203 100644 --- a/components/tests/test_lookup_model.py +++ b/components/tests/test_lookup_model.py @@ -22,14 +22,14 @@ @mock.patch("google.cloud.aiplatform.Model") -def test_lookup_model(mock_model, tmpdir): +def test_lookup_model(mock_model, tmp_path): """ Assert lookup_model produces expected resource name, and that list method is called with the correct arguemnts """ # Mock attribute and method - mock_path = tmpdir + mock_path = str(tmp_path / "model") mock_model.resource_name = "my-model-resource-name" mock_model.uri = mock_path mock_model.list.return_value = [mock_model] @@ -37,9 +37,8 @@ def test_lookup_model(mock_model, tmpdir): # Invoke the model look 
up found_model_resource_name, _ = lookup_model( model_name="my-model", - project_location="europe-west4", - project_id="my-project-id", - order_models_by="create_time desc", + location="europe-west4", + project="my-project-id", fail_on_model_not_found=False, model=Model(uri=mock_path), ) @@ -49,14 +48,13 @@ def test_lookup_model(mock_model, tmpdir): # Check the list method was called once with the correct arguments mock_model.list.assert_called_once_with( filter='display_name="my-model"', - order_by="create_time desc", location="europe-west4", project="my-project-id", ) @mock.patch("google.cloud.aiplatform.Model") -def test_lookup_model_when_no_models(mock_model, tmpdir): +def test_lookup_model_when_no_models(mock_model, tmp_path): """ Checks that when there are no models and fail_on_model_found = False, lookup_model returns an empty string. @@ -64,19 +62,17 @@ def test_lookup_model_when_no_models(mock_model, tmpdir): mock_model.list.return_value = [] exported_model_resource_name, _ = lookup_model( model_name="my-model", - project_location="europe-west4", - project_id="my-project-id", - order_models_by="create_time desc", + location="europe-west4", + project="my-project-id", fail_on_model_not_found=False, - model=Model(uri=str(tmpdir)), + model=Model(uri=str(tmp_path / "model")), ) - print(exported_model_resource_name) assert exported_model_resource_name == "" @mock.patch("google.cloud.aiplatform.Model") -def test_lookup_model_when_no_models_fail(mock_model, tmpdir): +def test_lookup_model_when_no_models_fail(mock_model, tmp_path): """ Checks that when there are no models and fail_on_model_found = True, lookup_model raises a RuntimeError. @@ -87,9 +83,8 @@ def test_lookup_model_when_no_models_fail(mock_model, tmpdir): with pytest.raises(RuntimeError): lookup_model( model_name="my-model", - project_location="europe-west4", - project_id="my-project-id", - order_models_by="create_time desc", + location="europe-west4", + project="my-project-id", fail_on_model_not_found=True, - model=Model(uri=str(tmpdir)), + model=Model(uri=str(tmp_path / "model")), ) diff --git a/components/tests/test_model_batch_predict.py b/components/tests/test_model_batch_predict.py index ad4c427a..aec99fad 100644 --- a/components/tests/test_model_batch_predict.py +++ b/components/tests/test_model_batch_predict.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import json import pytest from unittest import mock from kfp.dsl import Model @@ -59,7 +57,7 @@ def test_model_batch_predict( create_job, get_job, - tmpdir, + tmp_path, source_format, destination_format, source_uri, @@ -70,22 +68,27 @@ def test_model_batch_predict( """ Asserts model_batch_predict successfully creates requests given different arguments. 
""" - mock_model = Model(uri=tmpdir, metadata={"resourceName": ""}) + mock_model = Model(uri=str(tmp_path / "model"), metadata={"resourceName": ""}) + gcp_resources_path = tmp_path / "gcp_resources.json" - (gcp_resources,) = model_batch_predict( - model=mock_model, - job_display_name="", - project_location="", - project_id="", - source_uri=source_uri, - destination_uri=destination_format, - source_format=source_format, - destination_format=destination_format, - monitoring_training_dataset=monitoring_training_dataset, - monitoring_alert_email_addresses=monitoring_alert_email_addresses, - monitoring_skew_config=monitoring_skew_config, - ) + try: + model_batch_predict( + model=mock_model, + job_display_name="", + location="", + project="", + source_uri=source_uri, + destination_uri=destination_format, + source_format=source_format, + destination_format=destination_format, + monitoring_training_dataset=monitoring_training_dataset, + monitoring_alert_email_addresses=monitoring_alert_email_addresses, + monitoring_skew_config=monitoring_skew_config, + gcp_resources=str(gcp_resources_path), + ) - create_job.assert_called_once() - get_job.assert_called_once() - assert json.loads(gcp_resources)["resources"][0]["resourceUri"] == mock_job1.name + create_job.assert_called_once() + get_job.assert_called_once() + assert gcp_resources_path.exists() + finally: + gcp_resources_path.unlink(missing_ok=True) diff --git a/components/tests/test_upload_model.py b/components/tests/test_upload_model.py index cb35b4e8..d4b96e4c 100644 --- a/components/tests/test_upload_model.py +++ b/components/tests/test_upload_model.py @@ -48,6 +48,7 @@ def test_model_upload_no_champion( model = Model(uri="dummy-model-uri") serving_container_image = "dummy_image:latest" model_name = "dummy-model-name" + model_description = "dummy model_description" vertex_model = VertexModel.create( name=model_name, uri="chall_uri", model_resource_name="chall_resource_name" ) @@ -64,16 +65,17 @@ def test_model_upload_no_champion( upload_model( model=model, + model_description=model_description, serving_container_image=serving_container_image, vertex_model=vertex_model, - project_id=project, - project_location=location, + project=project, + location=location, model_evaluation=model_evaluation, eval_metric=eval_metric, eval_lower_is_better=eval_lower_is_better, model_name=model_name, pipeline_job_id=pipeline_job_id, - test_dataset=test_dataset, + test_data=test_dataset, evaluation_name=evaluation_name, ) @@ -91,6 +93,7 @@ def test_model_upload_no_champion( # Check model upload call mock_model_class.upload.assert_called_once_with( display_name=model_name, + description=model_description, artifact_uri="dummy-model-uri", serving_container_image_uri=serving_container_image, serving_container_predict_route="/predict", @@ -158,6 +161,7 @@ def test_model_upload_challenger_wins( model = Model(uri="dummy-model-uri") serving_container_image = "dummy_image:latest" model_name = "dummy-model-name" + model_description = "dummy model_description" vertex_model = VertexModel.create( name=model_name, uri="chall_uri", model_resource_name="chall_resource_name" ) @@ -174,16 +178,17 @@ def test_model_upload_challenger_wins( upload_model( model=model, + model_description=model_description, serving_container_image=serving_container_image, vertex_model=vertex_model, - project_id=project, - project_location=location, + project=project, + location=location, model_evaluation=model_evaluation, eval_metric=eval_metric, eval_lower_is_better=eval_lower_is_better, 
model_name=model_name, pipeline_job_id=pipeline_job_id, - test_dataset=test_dataset, + test_data=test_dataset, evaluation_name=evaluation_name, ) @@ -201,6 +206,7 @@ def test_model_upload_challenger_wins( # Check model upload call mock_model_class.upload.assert_called_once_with( display_name=model_name, + description=model_description, artifact_uri="dummy-model-uri", serving_container_image_uri=serving_container_image, serving_container_predict_route="/predict", @@ -268,6 +274,7 @@ def test_model_upload_champion_wins( model = Model(uri="dummy-model-uri") serving_container_image = "dummy_image:latest" model_name = "dummy-model-name" + model_description = "dummy model_description" vertex_model = VertexModel.create( name=model_name, uri="chall_uri", model_resource_name="chall_resource_name" ) @@ -284,16 +291,17 @@ def test_model_upload_champion_wins( upload_model( model=model, + model_description=model_description, serving_container_image=serving_container_image, vertex_model=vertex_model, - project_id=project, - project_location=location, + project=project, + location=location, model_evaluation=model_evaluation, eval_metric=eval_metric, eval_lower_is_better=eval_lower_is_better, model_name=model_name, pipeline_job_id=pipeline_job_id, - test_dataset=test_dataset, + test_data=test_dataset, evaluation_name=evaluation_name, ) @@ -311,6 +319,7 @@ def test_model_upload_champion_wins( # Check model upload call mock_model_class.upload.assert_called_once_with( display_name=model_name, + description=model_description, artifact_uri="dummy-model-uri", serving_container_image_uri=serving_container_image, serving_container_predict_route="/predict", diff --git a/docs/AUTOMATION.md b/docs/AUTOMATION.md index 5094b4a7..b9c4adc2 100644 --- a/docs/AUTOMATION.md +++ b/docs/AUTOMATION.md @@ -63,6 +63,7 @@ Set up a trigger for the `e2e-test.yaml` pipeline, and provide substitution valu | `_TEST_VERTEX_PROJECT_ID` | Google Cloud project ID in which you want to run the ML pipelines in the E2E tests as part of the CI/CD pipeline. | Project ID for the DEV environment | | `_TEST_VERTEX_SA_EMAIL` | Email address of the service account you want to use to run the ML pipelines in the E2E tests as part of the CI/CD pipeline. | `vertex-pipelines@.iam.gserviceaccount.com` | | `_TEST_ENABLE_PIPELINE_CACHING` | Override the default caching behaviour of the ML pipelines. Leave blank to use the default caching behaviour. | `False` | +| `_TEST_BQ_LOCATION` | The location of BigQuery datasets used in training and prediction pipelines. | `US` or `EU` if using multi-region datasets | We recommend to enable comment control for this trigger (select `Required` under `Comment Control`). This will mean that the end-to-end tests will only run once a repository collaborator or owner comments `/gcbrun` on the pull request. This will help to avoid unnecessary runs of the ML pipelines while a Pull Request is still being worked on, as they can take a long time (and can be expensive to run on every Pull Request!) 
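As a sketch only (the trigger name, repository owner/name and all substitution values below are placeholders, and triggers may equally be managed via Terraform or the Cloud Console), a pull-request trigger for `e2e-test.yaml` that supplies the new `_TEST_BQ_LOCATION` substitution and enables comment control might be created along these lines:

```bash
gcloud builds triggers create github \
  --name=e2e-tests \
  --repo-owner=my-github-org \
  --repo-name=my-repo \
  --pull-request-pattern="^main$" \
  --comment-control=COMMENTS_ENABLED \
  --build-config=cloudbuild/e2e-test.yaml \
  --substitutions=_TEST_VERTEX_PROJECT_ID=my-dev-gcp-project,_TEST_VERTEX_LOCATION=europe-west2,_TEST_BQ_LOCATION=EU
```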
diff --git a/docs/CONTRIBUTION.md b/docs/CONTRIBUTION.md index b1f07e49..6fcc298b 100644 --- a/docs/CONTRIBUTION.md +++ b/docs/CONTRIBUTION.md @@ -65,31 +65,6 @@ gcloud artifacts repositories create vertex-images \ --location=${GCP_REGION} ``` -### BigQuery - -Create a new BigQuery dataset for the Chicago Taxi data: - -``` -bq --location=${GCP_REGION} mk --dataset "${GCP_PROJECT_ID}:chicago_taxi_trips" -``` - -Create a new BigQuery dataset for data processing during the pipelines: - -``` -bq --location=${GCP_REGION} mk --dataset "${GCP_PROJECT_ID}:preprocessing" -``` - -Set up a BigQuery transfer job to mirror the Chicago Taxi dataset to your project - -``` -bq mk --transfer_config \ - --project_id=${GCP_PROJECT_ID} \ - --data_source="cross_region_copy" \ - --target_dataset="chicago_taxi_trips" \ - --display_name="Chicago taxi trip mirror" \ - --params='{"source_dataset_id":"'"chicago_taxi_trips"'","source_project_id":"'"bigquery-public-data"'"}' -``` - ### Service Accounts Two service accounts are required diff --git a/docs/PIPELINES.md b/docs/PIPELINES.md index cfd9b27a..7d1da590 100644 --- a/docs/PIPELINES.md +++ b/docs/PIPELINES.md @@ -15,7 +15,7 @@ limitations under the License. --> # ML Pipelines -There are two ML pipelines defined in this repository: a training pipeline (located in [pipelines/src/pipelines/training/pipeline.py](/pipelines/src/pipelines/training/pipeline.py)) and a batch prediction pipeline (located in [pipelines/src/pipelines/prediction/pipeline.py](/pipelines/src/pipelines/prediction/pipeline.py)). +There are two ML pipelines defined in this repository: a training pipeline (located in `pipelines/src/pipelines/training.py` and a batch prediction pipeline (located in `pipelines/src/pipelines/prediction.py`). ## Pipeline input parameters @@ -70,7 +70,7 @@ In the next sections we will walk through the different pipeline steps. The first pipeline step runs a SQL script in BigQuery to extract data from the source table and load it into tables according to a train/test/validation split. -The SQL query for this can be found in [pipelines/src/pipelines/training/queries/preprocessing.sql](/pipelines/src/pipelines/training/queries/preprocessing.sql). +The SQL query for this can be found in `pipelines/src/pipelines/queries/preprocessing.sql`. As you can see in this SQL query, there are some placeholder values (marked by the curly brace syntax `{{ }}`). When the pipeline runs, these are replaced with values provided from the ML pipeline. @@ -88,7 +88,7 @@ This step is performed using a custom KFP component located in [components/bigqu ### Training step -The training step is defined as a [KFP container component](https://www.kubeflow.org/docs/components/pipelines/v2/components/container-components/) in the [pipeline.py](/pipelines/src/pipelines/training/pipeline.py) file. +The training step is defined as a [KFP container component](https://www.kubeflow.org/docs/components/pipelines/v2/components/container-components/) in the [pipeline.py](/pipelines/src/pipelines/training.py) file. The container image used for this component is built using CI/CD (or the `make build target=training` command if you want to build it during development). diff --git a/docs/PRODUCTION.md b/docs/PRODUCTION.md index 8302f902..8f1315fc 100644 --- a/docs/PRODUCTION.md +++ b/docs/PRODUCTION.md @@ -29,7 +29,7 @@ This document describes the full process from making a change to your pipeline c ## Making your changes to the pipelines 1. 
Create a feature branch off the main/master branch: `git checkout -b my-feature-branch` -1. Make changes to your pipeline code locally (e.g. [pipelines/src/pipelines/training/pipeline.py](/pipelines/src/pipelines/training/pipeline.py)) +1. Make changes to your pipeline code locally (e.g. `pipelines/src/pipelines/training.py`) 1. Commit these changes to your feature branch 1. Push your feature branch to GitHub 1. Open a Pull Request (PR) from your feature branch to the main/master branch @@ -52,7 +52,6 @@ When the new tag is created, the `release.yaml` pipeline should be triggered. It #### Example - - You have set up the following Cloud Build variables / substitutions for the `release.yaml` CI/CD pipeline - `_PIPELINE_PUBLISH_AR_PATHS` = `https://-kfp.pkg.dev//vertex-pipelines https://-kfp.pkg.dev//vertex-pipelines https://-kfp.pkg.dev//vertex-pipelines` - You create a release from the main/master branch and use the git tag `v1.2` @@ -81,44 +80,27 @@ Create a new branch off the main/master branch e.g. `git checkout -b test-env-sc ``` cloud_schedulers_config = { - xgboost_training = { - description = "Trigger my training pipeline in Vertex" + training = { + description = "Trigger training pipeline in Vertex AI" schedule = "0 0 1 * *" time_zone = "UTC" template_path = "https://-kfp.pkg.dev//vertex-pipelines/xgboost-train-pipeline/v1.2" enable_caching = null pipeline_parameters = { - project_id = - project_location = "europe-west2" - ingestion_project_id = - model_name = "simple_xgboost" - model_label = "label_name" - dataset_id = "preprocessing" - dataset_location = "europe-west2" - ingestion_dataset_id = "chicago_taxi_trips" - timestamp = "2022-12-01 00:00:00" + // Add pipeline parameters which are expected by your pipeline here e.g. + // project = "my-project-id" }, }, - xgboost_prediction = { - description = "Trigger my prediction pipeline in Vertex" + prediction = { + description = "Trigger prediction pipeline in Vertex AI" schedule = "0 0 * * *" time_zone = "UTC" template_path = "https://-kfp.pkg.dev//vertex-pipelines/xgboost-prediction-pipeline/v1.2" enable_caching = null pipeline_parameters = { - project_id = - project_location = "europe-west2" - ingestion_project_id = - model_name = "simple_xgboost" - model_label = "label_name" - dataset_id = "preprocessing" - dataset_location = "europe-west2" - ingestion_dataset_id = "chicago_taxi_trips" - timestamp = "2022-12-01 00:00:00" - batch_prediction_machine_type = "n1-standard-4" - batch_prediction_min_replicas = 3 - batch_prediction_max_replicas = 5 + // Add pipeline parameters which are expected by your pipeline here e.g. + // project = "my-project-id" }, }, diff --git a/docs/notebooks/02_run_pipelines.ipynb b/docs/notebooks/02_run_pipelines.ipynb index 5cb70550..ce971a8e 100644 --- a/docs/notebooks/02_run_pipelines.ipynb +++ b/docs/notebooks/02_run_pipelines.ipynb @@ -159,7 +159,7 @@ "metadata": {}, "outputs": [], "source": [ - "! make run pipeline=training wait=true" + "! make training wait=true" ] }, { @@ -193,7 +193,7 @@ "metadata": {}, "outputs": [], "source": [ - "! make run pipeline=prediction build=false" + "! 
make prediction" ] }, { @@ -202,9 +202,9 @@ "tags": [] }, "source": [ - "**Note:** The `make run` command has the following true/false flags:\n", + "**Note:** The command has the following true/false flags:\n", "\n", - "- `build` - re-build containers for training & serving code (limit by setting `targets=training` to build only one of the containers)\n", + "- `build` - re-build containers for training & serving code (limit by setting `images=training` to build only one of the containers)\n", "- `compile` - re-compile the pipeline to YAML\n", "- `wait` - run the pipeline (a-)sync" ] diff --git a/docs/notebooks/extras/01_hyperparameter_tuning.ipynb b/docs/notebooks/extras/01_hyperparameter_tuning.ipynb index a19170f0..b49bd952 100644 --- a/docs/notebooks/extras/01_hyperparameter_tuning.ipynb +++ b/docs/notebooks/extras/01_hyperparameter_tuning.ipynb @@ -400,7 +400,7 @@ "metadata": {}, "outputs": [], "source": [ - "! make run pipeline=training build=true targets=training" + "! make training" ] }, { diff --git a/env.sh.example b/env.sh.example index 029c3089..8666c732 100644 --- a/env.sh.example +++ b/env.sh.example @@ -15,6 +15,7 @@ export VERTEX_PROJECT_ID=my-gcp-project export VERTEX_LOCATION=europe-west2 +export BQ_LOCATION=US export VERTEX_NETWORK= export VERTEX_CMEK_IDENTIFIER= diff --git a/model/Dockerfile b/model/Dockerfile index db0b33cd..ffd743b6 100644 --- a/model/Dockerfile +++ b/model/Dockerfile @@ -27,11 +27,11 @@ RUN poetry config virtualenvs.create false && poetry install FROM builder AS training -COPY training/train.py training/train.py +COPY training training FROM builder AS serving RUN poetry install --with serving -COPY serving/main.py serving/main.py +COPY serving serving CMD exec uvicorn serving.main:app --host "0.0.0.0" --port "$AIP_HTTP_PORT" diff --git a/pipelines/tests/__init__.py b/model/training/__init__.py similarity index 100% rename from pipelines/tests/__init__.py rename to model/training/__init__.py diff --git a/model/training/__main__.py b/model/training/__main__.py new file mode 100644 index 00000000..60e48174 --- /dev/null +++ b/model/training/__main__.py @@ -0,0 +1,35 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import json +import os +import logging + +from .train import train + +logging.basicConfig(level=logging.DEBUG) + +parser = argparse.ArgumentParser() +parser.add_argument("--input_path", type=str, required=True) +parser.add_argument("--input_test_path", type=str, required=False) +parser.add_argument("--output_train_path", type=str, required=True) +parser.add_argument("--output_valid_path", type=str, required=True) +parser.add_argument("--output_test_path", type=str, required=True) +parser.add_argument("--output_model", default=os.getenv("AIP_MODEL_DIR"), type=str) +parser.add_argument("--output_metrics", type=str, required=True) +parser.add_argument("--hparams", default={}, type=json.loads) +args = vars(parser.parse_args()) + +train(**args) diff --git a/model/training/train.py b/model/training/train.py index a5cf8544..978dae30 100644 --- a/model/training/train.py +++ b/model/training/train.py @@ -11,23 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import argparse + from pathlib import Path import joblib -import json -import os import logging -import numpy as np import pandas as pd -from sklearn import metrics from sklearn.compose import ColumnTransformer +from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder from xgboost import XGBRegressor -logging.basicConfig(level=logging.DEBUG) +from .utils import indices_in_list, save_metrics, save_monitoring_info, split_xy + # used for monitoring during prediction time TRAINING_DATASET_INFO = "training_dataset.json" @@ -37,124 +35,103 @@ OHE_COLS = ["payment_type"] -def split_xy(df: pd.DataFrame, label: str) -> (pd.DataFrame, pd.Series): - """Split dataframe into X and y.""" - return df.drop(columns=[label]), df[label] - - -def indices_in_list(elements: list, base_list: list) -> list: - """Get indices of specific elements in a base list""" - return [idx for idx, elem in enumerate(base_list) if elem in elements] - +def train( + input_path: str, + input_test_path: str, + output_train_path: str, + output_valid_path: str, + output_test_path: str, + output_model: str, + output_metrics: str, + hparams: dict, +): + + logging.info("Read csv files into dataframes") + df = pd.read_csv(input_path) + + logging.info("Split dataframes") + label = hparams.pop("label") + + if input_test_path: + # if static test data is used, only split into train & valid dataframes + if input_test_path.startswith("gs://"): + input_test_path = "/gcs/" + input_test_path[5:] + df_train, df_valid = train_test_split(df, test_size=0.2, random_state=1) + df_test = pd.read_csv(input_test_path) + else: + # otherwise, split into train, valid, and test dataframes + df_train, df_test = train_test_split(df, test_size=0.2, random_state=1) + df_train, df_valid = train_test_split(df_train, test_size=0.25, random_state=1) + + # create output folders + for x in [output_metrics, output_train_path, output_test_path, output_test_path]: + Path(x).parent.mkdir(parents=True, exist_ok=True) + Path(output_model).mkdir(parents=True, exist_ok=True) + + df_train.to_csv(output_train_path, index=False) + df_valid.to_csv(output_valid_path, index=False) + df_test.to_csv(output_test_path, index=False) + + X_train, y_train = split_xy(df_train, label) + X_valid, y_valid = split_xy(df_valid, label) + X_test, y_test = split_xy(df_test, label) + + 
logging.info("Get the number of unique categories for ordinal encoded columns") + ordinal_columns = X_train[ORD_COLS] + n_unique_cat = ordinal_columns.nunique() + + logging.info("Get indices of columns in base data") + col_list = X_train.columns.tolist() + num_indices = indices_in_list(NUM_COLS, col_list) + cat_indices_onehot = indices_in_list(OHE_COLS, col_list) + cat_indices_ordinal = indices_in_list(ORD_COLS, col_list) + + ordinal_transformers = [ + ( + f"ordinal encoding for {ord_col}", + OrdinalEncoder( + handle_unknown="use_encoded_value", unknown_value=n_unique_cat[ord_col] + ), + [ord_index], + ) + for ord_col in ORD_COLS + for ord_index in cat_indices_ordinal + ] + all_transformers = [ + ("numeric_scaling", StandardScaler(), num_indices), + ( + "one_hot_encoding", + OneHotEncoder(handle_unknown="ignore"), + cat_indices_onehot, + ), + ] + ordinal_transformers -parser = argparse.ArgumentParser() -parser.add_argument("--train-data", type=str, required=True) -parser.add_argument("--valid-data", type=str, required=True) -parser.add_argument("--test-data", type=str, required=True) -parser.add_argument("--model", default=os.getenv("AIP_MODEL_DIR"), type=str, help="") -parser.add_argument("--model-output-uri", type=str) -parser.add_argument("--metrics", type=str, required=True) -parser.add_argument("--hparams", default={}, type=json.loads) -args = parser.parse_args() + logging.info("Build sklearn preprocessing steps") + preprocesser = ColumnTransformer(transformers=all_transformers) + logging.info("Build sklearn pipeline with XGBoost model") + xgb_model = XGBRegressor(**hparams) -if args.model.startswith("gs://"): - args.model = "/gcs/" + args.model[5:] + pipeline = Pipeline( + steps=[("feature_engineering", preprocesser), ("train_model", xgb_model)] + ) -logging.info("Read csv files into dataframes") -df_train = pd.read_csv(args.train_data) -df_valid = pd.read_csv(args.valid_data) -df_test = pd.read_csv(args.test_data) + logging.info("Transform validation data") + valid_preprocesser = preprocesser.fit(X_train) + X_valid_transformed = valid_preprocesser.transform(X_valid) -logging.info("Split dataframes") -label = args.hparams["label"] -X_train, y_train = split_xy(df_train, label) -X_valid, y_valid = split_xy(df_valid, label) -X_test, y_test = split_xy(df_test, label) + logging.info("Fit model") + pipeline.fit( + X_train, y_train, train_model__eval_set=[(X_valid_transformed, y_valid)] + ) -logging.info("Get the number of unique categories for ordinal encoded columns") -ordinal_columns = X_train[ORD_COLS] -n_unique_cat = ordinal_columns.nunique() + logging.info("Predict test data") + y_pred = pipeline.predict(X_test) + y_pred = y_pred.clip(0) -logging.info("Get indices of columns in base data") -col_list = X_train.columns.tolist() -num_indices = indices_in_list(NUM_COLS, col_list) -cat_indices_onehot = indices_in_list(OHE_COLS, col_list) -cat_indices_ordinal = indices_in_list(ORD_COLS, col_list) + logging.info(f"Save model to: {output_model}") + joblib.dump(pipeline, f"{output_model}/model.joblib") -ordinal_transformers = [ - ( - f"ordinal encoding for {ord_col}", - OrdinalEncoder( - handle_unknown="use_encoded_value", unknown_value=n_unique_cat[ord_col] - ), - [ord_index], + save_metrics(y_test, y_pred, output_metrics) + save_monitoring_info( + output_train_path, label, f"{output_model}/{TRAINING_DATASET_INFO}" ) - for ord_col in ORD_COLS - for ord_index in cat_indices_ordinal -] -all_transformers = [ - ("numeric_scaling", StandardScaler(), num_indices), - ( - "one_hot_encoding", - 
OneHotEncoder(handle_unknown="ignore"), - cat_indices_onehot, - ), -] + ordinal_transformers - -logging.info("Build sklearn preprocessing steps") -preprocesser = ColumnTransformer(transformers=all_transformers) -logging.info("Build sklearn pipeline with XGBoost model") -xgb_model = XGBRegressor(**args.hparams) - -pipeline = Pipeline( - steps=[("feature_engineering", preprocesser), ("train_model", xgb_model)] -) - -logging.info("Transform validation data") -valid_preprocesser = preprocesser.fit(X_train) -X_valid_transformed = valid_preprocesser.transform(X_valid) - -logging.info("Fit model") -pipeline.fit(X_train, y_train, train_model__eval_set=[(X_valid_transformed, y_valid)]) - -logging.info("Predict test data") -y_pred = pipeline.predict(X_test) -y_pred = y_pred.clip(0) - -metrics = { - "problemType": "regression", - "rootMeanSquaredError": np.sqrt(metrics.mean_squared_error(y_test, y_pred)), - "meanAbsoluteError": metrics.mean_absolute_error(y_test, y_pred), - "meanAbsolutePercentageError": metrics.mean_absolute_percentage_error( - y_test, y_pred - ), - "rSquared": metrics.r2_score(y_test, y_pred), - "rootMeanSquaredLogError": np.sqrt(metrics.mean_squared_log_error(y_test, y_pred)), -} - -logging.info(f"Save model to: {args.model}") -Path(args.model).mkdir(parents=True) -joblib.dump(pipeline, f"{args.model}/model.joblib") - -model_uri = "gs://" + args.model[5:] -with open(args.model_output_uri, "w") as f: - f.write(model_uri) - -logging.info(f"Metrics: {metrics}") -with open(args.metrics, "w") as fp: - json.dump(metrics, fp) - -# Persist URIs of training file(s) for model monitoring in batch predictions -# See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501 -# for the expected schema. -path = f"{args.model}/{TRAINING_DATASET_INFO}" -training_dataset_for_monitoring = { - "gcsSource": {"uris": [args.train_data]}, - "dataFormat": "csv", - "targetField": label, -} -logging.info(f"Training dataset info: {training_dataset_for_monitoring}") - -with open(path, "w") as fp: - logging.info(f"Save training dataset info for model monitoring: {path}") - json.dump(training_dataset_for_monitoring, fp) diff --git a/model/training/utils.py b/model/training/utils.py new file mode 100644 index 00000000..56b9b8a0 --- /dev/null +++ b/model/training/utils.py @@ -0,0 +1,66 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import logging +import json + +import numpy as np +import pandas as pd +from sklearn import metrics + + +def split_xy(df: pd.DataFrame, label: str) -> (pd.DataFrame, pd.Series): + """Split dataframe into X and y.""" + return df.drop(columns=[label]), df[label] + + +def indices_in_list(elements: list, base_list: list) -> list: + """Get indices of specific elements in a base list""" + return [idx for idx, elem in enumerate(base_list) if elem in elements] + + +def save_metrics(y_test: pd.DataFrame, y_pred: pd.DataFrame, output_path: str): + """Save metrics in JSON format for Vertex AI Evaluation.""" + data = { + "problemType": "regression", + "rootMeanSquaredError": np.sqrt(metrics.mean_squared_error(y_test, y_pred)), + "meanAbsoluteError": metrics.mean_absolute_error(y_test, y_pred), + "meanAbsolutePercentageError": metrics.mean_absolute_percentage_error( + y_test, y_pred + ), + "rSquared": metrics.r2_score(y_test, y_pred), + "rootMeanSquaredLogError": np.sqrt( + metrics.mean_squared_log_error(y_test, y_pred) + ), + } + + logging.info(f"Metrics: {metrics}") + with open(output_path, "w") as fp: + json.dump(data, fp) + + +def save_monitoring_info(train_path: str, label: str, output_path: str): + """Persist URIs of training file(s) for model monitoring in batch predictions. + For the expected schema see: + https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501 + """ + training_dataset_for_monitoring = { + "gcsSource": {"uris": [train_path]}, + "dataFormat": "csv", + "targetField": label, + } + logging.info(f"Training dataset info: {training_dataset_for_monitoring}") + + with open(output_path, "w") as fp: + logging.info(f"Save training dataset info for model monitoring: {output_path}") + json.dump(training_dataset_for_monitoring, fp) diff --git a/pipelines/src/pipelines/__init__.py b/pipelines/src/pipelines/__init__.py index 33ac7fee..7ba50f93 100644 --- a/pipelines/src/pipelines/__init__.py +++ b/pipelines/src/pipelines/__init__.py @@ -11,23 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from pathlib import Path -from jinja2 import Template - - -def generate_query(input_file: Path, **replacements) -> str: - """ - Read input file and replace placeholder using Jinja. - - Args: - input_file (Path): input file to read - replacements: keyword arguments to use to replace placeholders - Returns: - str: replaced content of input file - """ - - with open(input_file, "r") as f: - query_template = f.read() - - return Template(query_template).render(**replacements) diff --git a/pipelines/src/pipelines/prediction.py b/pipelines/src/pipelines/prediction.py new file mode 100644 index 00000000..07e39676 --- /dev/null +++ b/pipelines/src/pipelines/prediction.py @@ -0,0 +1,116 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import pathlib +from os import environ as env + +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp +from kfp import dsl + +from pipelines.utils.query import generate_query +from components import lookup_model, model_batch_predict + + +RESOURCE_SUFFIX = env.get("RESOURCE_SUFFIX", "default") +# set training-serving skew thresholds and emails to receive alerts: +ALERT_EMAILS = [] +SKEW_THRESHOLDS = {"defaultSkewThreshold": {"value": 0.001}} +# or set different thresholds per feature: +# SKEW_THRESHOLDS = {"skewThresholds": {"payment_type": {"value": 0.001}}, ... } + + +@dsl.pipeline(name="turbo-prediction-pipeline") +def pipeline( + project: str = env.get("VERTEX_PROJECT_ID"), + location: str = env.get("VERTEX_LOCATION"), + bq_location: str = env.get("BQ_LOCATION"), + bq_source_uri: str = "bigquery-public-data.chicago_taxi_trips.taxi_trips", + model_name: str = "xgb_regressor", + dataset: str = "turbo_templates", + timestamp: str = "2022-12-01 00:00:00", + machine_type: str = "n2-standard-4", + min_replicas: int = 3, + max_replicas: int = 10, +): + """ + Prediction pipeline which: + 1. Looks up the default model version (champion). + 2. Runs a batch prediction job with BigQuery as input and output + 3. Optionally monitors training-serving skew + + Args: + project (str): project id of the Google Cloud project + location (str): location of the Google Cloud project + bq_location (str): location of dataset in BigQuery + bq_source_uri (str): `..` of ingestion data in BigQuery + model_name (str): name of model + dataset (str): dataset id to store staging data & predictions in BigQuery + timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format + (YYYY-MM-DDThh:mm:ss.sss±hh:mm or YYYY-MM-DDThh:mm:ss). + If any time part is missing, it will be regarded as zero + machine_type (str): Machine type to be used for Vertex Batch + Prediction. Example machine_types - n1-standard-4, n1-standard-16 etc. 
+ min_replicas (int): Minimum no of machines to distribute the + Vertex Batch Prediction job for horizontal scalability + max_replicas (int): Maximum no of machines to distribute the + Vertex Batch Prediction job for horizontal scalability + """ + + queries_folder = pathlib.Path(__file__).parent / "queries" + table = f"prep_prediction_{RESOURCE_SUFFIX}" + + prep_query = generate_query( + queries_folder / "preprocessing.sql", + source=bq_source_uri, + location=bq_location, + dataset=f"{project}.{dataset}", + table=table, + start_timestamp=timestamp, + ) + + prep_op = BigqueryQueryJobOp( + project=project, + location=bq_location, + query=prep_query, + ).set_display_name("Ingest & preprocess data") + + lookup_op = lookup_model( + model_name=model_name, + location=location, + project=project, + fail_on_model_not_found=True, + ).set_display_name("Look up champion model") + + ( + model_batch_predict( + model=lookup_op.outputs["model"], + job_display_name="turbo-template-predict-job", + location=location, + project=project, + source_uri=f"bq://{project}.{dataset}.{table}", + destination_uri=f"bq://{project}.{dataset}", + source_format="bigquery", + destination_format="bigquery", + instance_config={ + "instanceType": "object", + }, + machine_type=machine_type, + starting_replica_count=min_replicas, + max_replica_count=max_replicas, + monitoring_training_dataset=lookup_op.outputs["training_dataset"], + monitoring_alert_email_addresses=ALERT_EMAILS, + monitoring_skew_config=SKEW_THRESHOLDS, + ) + .after(prep_op) + .set_display_name("Run prediction job") + ) diff --git a/pipelines/src/pipelines/prediction/pipeline.py b/pipelines/src/pipelines/prediction/pipeline.py deleted file mode 100644 index dd63d727..00000000 --- a/pipelines/src/pipelines/prediction/pipeline.py +++ /dev/null @@ -1,153 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import pathlib - -from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp -from kfp import dsl - -from pipelines import generate_query -from components import lookup_model, model_batch_predict - - -@dsl.pipeline(name="xgboost-prediction-pipeline") -def pipeline( - project_id: str = os.environ.get("VERTEX_PROJECT_ID"), - project_location: str = os.environ.get("VERTEX_LOCATION"), - ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"), - model_name: str = "simple_xgboost", - preprocessing_dataset_id: str = "preprocessing", - dataset_location: str = os.environ.get("VERTEX_LOCATION"), - ingestion_dataset_id: str = "chicago_taxi_trips", - prediction_dataset_id: str = "prediction", - timestamp: str = "2022-12-01 00:00:00", - batch_prediction_machine_type: str = "n1-standard-4", - batch_prediction_min_replicas: int = 3, - batch_prediction_max_replicas: int = 10, - resource_suffix: str = os.environ.get("RESOURCE_SUFFIX"), -): - """ - XGB prediction pipeline which: - 1. Looks up the default model version (champion) and - dataset which was used to the train model. - 2. 
Runs a BatchPredictionJob with optional training-serving skew detection. - - Args: - project_id (str): project id of the Google Cloud project - project_location (str): location of the Google Cloud project - ingestion_project_id (str): project id containing the source bigquery data - for ingestion. This can be the same as `project_id` if the source data is - in the same project where the ML pipeline is executed. - model_name (str): name of model - preprocessing_dataset_id (str): id of BQ dataset used to - store all staging data . - prediction_dataset_id (str): id of BQ dataset used to - store all predictions. - dataset_location (str): location of dataset - ingestion_dataset_id (str): dataset id of ingestion data - timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format - (YYYY-MM-DDThh:mm:ss.sss±hh:mm or YYYY-MM-DDThh:mm:ss). - If any time part is missing, it will be regarded as zero. - batch_prediction_machine_type (str): Machine type to be used for Vertex Batch - Prediction. Example machine_types - n1-standard-4, n1-standard-16 etc. - batch_prediction_min_replicas (int): Minimum no of machines to distribute the - Vertex Batch Prediction job for horizontal scalability - batch_prediction_max_replicas (int): Maximum no of machines to distribute the - Vertex Batch Prediction job for horizontal scalability. - resource_suffix (str): Optional. Additional suffix to append GCS resources - that get overwritten. - - Returns: - None - - """ - - # Create variables to ensure the same arguments are passed - # into different components of the pipeline - time_column = "trip_start_timestamp" - ingestion_table = "taxi_trips" - table_suffix = "_xgb_prediction_" + str(resource_suffix) # suffix to table names - ingested_table = "ingested_data_" + table_suffix - monitoring_alert_email_addresses = [] - monitoring_skew_config = {"defaultSkewThreshold": {"value": 0.001}} - - # generate sql queries which are used in ingestion and preprocessing - # operations - queries_folder = pathlib.Path(__file__).parent / "queries" - - preprocessing_query = generate_query( - queries_folder / "preprocessing.sql", - source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", - source_table=ingestion_table, - prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}", - preprocessing_dataset=f"{ingestion_project_id}.{preprocessing_dataset_id}", - ingested_table=ingested_table, - dataset_region=project_location, - filter_column=time_column, - filter_start_value=timestamp, - ) - - preprocessing = ( - BigqueryQueryJobOp( - project=project_id, - location=dataset_location, - query=preprocessing_query, - ) - .set_caching_options(False) - .set_display_name("Ingest data") - ) - - # lookup champion model - champion_model = ( - lookup_model( - model_name=model_name, - project_location=project_location, - project_id=project_id, - fail_on_model_not_found=True, - ) - .set_display_name("Look up champion model") - .set_caching_options(False) - ) - - # batch predict from BigQuery to BigQuery - bigquery_source_input_uri = ( - f"bq://{project_id}.{preprocessing_dataset_id}.{ingested_table}" - ) - bigquery_destination_output_uri = f"bq://{project_id}.{prediction_dataset_id}" - - batch_prediction = ( - model_batch_predict( - model=champion_model.outputs["model"], - job_display_name="my-xgboost-batch-prediction-job", - project_location=project_location, - project_id=project_id, - source_uri=bigquery_source_input_uri, - destination_uri=bigquery_destination_output_uri, - source_format="bigquery", - 
destination_format="bigquery", - instance_config={ - "instanceType": "object", - }, - machine_type=batch_prediction_machine_type, - starting_replica_count=batch_prediction_min_replicas, - max_replica_count=batch_prediction_max_replicas, - monitoring_training_dataset=champion_model.outputs["training_dataset"], - monitoring_alert_email_addresses=monitoring_alert_email_addresses, - monitoring_skew_config=monitoring_skew_config, - ) - .set_caching_options(False) - .after(preprocessing) - .set_display_name("Batch prediction job") - ) diff --git a/pipelines/src/pipelines/prediction/queries/preprocessing.sql b/pipelines/src/pipelines/prediction/queries/preprocessing.sql deleted file mode 100644 index 294223a4..00000000 --- a/pipelines/src/pipelines/prediction/queries/preprocessing.sql +++ /dev/null @@ -1,69 +0,0 @@ --- Copyright 2022 Google LLC - --- Licensed under the Apache License, Version 2.0 (the 'License'); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an 'AS IS' BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- Treat 'filter_start_value' as the current time, unless it is empty then use CURRENT_DATETIME() instead --- This allows us to set the filter_start_value to a specific time for testing or for backfill - --- If prediction dataset don't exist, create it -CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` - OPTIONS ( - description = 'Prediction Dataset', - location = '{{ dataset_region }}'); - --- We recreate the ingestion table every time the pipeline run, --- so we need to drop the generated in the previous run -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( -with filter_start_values as ( - SELECT - IF('{{ filter_start_value }}' = '', CURRENT_DATETIME(), CAST('{{ filter_start_value }}' AS DATETIME)) as filter_start_value -) --- Ingest data between 2 and 3 months ago -,filtered_data as ( - SELECT - * - FROM `{{ source_dataset }}.{{ source_table }}`, filter_start_values - WHERE - DATE({{ filter_column }}) BETWEEN - DATE_SUB(DATE(CAST(filter_start_values.filter_start_value as DATETIME)), INTERVAL 3 MONTH) AND - DATE_SUB(DATE(filter_start_value), INTERVAL 2 MONTH) -) --- Use the average trip_seconds as a replacement for NULL or 0 values -,mean_time as ( - SELECT CAST(avg(trip_seconds) AS INT64) as avg_trip_seconds - FROM filtered_data -) - -SELECT - CAST(EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS FLOAT64) AS dayofweek, - CAST(EXTRACT(HOUR FROM trip_start_timestamp) AS FLOAT64) AS hourofday, - ST_DISTANCE( - ST_GEOGPOINT(pickup_longitude, pickup_latitude), - ST_GEOGPOINT(dropoff_longitude, dropoff_latitude)) AS trip_distance, - trip_miles, - CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds - WHEN trip_seconds <= 0 then m.avg_trip_seconds - ELSE trip_seconds - END AS FLOAT64) AS trip_seconds, - payment_type, - company, -FROM filtered_data as t, mean_time as m -WHERE - trip_miles > 0 AND fare > 0 AND fare < 1500 - {% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude', - 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %} - AND `{{ field }}` IS 
NOT NULL - {% endfor %} - ); diff --git a/pipelines/src/pipelines/queries/preprocessing.sql b/pipelines/src/pipelines/queries/preprocessing.sql new file mode 100644 index 00000000..0cd8f953 --- /dev/null +++ b/pipelines/src/pipelines/queries/preprocessing.sql @@ -0,0 +1,56 @@ +-- Create dataset if it doesn't exist +CREATE SCHEMA IF NOT EXISTS `{{ dataset }}` + OPTIONS ( + description = 'Chicago Taxi Trips with Turbo Template', + location = '{{ location }}'); + +-- Create (or replace) table with preprocessed data +DROP TABLE IF EXISTS `{{ dataset }}.{{ table }}`; +CREATE TABLE `{{ dataset }}.{{ table }}` AS ( +WITH start_timestamps AS ( +SELECT + IF('{{ start_timestamp }}' = '', + CURRENT_DATETIME(), + CAST('{{ start_timestamp }}' AS DATETIME)) AS start_timestamp +) +-- Ingest data between 2 and 3 months ago +,filtered_data AS ( + SELECT + * + FROM `{{ source }}`, start_timestamps + WHERE + DATE(trip_start_timestamp) BETWEEN + DATE_SUB(DATE(CAST(start_timestamps.start_timestamp AS DATETIME)), INTERVAL 3 MONTH) AND + DATE_SUB(DATE(start_timestamp), INTERVAL 2 MONTH) +) +-- Use the average trip_seconds as a replacement for NULL or 0 values +,mean_time AS ( + SELECT CAST(avg(trip_seconds) AS INT64) as avg_trip_seconds + FROM filtered_data +) + +SELECT + CAST(EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS FLOAT64) AS dayofweek, + CAST(EXTRACT(HOUR FROM trip_start_timestamp) AS FLOAT64) AS hourofday, + ST_DISTANCE( + ST_GEOGPOINT(pickup_longitude, pickup_latitude), + ST_GEOGPOINT(dropoff_longitude, dropoff_latitude)) AS trip_distance, + trip_miles, + CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds + WHEN trip_seconds <= 0 then m.avg_trip_seconds + ELSE trip_seconds + END AS FLOAT64) AS trip_seconds, + payment_type, + company, + {% if label %} + (fare + tips + tolls + extras) AS `{{ label }}`, + {% endif %} +FROM filtered_data AS t, mean_time AS m +WHERE + trip_miles > 0 AND fare > 0 AND fare < 1500 + {% for field in [ + 'fare', 'trip_start_timestamp', 'pickup_longitude', 'pickup_latitude', + 'dropoff_longitude', 'dropoff_latitude','payment_type','company' ] %} + AND `{{ field }}` IS NOT NULL + {% endfor %} +); diff --git a/pipelines/src/pipelines/training.py b/pipelines/src/pipelines/training.py new file mode 100644 index 00000000..1b621e84 --- /dev/null +++ b/pipelines/src/pipelines/training.py @@ -0,0 +1,160 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
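The single `queries/preprocessing.sql` above now backs both pipelines: the training pipeline renders it with a `label` variable so the fare label is materialised, while the prediction pipeline leaves `label` unset and the `{% if label %}` block is skipped. A rough illustration of that conditional rendering with Jinja, using a cut-down stand-in for the real template (rendered the same way `generate_query` does later in this diff):

```python
from jinja2 import Template

# Cut-down stand-in for queries/preprocessing.sql, showing only the conditional.
sql = Template(
    """
    SELECT
      trip_miles,
      payment_type,
      {% if label %}
      (fare + tips + tolls + extras) AS `{{ label }}`,
      {% endif %}
    FROM `{{ source }}`
    """
)

source = "bigquery-public-data.chicago_taxi_trips.taxi_trips"
print(sql.render(source=source, label="total_fare"))  # training: label column included
print(sql.render(source=source))                      # prediction: label block dropped
```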
+import pathlib +from os import environ as env + +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp +from kfp import dsl +from kfp.dsl import Dataset, Input, Metrics, Model, Output + +from pipelines.utils.query import generate_query +from components import extract_table, upload_model + + +LABEL = "total_fare" +PRIMARY_METRIC = "rootMeanSquaredError" +HPARAMS = dict( + n_estimators=200, + early_stopping_rounds=10, + objective="reg:squarederror", + booster="gbtree", + learning_rate=0.3, + min_split_loss=0, + max_depth=6, + label=LABEL, +) +RESOURCE_SUFFIX = env.get("RESOURCE_SUFFIX", "default") +TRAINING_IMAGE = f"{env['CONTAINER_IMAGE_REGISTRY']}/training:{RESOURCE_SUFFIX}" +SERVING_IMAGE = f"{env['CONTAINER_IMAGE_REGISTRY']}/serving:{RESOURCE_SUFFIX}" + + +@dsl.container_component +def train( + input_data: Input[Dataset], + input_test_path: str, + hparams: dict, + train_data: Output[Dataset], + valid_data: Output[Dataset], + test_data: Output[Dataset], + model: Output[Model], + metrics: Output[Metrics], +): + return dsl.ContainerSpec( + image=TRAINING_IMAGE, + command=["python"], + args=[ + "-m" "training", + "--input_path", + input_data.path, + dsl.IfPresentPlaceholder( + input_name="input_test_path", + then=["--input_test_path", input_test_path], + ), + "--hparams", + hparams, + "--output_train_path", + train_data.path, + "--output_valid_path", + valid_data.path, + "--output_test_path", + test_data.path, + "--output_model", + model.path, + "--output_metrics", + metrics.path, + ], + ) + + +@dsl.pipeline(name="turbo-training-pipeline") +def pipeline( + project: str = env.get("VERTEX_PROJECT_ID"), + location: str = env.get("VERTEX_LOCATION"), + bq_location: str = env.get("BQ_LOCATION"), + bq_source_uri: str = "bigquery-public-data.chicago_taxi_trips.taxi_trips", + model_name: str = "xgb_regressor", + dataset: str = "turbo_templates", + timestamp: str = "2022-12-01 00:00:00", + test_data_gcs_uri: str = "", +): + """ + Training pipeline which: + 1. Preprocesses data in BigQuery + 2. Extracts data to Cloud Storage + 3. Trains a model using a custom prebuilt container + 4. Uploads the model to Model Registry + 5. Evaluates the model against a champion model + 6. Selects a new champion based on the primary metrics + + Args: + project (str): project id of the Google Cloud project + location (str): location of the Google Cloud project + bq_location (str): location of dataset in BigQuery + bq_source_uri (str): `..
` of ingestion data in BigQuery + model_name (str): name of model + dataset (str): dataset id to store staging data & predictions in BigQuery + timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format + (YYYY-MM-DDThh:mm:ss.sss±hh:mm or YYYY-MM-DDThh:mm:ss). + If any time part is missing, it will be regarded as zero. + test_data_gcs_uri (str): Optional. GCS URI of static held-out test dataset. + """ + + table = f"prep_training_{RESOURCE_SUFFIX}" + queries_folder = pathlib.Path(__file__).parent / "queries" + + prep_query = generate_query( + queries_folder / "preprocessing.sql", + source=bq_source_uri, + location=bq_location, + dataset=f"{project}.{dataset}", + table=table, + label=LABEL, + start_timestamp=timestamp, + ) + + prep_op = BigqueryQueryJobOp( + project=project, + location=bq_location, + query=prep_query, + ).set_display_name("Ingest & preprocess data") + + data_op = ( + extract_table( + project=project, + location=bq_location, + table=f"{project}:{dataset}.{table}", + ) + .after(prep_op) + .set_display_name("Extract data") + ) + + train_op = train( + input_data=data_op.outputs["data"], + input_test_path=test_data_gcs_uri, + hparams=HPARAMS, + ).set_display_name("Train model") + + upload_model( + project=project, + location=location, + model=train_op.outputs["model"], + model_evaluation=train_op.outputs["metrics"], + test_data=train_op.outputs["test_data"], + eval_metric=PRIMARY_METRIC, + eval_lower_is_better=True, + serving_container_image=SERVING_IMAGE, + model_name=model_name, + model_description="Predict price of a taxi trip.", + pipeline_job_id="{{$.pipeline_job_name}}", + ).set_display_name("Upload model") diff --git a/pipelines/src/pipelines/training/pipeline.py b/pipelines/src/pipelines/training/pipeline.py deleted file mode 100644 index dd73e910..00000000 --- a/pipelines/src/pipelines/training/pipeline.py +++ /dev/null @@ -1,212 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
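With the pipelines now defined as plain modules (`pipelines/training.py` above, `pipelines/prediction.py` earlier), they can also be compiled programmatically rather than through the Makefile target. A minimal sketch, assuming the environment variables the module reads at import time (notably `CONTAINER_IMAGE_REGISTRY`, plus `RESOURCE_SUFFIX` if used) are exported:

```python
from kfp import compiler

# Importing the module requires CONTAINER_IMAGE_REGISTRY to be set, since
# TRAINING_IMAGE and SERVING_IMAGE are built at import time.
from pipelines.training import pipeline

# Roughly what the `kfp dsl compile` CLI does under the hood.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="training.yaml",
)
```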
- -import os -import pathlib - -from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp -from kfp import dsl -from kfp.dsl import Dataset, Input, Metrics, Model, Output, OutputPath -from pipelines import generate_query -from components import extract_table, upload_model - -CONTAINER_IMAGE_REGISTRY = os.environ["CONTAINER_IMAGE_REGISTRY"] -RESOURCE_SUFFIX = os.environ.get("RESOURCE_SUFFIX", "default") -TRAINING_IMAGE = f"{CONTAINER_IMAGE_REGISTRY}/training:{RESOURCE_SUFFIX}" -SERVING_IMAGE = f"{CONTAINER_IMAGE_REGISTRY}/serving:{RESOURCE_SUFFIX}" - - -@dsl.container_component -def train( - train_data: Input[Dataset], - valid_data: Input[Dataset], - test_data: Input[Dataset], - model: Output[Model], - model_output_uri: OutputPath(str), - metrics: Output[Metrics], - hparams: dict, -): - return dsl.ContainerSpec( - image=TRAINING_IMAGE, - command=["python"], - args=[ - "training/train.py", - "--train-data", - train_data.path, - "--valid-data", - valid_data.path, - "--test-data", - test_data.path, - "--model", - model.path, - "--model-output-uri", - model_output_uri, - "--metrics", - metrics.path, - "--hparams", - hparams, - ], - ) - - -@dsl.pipeline(name="xgboost-train-pipeline") -def pipeline( - project_id: str = os.environ.get("VERTEX_PROJECT_ID"), - project_location: str = os.environ.get("VERTEX_LOCATION"), - ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"), - model_name: str = "simple_xgboost", - dataset_id: str = "preprocessing", - dataset_location: str = os.environ.get("VERTEX_LOCATION"), - ingestion_dataset_id: str = "chicago_taxi_trips", - timestamp: str = "2022-12-01 00:00:00", - resource_suffix: str = os.environ.get("RESOURCE_SUFFIX"), - test_dataset_uri: str = "", -): - """ - XGB training pipeline which: - 1. Splits and extracts a dataset from BQ to GCS - 2. Trains a model via Vertex AI CustomTrainingJob - 3. Evaluates the model against the current champion model - 4. If better the model becomes the new default model - - Args: - project_id (str): project id of the Google Cloud project - project_location (str): location of the Google Cloud project - ingestion_project_id (str): project id containing the source bigquery data - for ingestion. This can be the same as `project_id` if the source data is - in the same project where the ML pipeline is executed. - model_name (str): name of model - dataset_id (str): id of BQ dataset used to store all staging data & predictions - dataset_location (str): location of dataset - ingestion_dataset_id (str): dataset id of ingestion data - timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format - (YYYY-MM-DDThh:mm:ss.sss±hh:mm or YYYY-MM-DDThh:mm:ss). - If any time part is missing, it will be regarded as zero. - resource_suffix (str): Optional. Additional suffix to append GCS resources - that get overwritten. - test_dataset_uri (str): Optional. GCS URI of statis held-out test dataset. 
- """ - - # Create variables to ensure the same arguments are passed - # into different components of the pipeline - label_column_name = "total_fare" - time_column = "trip_start_timestamp" - ingestion_table = "taxi_trips" - table_suffix = f"_xgb_training_{resource_suffix}" # suffix to table names - ingested_table = "ingested_data" + table_suffix - preprocessed_table = "preprocessed_data" + table_suffix - train_table = "train_data" + table_suffix - valid_table = "valid_data" + table_suffix - test_table = "test_data" + table_suffix - primary_metric = "rootMeanSquaredError" - hparams = dict( - n_estimators=200, - early_stopping_rounds=10, - objective="reg:squarederror", - booster="gbtree", - learning_rate=0.3, - min_split_loss=0, - max_depth=6, - label=label_column_name, - ) - - # generate sql queries which are used in ingestion and preprocessing - # operations - - queries_folder = pathlib.Path(__file__).parent / "queries" - - preprocessing_query = generate_query( - queries_folder / "preprocessing.sql", - source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", - source_table=ingestion_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - ingested_table=ingested_table, - dataset_region=project_location, - filter_column=time_column, - target_column=label_column_name, - filter_start_value=timestamp, - train_table=train_table, - validation_table=valid_table, - test_table=test_table, - ) - - preprocessing = ( - BigqueryQueryJobOp( - project=project_id, - location=dataset_location, - query=preprocessing_query, - ) - .set_caching_options(False) - .set_display_name("Ingest & preprocess data") - ) - - # data extraction to gcs - - train_dataset = ( - extract_table( - bq_client_project_id=project_id, - source_project_id=project_id, - dataset_id=dataset_id, - table_name=train_table, - dataset_location=dataset_location, - ) - .after(preprocessing) - .set_display_name("Extract train data") - .set_caching_options(False) - ).outputs["dataset"] - valid_dataset = ( - extract_table( - bq_client_project_id=project_id, - source_project_id=project_id, - dataset_id=dataset_id, - table_name=valid_table, - dataset_location=dataset_location, - ) - .after(preprocessing) - .set_display_name("Extract validation data") - .set_caching_options(False) - ).outputs["dataset"] - test_dataset = ( - extract_table( - bq_client_project_id=project_id, - source_project_id=project_id, - dataset_id=dataset_id, - table_name=test_table, - dataset_location=dataset_location, - destination_gcs_uri=test_dataset_uri, - ) - .after(preprocessing) - .set_display_name("Extract test data") - .set_caching_options(False) - ).outputs["dataset"] - - train_model = train( - train_data=train_dataset, - valid_data=valid_dataset, - test_data=test_dataset, - hparams=hparams, - ).set_display_name("Train model") - - upload_model_op = upload_model( - project_id=project_id, - project_location=project_location, - model=train_model.outputs["model"], - model_evaluation=train_model.outputs["metrics"], - test_dataset=test_dataset, - eval_metric=primary_metric, - eval_lower_is_better=True, - serving_container_image=SERVING_IMAGE, - model_name=model_name, - pipeline_job_id="{{$.pipeline_job_name}}", - ).set_display_name("Upload model") diff --git a/pipelines/src/pipelines/training/queries/preprocessing.sql b/pipelines/src/pipelines/training/queries/preprocessing.sql deleted file mode 100644 index a7291038..00000000 --- a/pipelines/src/pipelines/training/queries/preprocessing.sql +++ /dev/null @@ -1,88 +0,0 @@ --- If preprocessing dataset 
don't exist, create it -CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` - OPTIONS ( - description = 'Preprocessing Dataset', - location = '{{ dataset_region }}'); - --- We recreate the ingestion table every time the pipeline run, --- so we need to drop the generated in the previous run -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( -WITH filter_start_values AS ( -SELECT - IF('{{ filter_start_value }}' = '', - CURRENT_DATETIME(), - CAST('{{ filter_start_value }}' AS DATETIME)) AS filter_start_value -) --- Ingest data between 2 and 3 months ago -,filtered_data AS ( - SELECT - * - FROM `{{ source_dataset }}.{{ source_table }}`, filter_start_values - WHERE - DATE({{ filter_column }}) BETWEEN - DATE_SUB(DATE(CAST(filter_start_values.filter_start_value AS DATETIME)), INTERVAL 3 MONTH) AND - DATE_SUB(DATE(filter_start_value), INTERVAL 2 MONTH) -) --- Use the average trip_seconds as a replacement for NULL or 0 values -,mean_time AS ( - SELECT CAST(avg(trip_seconds) AS INT64) as avg_trip_seconds - FROM filtered_data -) -SELECT - CAST(EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS FLOAT64) AS dayofweek, - CAST(EXTRACT(HOUR FROM trip_start_timestamp) AS FLOAT64) AS hourofday, - ST_DISTANCE( - ST_GEOGPOINT(pickup_longitude, pickup_latitude), - ST_GEOGPOINT(dropoff_longitude, dropoff_latitude)) AS trip_distance, - trip_miles, - CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds - WHEN trip_seconds <= 0 then m.avg_trip_seconds - ELSE trip_seconds - END AS FLOAT64) AS trip_seconds, - payment_type, - company, - (fare + tips + tolls + extras) AS `{{ target_column }}`, -FROM filtered_data AS t, mean_time AS m -WHERE - trip_miles > 0 AND fare > 0 AND fare < 1500 - {% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude', - 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %} - AND `{{ field }}` IS NOT NULL - {% endfor %} -); - --- Drop and creation of train, testing and validations tables -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS ( -SELECT - * -FROM - `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t -WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - 10) IN (0, 1, 2, 3, 4, 5, 6, 7)); - -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ validation_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ validation_table }}` AS ( -SELECT - * -FROM - `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t -WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - 10) IN (8)); - -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ test_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ test_table }}` AS ( -SELECT - * -FROM - `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t -WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - 10) IN (9)); diff --git a/pipelines/tests/conftest.py b/pipelines/src/pipelines/utils/query.py similarity index 53% rename from pipelines/tests/conftest.py rename to pipelines/src/pipelines/utils/query.py index 8083a83c..c25ddb39 100644 --- a/pipelines/tests/conftest.py +++ b/pipelines/src/pipelines/utils/query.py @@ -11,19 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from pathlib import Path +from jinja2 import Template -import pytest +def generate_query(input_file: Path, **replacements) -> str: + """ + Read input file and replace placeholder using Jinja. -def pytest_addoption(parser): - parser.addoption( - "--enable_caching", - type=str, - help="Whether to enable or disable caching for all pipeline steps", - default=None, - ) + Args: + input_file (Path): input file to read + replacements: keyword arguments to use to replace placeholders + Returns: + str: replaced content of input file + """ + with open(input_file, "r") as f: + query_template = f.read() -@pytest.fixture(scope="session") -def enable_caching(pytestconfig): - return pytestconfig.getoption("enable_caching") + return Template(query_template).render(**replacements) diff --git a/terraform/modules/scheduled_pipelines/scheduled_jobs.auto.tfvars.example b/terraform/modules/scheduled_pipelines/scheduled_jobs.auto.tfvars.example index f864e7e9..77682e81 100644 --- a/terraform/modules/scheduled_pipelines/scheduled_jobs.auto.tfvars.example +++ b/terraform/modules/scheduled_pipelines/scheduled_jobs.auto.tfvars.example @@ -16,41 +16,41 @@ cloud_schedulers_config = { - xgboost_training = { - description = "Trigger my training pipeline in Vertex" + training = { + description = "Trigger training pipeline in Vertex AI" schedule = "0 0 * * 0" time_zone = "UTC" template_path = "https://-kfp.pkg.dev//vertex-pipelines/xgboost-train-pipeline/" enable_caching = null pipeline_parameters = { - project_id = "my-project-id" - project_location = "europe-west2" - ingestion_project_id = "my-project-id" - model_name = "xgboost_with_preprocessing" - dataset_id = "preprocessing" - dataset_location = "europe-west2" - ingestion_dataset_id = "chicago_taxi_trips" + project = "my-project-id" + location = "europe-west2" + bq_location = "US" + bq_source_uri = "bigquery-public-data.chicago_taxi_trips.taxi_trips" + model_name = "xgb_regressor" + dataset = "turbo_templates" timestamp = "2022-12-01 00:00:00" + test_data_gcs_uri = "" } }, - xgboost_prediction = { - description = "Trigger my prediction pipeline in Vertex" + prediction = { + description = "Trigger prediction pipeline in Vertex AI" schedule = "0 0 * * 0" time_zone = "UTC" template_path = "https://-kfp.pkg.dev//vertex-pipelines/xgboost-prediction-pipeline/" enable_caching = null pipeline_parameters = { - project_id = "my-project-id" - project_location = "europe-west2" - ingestion_project_id = "my-project-id" - model_name = "xgboost_with_preprocessing" - dataset_location = "europe-west2" - ingestion_dataset_id = "chicago_taxi_trips" + project = "my-project-id" + location = "europe-west2" + bq_location = "US" + bq_source_uri = "bigquery-public-data.chicago_taxi_trips.taxi_trips" + model_name = "xgb_regressor" + dataset = "turbo_templates" timestamp = "2022-12-01 00:00:00" - batch_prediction_machine_type = "n1-standard-4" - batch_prediction_min_replicas = 3 - batch_prediction_max_replicas = 5 + machine_type = "n2-standard-4" + min_replicas = 3 + max_replicas = 10 } } }
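Finally, since `generate_query` now lives in `pipelines/utils/query.py` (and the old pytest `conftest.py` options are gone), a small unit test along these lines would be picked up by the pipelines test suite; the test name and assertion values are illustrative:

```python
from pipelines.utils.query import generate_query


def test_generate_query_renders_placeholders(tmp_path):
    # Write a minimal Jinja-templated query to a temporary file.
    template = tmp_path / "query.sql"
    template.write_text("SELECT * FROM `{{ dataset }}.{{ table }}`")

    rendered = generate_query(template, dataset="turbo_templates", table="prep_training_default")

    assert rendered == "SELECT * FROM `turbo_templates.prep_training_default`"
```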