Merge pull request #40 from teamdatatonic/refactor/train_pipeline
Update training pipeline
felix-datatonic authored Nov 15, 2023
2 parents fbc608f + 07fb10b commit 4c80faf
Showing 35 changed files with 759 additions and 1,047 deletions.
56 changes: 32 additions & 24 deletions Makefile
@@ -15,59 +15,56 @@
-include env.sh
export


help: ## Display this help screen
help: ## Display this help screen.
@grep -h -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

pre-commit: ## Runs the pre-commit checks over entire repo
cd pipelines && \
poetry run pre-commit run --all-files

env ?= dev
AUTO_APPROVE_FLAG :=
deploy: ## Deploy the Terraform infrastructure to your project. Requires VERTEX_PROJECT_ID and VERTEX_LOCATION env variables to be set in env.sh. Optionally specify env=<dev|test|prod> (default = dev)
deploy: ## Deploy infrastructure to your project. Optionally set env=<dev|test|prod> (default = dev).
@if [ "$(auto-approve)" = "true" ]; then \
AUTO_APPROVE_FLAG="-auto-approve"; \
fi; \
cd terraform/envs/$(env) && \
terraform init -backend-config='bucket=${VERTEX_PROJECT_ID}-tfstate' && \
terraform apply -var 'project_id=${VERTEX_PROJECT_ID}' -var 'region=${VERTEX_LOCATION}' $$AUTO_APPROVE_FLAG

undeploy: ## DESTROY the Terraform infrastructure in your project. Requires VERTEX_PROJECT_ID and VERTEX_LOCATION env variables to be set in env.sh. Optionally specify env=<dev|test|prod> (default = dev)
undeploy: ## DESTROY the infrastructure in your project. Optionally set env=<dev|test|prod> (default = dev).
@if [ "$(auto-approve)" = "true" ]; then \
AUTO_APPROVE_FLAG="-auto-approve"; \
fi; \
cd terraform/envs/$(env) && \
terraform init -backend-config='bucket=${VERTEX_PROJECT_ID}-tfstate' && \
terraform destroy -var 'project_id=${VERTEX_PROJECT_ID}' -var 'region=${VERTEX_LOCATION}' $$AUTO_APPROVE_FLAG

install: ## Set up local environment for Python development on pipelines
install: ## Set up local Python environment for development.
@cd pipelines && \
poetry install --with dev && \
cd ../components && \
poetry install --with dev
poetry install --with dev && \
cd ../model && \
poetry install

compile: ## Compile the pipeline to pipeline.yaml. Must specify pipeline=<training|prediction>
compile: ## Compile pipeline. Must set pipeline=<training|prediction>.
@cd pipelines/src && \
poetry run kfp dsl compile --py pipelines/${pipeline}/pipeline.py --output pipelines/${pipeline}/pipeline.yaml --function pipeline
echo "Compiling $$pipeline pipeline" && \
poetry run kfp dsl compile --py pipelines/${pipeline}.py --output pipelines/${pipeline}.yaml --function pipeline

targets ?= training serving
build: ## Build and push training and/or serving container(s) image using Docker. Specify targets=<training serving> e.g. targets=training or targets="training serving" (default)
images ?= training serving
build: ## Build and push container(s). Set images=<training serving> e.g. images=training (default = training serving).
@cd model && \
for target in $$targets ; do \
echo "Building $$target image" && \
for image in $$images ; do \
echo "Building $$image image" && \
gcloud builds submit . \
--region=${VERTEX_LOCATION} \
--project=${VERTEX_PROJECT_ID} \
--gcs-source-staging-dir=gs://${VERTEX_PROJECT_ID}-staging/source \
--substitutions=_DOCKER_TARGET=$$target,_DESTINATION_IMAGE_URI=${CONTAINER_IMAGE_REGISTRY}/$$target:${RESOURCE_SUFFIX} ; \
--substitutions=_DOCKER_TARGET=$$image,_DESTINATION_IMAGE_URI=${CONTAINER_IMAGE_REGISTRY}/$$image:${RESOURCE_SUFFIX} ; \
done


compile ?= true
build ?= true
wait ?= false
run: ## Run pipeline in sandbox environment. Must specify pipeline=<training|prediction>. Optionally specify wait=<true|false> (default = false). Set compile=false to skip recompiling the pipeline and set build=false to skip rebuilding container images
run: ## Run pipeline. Must set pipeline=<training|prediction>. Optionally set wait=<true|false> (default = false), compile=<true|false> (default = true) to recompile pipeline, build=<true|false> (default = true) to rebuild container image(s), images=<training serving> (default = training serving) to set which images are rebuilt.
@if [ $(compile) = "true" ]; then \
$(MAKE) compile ; \
elif [ $(compile) != "false" ]; then \
@@ -81,19 +78,30 @@ run: ## Run pipeline in sandbox environment. Must specify pipeline=<training|pre
exit ; \
fi && \
cd pipelines/src && \
poetry run python -m pipelines.utils.trigger_pipeline --template_path=pipelines/${pipeline}/pipeline.yaml --display_name=${pipeline} --wait=${wait}
echo "Running $$pipeline pipeline" && \
poetry run python -m pipelines.utils.trigger_pipeline --template_path=pipelines/${pipeline}.yaml --display_name=${pipeline} --wait=${wait}

training: ## Shortcut to run training pipeline. Rebuilds training and serving images. Supports same options as run.
$(MAKE) run pipeline=training images="training serving"

prediction: ## Shortcut to run prediction pipeline. Doesn't rebuild images. Supports same options as run.
$(MAKE) run pipeline=prediction build=false

components ?= true
test: ## Run unit tests. Specify components=<true|false> to test scripts and optionally components
test: ## Run unit tests for pipelines. Optionally set components=<true|false> (default = true) to test components package.
@if [ $(components) = "true" ]; then \
echo "Testing components" && \
echo "Running unit tests in components" && \
cd components && \
poetry run pytest && \
cd .. ; \
elif [ $(components) != "false" ]; then \
echo "ValueError: components must be either true or false" ; \
exit ; \
fi && \
echo "Testing scripts" && \
echo "Running unit tests in pipelines" && \
cd pipelines && \
poetry run python -m pytest tests/utils
poetry run python -m pytest

pre-commit: ## Run pre-commit checks for pipelines.
cd pipelines && \
poetry run pre-commit run --all-files
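
Taken together, the refactored targets are driven by the `pipeline`, `images`, `compile`, `build`, `wait` and `components` variables. A usage sketch based on the help text above (project settings come from `env.sh`; the exact values shown are illustrative):

```bash
# Deploy dev infrastructure, skipping the interactive approval prompt
make deploy env=dev auto-approve=true

# Build and push only the training image
make build images=training

# Compile and run the training pipeline, waiting for it to finish
make run pipeline=training wait=true

# Shortcuts that wrap `run`
make training
make prediction

# Unit tests for the pipelines package only, skipping components
make test components=false
```
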
23 changes: 15 additions & 8 deletions README.md
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
-->

# Vertex Pipelines End-to-end Samples
# Vertex Pipelines End-to-End Samples

_AKA "Vertex AI Turbo Templates"_

@@ -71,19 +71,19 @@ Before your CI/CD pipelines can deploy the infrastructure, you will need to set
```bash
export DEV_PROJECT_ID=my-dev-gcp-project
export DEV_LOCATION=europe-west2
gsutil mb -l DEV_LOCATION -p DEV_PROJECT_ID --pap=enforced gs://DEV_PROJECT_ID-tfstate && \
gsutil ubla set on gs://DEV_PROJECT_ID-tfstate
gsutil mb -l $DEV_LOCATION -p $DEV_PROJECT_ID --pap=enforced gs://$DEV_PROJECT_ID-tfstate && \
gsutil ubla set on gs://$DEV_PROJECT_ID-tfstate
```

Enable APIs in admin project:

```bash
export ADMIN_PROJECT_ID=my-admin-gcp-project
gcloud services enable cloudresourcemanager.googleapis.com serviceusage.googleapis.com --project=ADMIN_PROJECT_ID
gcloud services enable cloudresourcemanager.googleapis.com serviceusage.googleapis.com --project=$ADMIN_PROJECT_ID
```

```bash
make deploy env=dev VERTEX_PROJECT_ID=<DEV PROJECT ID>
make deploy env=dev
```

More details about the infrastructure are explained in [this README](docs/INFRASTRUCTURE.md).
@@ -117,10 +117,10 @@ You can modify this to suit your own use case.
Build the training and serving container images and push them to Artifact Registry with:

```bash
make build [ targets=training serving ]
make build [ images=training serving ]
```

Optionally specify the `targets` variable to only build one of the images.
Optionally specify the `images` variable to only build one of the images.

**Execute pipelines:** Vertex AI Pipelines uses Kubeflow Pipelines to orchestrate your training steps, so you'll need to:

@@ -136,10 +136,17 @@ make run pipeline=training [ wait=<true|false> ] [ build=<true|false> ] [ compil

The command has the following true/false flags:

- `build` - re-build containers for training & serving code (limit by setting targets=training to build only one of the containers)
- `build` - re-build containers for training & serving code (limit by setting `images=training` to build only one of the containers)
- `compile` - re-compile the pipeline to YAML
- `wait` - wait for the pipeline run to complete (`true`) or return immediately after submission (`false`)

**Shortcuts:** Use these commands, which support the same options as `run`, to run the training or prediction pipeline:

```bash
make training
make prediction
```

## Test

Unit tests are performed using [pytest](https://docs.pytest.org).
1 change: 1 addition & 0 deletions cloudbuild/e2e-test.yaml
@@ -51,6 +51,7 @@ steps:
- ENABLE_PIPELINE_CACHING=${_TEST_ENABLE_PIPELINE_CACHING}
- VERTEX_LOCATION=${_TEST_VERTEX_LOCATION}
- VERTEX_PROJECT_ID=${_TEST_VERTEX_PROJECT_ID}
- BQ_LOCATION=${_TEST_BQ_LOCATION}
- VERTEX_SA_EMAIL=${_TEST_VERTEX_SA_EMAIL}
- VERTEX_CMEK_IDENTIFIER=${_TEST_VERTEX_CMEK_IDENTIFIER}
- VERTEX_NETWORK=${_TEST_VERTEX_NETWORK}
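
The only change here is the new `BQ_LOCATION` substitution passed through to the end-to-end test run. Assuming the same variable is also defined in `env.sh` for local runs (an assumption, not shown in this diff), the entry would look like:

```bash
# env.sh (hypothetical entry): BigQuery location used by the pipelines
export BQ_LOCATION=US
```
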
4 changes: 2 additions & 2 deletions cloudbuild/release.yaml
@@ -47,12 +47,12 @@ steps:
cd pipelines && \
poetry run python -m pipelines.utils.upload_pipeline \
--dest=https://${_VERTEX_LOCATION}-kfp.pkg.dev/$$proj/vertex-pipelines \
--yaml=src/pipelines/training/pipeline.yaml \
--yaml=src/pipelines/training.yaml \
--tag=latest \
--tag=${TAG_NAME} && \
poetry run python -m pipelines.utils.upload_pipeline \
--dest=https://${_VERTEX_LOCATION}-kfp.pkg.dev/$$proj/vertex-pipelines \
--yaml=src/pipelines/prediction/pipeline.yaml \
--yaml=src/pipelines/prediction.yaml \
--tag=latest \
--tag=${TAG_NAME}; \
done
97 changes: 24 additions & 73 deletions components/src/components/extract_table.py
@@ -11,81 +11,32 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kfp.dsl import Dataset, Output, ContainerSpec, container_component

from kfp.dsl import Dataset, Output, component


@component(
base_image="python:3.9",
packages_to_install=["google-cloud-bigquery==2.30.0"],
)
@container_component
def extract_table(
bq_client_project_id: str,
source_project_id: str,
dataset_id: str,
table_name: str,
dataset: Output[Dataset],
destination_gcs_uri: str = None,
dataset_location: str = "EU",
extract_job_config: dict = None,
skip_if_exists: bool = True,
project: str,
location: str,
table: str,
data: Output[Dataset],
destination_format: str = "CSV",
compression: str = "NONE",
field_delimiter: str = ",",
print_header: str = "true",
):
"""
Extract BQ table in GCS.
Args:
bq_client_project_id (str): project id that will be used by the bq client
source_project_id (str): project id from where BQ table will be extracted
dataset_id (str): dataset id from where BQ table will be extracted
table_name (str): table name (without project id and dataset id)
dataset (Output[Dataset]): output dataset artifact generated by the operation,
this parameter will be passed automatically by the orchestrator
dataset_location (str): bq dataset location. Defaults to "EU".
extract_job_config (dict): dict containing optional parameters
required by the bq extract operation. Defaults to None.
See available parameters here
https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.ExtractJobConfig.html # noqa
destination_gcs_uri (str): GCS URI to use for saving query results (optional).
Returns:
Outputs (NamedTuple (str, list)): Output dataset directory and its GCS uri.
"""

import logging
from pathlib import Path
from google.cloud.exceptions import GoogleCloudError
from google.cloud import bigquery

# set uri of output dataset if destination_gcs_uri is provided
if destination_gcs_uri:
dataset.uri = destination_gcs_uri

logging.info(f"Checking if destination exists: {dataset.path}")
if Path(dataset.path).exists() and skip_if_exists:
logging.info("Destination already exists, skipping table extraction!")
return

full_table_id = f"{source_project_id}.{dataset_id}.{table_name}"
table = bigquery.table.Table(table_ref=full_table_id)

if extract_job_config is None:
extract_job_config = {}
job_config = bigquery.job.ExtractJobConfig(**extract_job_config)

logging.info(f"Extract table {table} to {dataset.uri}")
client = bigquery.client.Client(
project=bq_client_project_id, location=dataset_location
return ContainerSpec(
image="google/cloud-sdk:alpine",
command=["bq"],
args=[
"extract",
f"--project_id={project}",
f"--location={location}",
f"--destination_format={destination_format}",
f"--compression={compression}",
f"--field_delimiter={field_delimiter}",
f"--print_header={print_header}",
table,
data.uri,
],
)
extract_job = client.extract_table(
table,
dataset.uri,
job_config=job_config,
)

try:
result = extract_job.result()
logging.info("Table extracted, result: {}".format(result))
except GoogleCloudError as e:
logging.error(e)
logging.error(extract_job.error_result)
logging.error(extract_job.errors)
raise e
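
The rewritten component drops the BigQuery Python client in favour of a thin wrapper around the `bq` CLI in the `google/cloud-sdk:alpine` image. With the default arguments it assembles roughly the following command (table and destination URI are placeholders; the real URI is supplied by the `data` output artifact):

```bash
# Approximate command run by the container for a given project/location/table
bq extract \
  --project_id="my-project" \
  --location="EU" \
  --destination_format=CSV \
  --compression=NONE \
  --field_delimiter="," \
  --print_header=true \
  "my-project.my_dataset.my_table" \
  "gs://my-bucket/path/data.csv"
```
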
27 changes: 9 additions & 18 deletions components/src/components/lookup_model.py
@@ -22,26 +22,19 @@
)
def lookup_model(
model_name: str,
project_location: str,
project_id: str,
location: str,
project: str,
model: Output[Model],
order_models_by: str = "create_time desc",
fail_on_model_not_found: bool = False,
) -> NamedTuple("Outputs", [("model_resource_name", str), ("training_dataset", dict)]):
"""
Fetch a model given a model name (display name) and export to GCS.
Args:
model_name (str): display name of the model
project_location (str): location of the Google Cloud project
project_id (str): project id of the Google Cloud project
location (str): location of the Google Cloud project
project (str): project id of the Google Cloud project
model (Output[Model]): a Vertex AI model
order_models_by (str): if multiple models are found based on the display name,
use a filter clause:
A comma-separated list of fields to order by, sorted in
ascending order. Use "desc" after a field name for descending.
Supported fields: `display_name`, `create_time`, `update_time`
Defaults to "create_time desc".
fail_on_model_not_found (bool): if set to True, raise runtime error if
model is not found
@@ -60,25 +53,23 @@ def lookup_model(
logging.info(f"listing models with display name {model_name}")
models = Model.list(
filter=f'display_name="{model_name}"',
order_by=order_models_by,
location=project_location,
project=project_id,
location=location,
project=project,
)
logging.info(f"found {len(models)} models")
logging.info(f"found {len(models)} model(s)")

training_dataset = {}
model_resource_name = ""
if len(models) == 0:
logging.error(
f"No model found with name {model_name}"
+ f"(project: {project_id} location: {project_location})"
f"No model found with name {model_name} "
+ f"(project: {project} location: {location})"
)
if fail_on_model_not_found:
raise RuntimeError("Failed as model was not found")
elif len(models) == 1:
target_model = models[0]
model_resource_name = target_model.resource_name
logging.info(f"choosing model by order ({order_models_by})")
logging.info(f"model display name: {target_model.display_name}")
logging.info(f"model resource name: {target_model.resource_name}")
logging.info(f"model uri: {target_model.uri}")