From 10a69de3a9cd13f836ce0fe04a58edc1449f9885 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Thu, 29 Aug 2024 04:23:09 +0000 Subject: [PATCH 01/18] Deprecate KFP v1 support --- build/BUILD | 1 - tfx/dependencies.py | 33 +- .../penguin/penguin_pipeline_kubeflow.py | 49 +- .../penguin/penguin_pipeline_kubeflow_test.py | 28 +- .../templates/taxi/kubeflow_runner.py | 100 ---- tfx/orchestration/data_types.py | 2 +- tfx/orchestration/kubeflow/base_component.py | 166 ------ .../kubeflow/base_component_test.py | 213 -------- .../kubeflow/kubeflow_dag_runner.py | 471 ------------------ .../kubeflow/kubeflow_dag_runner_test.py | 329 ------------ tfx/orchestration/kubeflow/proto/BUILD | 25 - .../kubeflow/proto/kubeflow.proto | 52 -- tfx/orchestration/pipeline.py | 2 +- .../handler/kubeflow_dag_runner_patcher.py | 86 ---- .../kubeflow_dag_runner_patcher_test.py | 71 --- tfx/v1/orchestration/experimental/__init__.py | 17 - tfx/v1/proto/__init__.py | 4 +- 17 files changed, 54 insertions(+), 1595 deletions(-) delete mode 100644 tfx/experimental/templates/taxi/kubeflow_runner.py delete mode 100644 tfx/orchestration/kubeflow/base_component.py delete mode 100644 tfx/orchestration/kubeflow/base_component_test.py delete mode 100644 tfx/orchestration/kubeflow/kubeflow_dag_runner.py delete mode 100644 tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py delete mode 100644 tfx/orchestration/kubeflow/proto/BUILD delete mode 100644 tfx/orchestration/kubeflow/proto/kubeflow.proto delete mode 100644 tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py delete mode 100644 tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py diff --git a/build/BUILD b/build/BUILD index 4d596ef5b2..60607e96b3 100644 --- a/build/BUILD +++ b/build/BUILD @@ -25,7 +25,6 @@ sh_binary( "//tfx/extensions/experimental/kfp_compatibility/proto:kfp_component_spec_pb2.py", "//tfx/extensions/google_cloud_big_query/experimental/elwc_example_gen/proto:elwc_config_pb2.py", "//tfx/orchestration/experimental/core:component_generated_alert_pb2.py", - "//tfx/orchestration/kubeflow/proto:kubeflow_pb2.py", "//tfx/proto:bulk_inferrer_pb2.py", "//tfx/proto:distribution_validator_pb2.py", "//tfx/proto:evaluator_pb2.py", diff --git a/tfx/dependencies.py b/tfx/dependencies.py index 24b07a24cf..5fdb30e0ca 100644 --- a/tfx/dependencies.py +++ b/tfx/dependencies.py @@ -69,9 +69,8 @@ def make_pipeline_sdk_required_install_packages(): # TODO(b/176812386): Deprecate usage of jinja2 for placeholders. 'jinja2>=2.7.3,<4', # typing-extensions allows consistent & future-proof interface for typing. - # Since kfp<2 uses typing-extensions<4, lower bound is the latest 3.x, and - # upper bound is <5 as the semver started from 4.0 according to their doc. - 'typing-extensions>=3.10.0.2,<5', + # Upper bound is <5 as the semver started from 4.0 according to their doc. + 'typing-extensions<5', ] @@ -87,7 +86,7 @@ def make_required_install_packages(): 'google-cloud-bigquery>=3,<4', 'grpcio>=1.28.1,<2', 'keras-tuner>=1.0.4,<2,!=1.4.0,!=1.4.1', - 'kubernetes>=10.0.1,<13', + 'kubernetes>=10.0.1,<27', 'numpy>=1.16,<2', 'pyarrow>=10,<11', # TODO: b/358471141 - Orjson 3.10.7 breaks TFX OSS tests. @@ -146,9 +145,8 @@ def make_extra_packages_airflow(): def make_extra_packages_kfp(): """Prepare extra packages needed for Kubeflow Pipelines orchestrator.""" return [ - # TODO(b/304892416): Migrate from KFP SDK v1 to v2. 
- 'kfp>=1.8.14,<2', - 'kfp-pipeline-spec>0.1.13,<0.2', + 'kfp>=2', + 'kfp-pipeline-spec>=0.2.2', ] @@ -156,17 +154,20 @@ def make_extra_packages_test(): """Prepare extra packages needed for running unit tests.""" # Note: It is okay to pin packages to exact versions in this list to minimize # conflicts. - return make_extra_packages_airflow() + make_extra_packages_kfp() + [ - 'pytest>=5,<7', - ] + return ( + make_extra_packages_airflow() + + make_extra_packages_kfp() + + [ + 'pytest>=5,<7', + ] + ) def make_extra_packages_docker_image(): # Packages needed for tfx docker image. return [ - # TODO(b/304892416): Migrate from KFP SDK v1 to v2. - 'kfp>=1.8.14,<2', - 'kfp-pipeline-spec>0.1.13,<0.2', + 'kfp>=2', + 'kfp-pipeline-spec>=0.3.0', 'mmh>=2.2,<3', 'python-snappy>=0.5,<0.6', # Required for tfx/examples/penguin/penguin_utils_cloud_tuner.py @@ -194,10 +195,12 @@ def make_extra_packages_tf_ranking(): # Packages needed for tf-ranking which is used in tfx/examples/ranking. return [ 'tensorflow-ranking>=0.5,<0.6', - 'struct2tensor' + select_constraint( + 'struct2tensor' + + select_constraint( default='>=0.46.0,<0.47.0', nightly='>=0.47.0.dev', - git_master='@git+https://github.com/google/struct2tensor@master'), + git_master='@git+https://github.com/google/struct2tensor@master', + ), ] diff --git a/tfx/examples/penguin/penguin_pipeline_kubeflow.py b/tfx/examples/penguin/penguin_pipeline_kubeflow.py index 26c82cc02e..8c0e2e46ec 100644 --- a/tfx/examples/penguin/penguin_pipeline_kubeflow.py +++ b/tfx/examples/penguin/penguin_pipeline_kubeflow.py @@ -501,33 +501,28 @@ def main(): else: beam_pipeline_args = _beam_pipeline_args_by_runner['DirectRunner'] - if use_vertex: - dag_runner = tfx.orchestration.experimental.KubeflowV2DagRunner( - config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(), - output_filename=_pipeline_definition_file) - else: - dag_runner = tfx.orchestration.experimental.KubeflowDagRunner( - config=tfx.orchestration.experimental.KubeflowDagRunnerConfig( - kubeflow_metadata_config=tfx.orchestration.experimental - .get_default_kubeflow_metadata_config())) - - dag_runner.run( - create_pipeline( - pipeline_name=_pipeline_name, - pipeline_root=_pipeline_root, - data_root=_data_root, - module_file=_module_file, - enable_tuning=False, - enable_cache=True, - user_provided_schema_path=_user_provided_schema, - ai_platform_training_args=_ai_platform_training_args, - ai_platform_serving_args=_ai_platform_serving_args, - beam_pipeline_args=beam_pipeline_args, - use_cloud_component=use_cloud_component, - use_aip=use_aip, - use_vertex=use_vertex, - serving_model_dir=_serving_model_dir, - )) + dag_runner = tfx.orchestration.experimental.KubeflowV2DagRunner( + config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(), + output_filename=_pipeline_definition_file, + ) + dag_runner.run( + create_pipeline( + pipeline_name=_pipeline_name, + pipeline_root=_pipeline_root, + data_root=_data_root, + module_file=_module_file, + enable_tuning=False, + enable_cache=True, + user_provided_schema_path=_user_provided_schema, + ai_platform_training_args=_ai_platform_training_args, + ai_platform_serving_args=_ai_platform_serving_args, + beam_pipeline_args=beam_pipeline_args, + use_cloud_component=use_cloud_component, + use_aip=use_aip, + use_vertex=use_vertex, + serving_model_dir=_serving_model_dir, + ) + ) # To compile the pipeline: diff --git a/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py b/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py index d36178b9b5..f267749cfe 100644 --- 
a/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py +++ b/tfx/examples/penguin/penguin_pipeline_kubeflow_test.py @@ -64,24 +64,16 @@ def testPenguinPipelineConstructionAndDefinitionFileExists( serving_model_dir=penguin_pipeline_kubeflow._serving_model_dir) self.assertLen(kubeflow_pipeline.components, 9) - if use_vertex: - v2_dag_runner = orchestration.experimental.KubeflowV2DagRunner( - config=orchestration.experimental.KubeflowV2DagRunnerConfig(), - output_dir=self.tmp_dir, - output_filename=penguin_pipeline_kubeflow._pipeline_definition_file) - v2_dag_runner.run(kubeflow_pipeline) - file_path = os.path.join( - self.tmp_dir, penguin_pipeline_kubeflow._pipeline_definition_file) - self.assertTrue(fileio.exists(file_path)) - else: - v1_dag_runner = orchestration.experimental.KubeflowDagRunner( - config=orchestration.experimental.KubeflowDagRunnerConfig( - kubeflow_metadata_config=orchestration.experimental - .get_default_kubeflow_metadata_config())) - v1_dag_runner.run(kubeflow_pipeline) - file_path = os.path.join(self.tmp_dir, 'penguin-kubeflow.tar.gz') - self.assertTrue(fileio.exists(file_path)) - + v2_dag_runner = orchestration.experimental.KubeflowV2DagRunner( + config=orchestration.experimental.KubeflowV2DagRunnerConfig(), + output_dir=self.tmp_dir, + output_filename=penguin_pipeline_kubeflow._pipeline_definition_file, + ) + v2_dag_runner.run(kubeflow_pipeline) + file_path = os.path.join( + self.tmp_dir, penguin_pipeline_kubeflow._pipeline_definition_file + ) + self.assertTrue(fileio.exists(file_path)) if __name__ == '__main__': tf.test.main() diff --git a/tfx/experimental/templates/taxi/kubeflow_runner.py b/tfx/experimental/templates/taxi/kubeflow_runner.py deleted file mode 100644 index 74d873f0f7..0000000000 --- a/tfx/experimental/templates/taxi/kubeflow_runner.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright 2020 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Define KubeflowDagRunner to run the pipeline using Kubeflow.""" - -import os -from absl import logging - -from tfx import v1 as tfx -from tfx.experimental.templates.taxi.pipeline import configs -from tfx.experimental.templates.taxi.pipeline import pipeline - -# TFX pipeline produces many output files and metadata. All output data will be -# stored under this OUTPUT_DIR. -OUTPUT_DIR = os.path.join('gs://', configs.GCS_BUCKET_NAME) - -# TFX produces two types of outputs, files and metadata. -# - Files will be created under PIPELINE_ROOT directory. -PIPELINE_ROOT = os.path.join(OUTPUT_DIR, 'tfx_pipeline_output', - configs.PIPELINE_NAME) - -# The last component of the pipeline, "Pusher" will produce serving model under -# SERVING_MODEL_DIR. -SERVING_MODEL_DIR = os.path.join(PIPELINE_ROOT, 'serving_model') - -# Specifies data file directory. DATA_PATH should be a directory containing CSV -# files for CsvExampleGen in this example. By default, data files are in the -# GCS path: `gs://{GCS_BUCKET_NAME}/tfx-template/data/`. Using a GCS path is -# recommended for KFP. 
-# -# One can optionally choose to use a data source located inside of the container -# built by the template, by specifying -# DATA_PATH = 'data'. Note that Dataflow does not support use container as a -# dependency currently, so this means CsvExampleGen cannot be used with Dataflow -# (step 8 in the template notebook). - -DATA_PATH = 'gs://{}/tfx-template/data/taxi/'.format(configs.GCS_BUCKET_NAME) - - -def run(): - """Define a kubeflow pipeline.""" - - # Metadata config. The defaults works work with the installation of - # KF Pipelines using Kubeflow. If installing KF Pipelines using the - # lightweight deployment option, you may need to override the defaults. - # If you use Kubeflow, metadata will be written to MySQL database inside - # Kubeflow cluster. - metadata_config = tfx.orchestration.experimental.get_default_kubeflow_metadata_config( - ) - - runner_config = tfx.orchestration.experimental.KubeflowDagRunnerConfig( - kubeflow_metadata_config=metadata_config, - tfx_image=configs.PIPELINE_IMAGE) - pod_labels = { - 'add-pod-env': 'true', - tfx.orchestration.experimental.LABEL_KFP_SDK_ENV: 'tfx-template' - } - tfx.orchestration.experimental.KubeflowDagRunner( - config=runner_config, pod_labels_to_attach=pod_labels - ).run( - pipeline.create_pipeline( - pipeline_name=configs.PIPELINE_NAME, - pipeline_root=PIPELINE_ROOT, - data_path=DATA_PATH, - # TODO(step 7): (Optional) Uncomment below to use BigQueryExampleGen. - # query=configs.BIG_QUERY_QUERY, - # TODO(step 5): (Optional) Set the path of the customized schema. - # schema_path=generated_schema_path, - preprocessing_fn=configs.PREPROCESSING_FN, - run_fn=configs.RUN_FN, - train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS), - eval_args=tfx.proto.EvalArgs(num_steps=configs.EVAL_NUM_STEPS), - eval_accuracy_threshold=configs.EVAL_ACCURACY_THRESHOLD, - serving_model_dir=SERVING_MODEL_DIR, - # TODO(step 7): (Optional) Uncomment below to use provide GCP related - # config for BigQuery with Beam DirectRunner. - # beam_pipeline_args=configs - # .BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS, - # TODO(step 8): (Optional) Uncomment below to use Dataflow. - # beam_pipeline_args=configs.DATAFLOW_BEAM_PIPELINE_ARGS, - # TODO(step 9): (Optional) Uncomment below to use Cloud AI Platform. - # ai_platform_training_args=configs.GCP_AI_PLATFORM_TRAINING_ARGS, - # TODO(step 9): (Optional) Uncomment below to use Cloud AI Platform. - # ai_platform_serving_args=configs.GCP_AI_PLATFORM_SERVING_ARGS, - )) - - -if __name__ == '__main__': - logging.set_verbosity(logging.INFO) - run() diff --git a/tfx/orchestration/data_types.py b/tfx/orchestration/data_types.py index aa4bb12c4b..10e88ec696 100644 --- a/tfx/orchestration/data_types.py +++ b/tfx/orchestration/data_types.py @@ -145,7 +145,7 @@ def component_run_context_name(self) -> str: class RuntimeParameter(json_utils.Jsonable): """Runtime parameter. - Currently only supported on KubeflowDagRunner. + Currently only supported on KubeflowV2DagRunner. For protos, use text type RuntimeParameter, which holds the proto json string, e.g., `'{"num_steps": 5}'` for TrainArgs proto. diff --git a/tfx/orchestration/kubeflow/base_component.py b/tfx/orchestration/kubeflow/base_component.py deleted file mode 100644 index 11eeb34a87..0000000000 --- a/tfx/orchestration/kubeflow/base_component.py +++ /dev/null @@ -1,166 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Kubeflow Pipelines based implementation of TFX components. - -These components are lightweight wrappers around the KFP DSL's ContainerOp, -and ensure that the container gets called with the right set of input -arguments. It also ensures that each component exports named output -attributes that are consistent with those provided by the native TFX -components, thus ensuring that both types of pipeline definitions are -compatible. -Note: This requires Kubeflow Pipelines SDK to be installed. -""" - -from typing import Dict, List, Set - -from absl import logging -from kfp import dsl -from kubernetes import client as k8s_client -from tfx.dsl.components.base import base_node as tfx_base_node -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow.proto import kubeflow_pb2 -from tfx.proto.orchestration import pipeline_pb2 - -from google.protobuf import json_format - -# TODO(b/166202742): Consolidate container entrypoint with TFX image's default. -_COMMAND = ['python', '-m', 'tfx.orchestration.kubeflow.container_entrypoint'] - -_WORKFLOW_ID_KEY = 'WORKFLOW_ID' - - -def _encode_runtime_parameter(param: data_types.RuntimeParameter) -> str: - """Encode a runtime parameter into a placeholder for value substitution.""" - if param.ptype is int: - type_enum = pipeline_pb2.RuntimeParameter.INT - elif param.ptype is float: - type_enum = pipeline_pb2.RuntimeParameter.DOUBLE - else: - type_enum = pipeline_pb2.RuntimeParameter.STRING - type_str = pipeline_pb2.RuntimeParameter.Type.Name(type_enum) - return f'{param.name}={type_str}:{str(dsl.PipelineParam(name=param.name))}' - - -def _replace_placeholder(component: tfx_base_node.BaseNode) -> None: - """Replaces the RuntimeParameter placeholders with kfp.dsl.PipelineParam.""" - keys = list(component.exec_properties.keys()) - for key in keys: - exec_property = component.exec_properties[key] - if not isinstance(exec_property, data_types.RuntimeParameter): - continue - component.exec_properties[key] = str( - dsl.PipelineParam(name=exec_property.name)) - - -# TODO(hongyes): renaming the name to KubeflowComponent. -class BaseComponent: - """Base component for all Kubeflow pipelines TFX components. - - Returns a wrapper around a KFP DSL ContainerOp class, and adds named output - attributes that match the output names for the corresponding native TFX - components. - """ - - def __init__(self, - component: tfx_base_node.BaseNode, - depends_on: Set[dsl.ContainerOp], - pipeline: tfx_pipeline.Pipeline, - pipeline_root: dsl.PipelineParam, - tfx_image: str, - kubeflow_metadata_config: kubeflow_pb2.KubeflowMetadataConfig, - tfx_ir: pipeline_pb2.Pipeline, - pod_labels_to_attach: Dict[str, str], - runtime_parameters: List[data_types.RuntimeParameter], - metadata_ui_path: str = '/mlpipeline-ui-metadata.json'): - """Creates a new Kubeflow-based component. - - This class essentially wraps a dsl.ContainerOp construct in Kubeflow - Pipelines. - - Args: - component: The logical TFX component to wrap. - depends_on: The set of upstream KFP ContainerOp components that this - component will depend on. 
- pipeline: The logical TFX pipeline to which this component belongs. - pipeline_root: The pipeline root specified, as a dsl.PipelineParam - tfx_image: The container image to use for this component. - kubeflow_metadata_config: Configuration settings for connecting to the - MLMD store in a Kubeflow cluster. - tfx_ir: The TFX intermedia representation of the pipeline. - pod_labels_to_attach: Dict of pod labels to attach to the GKE pod. - runtime_parameters: Runtime parameters of the pipeline. - metadata_ui_path: File location for metadata-ui-metadata.json file. - """ - - _replace_placeholder(component) - - arguments = [ - '--pipeline_root', - pipeline_root, - '--kubeflow_metadata_config', - json_format.MessageToJson( - message=kubeflow_metadata_config, preserving_proto_field_name=True), - '--node_id', - component.id, - # TODO(b/182220464): write IR to pipeline_root and let - # container_entrypoint.py read it back to avoid future issue that IR - # exeeds the flag size limit. - '--tfx_ir', - json_format.MessageToJson(tfx_ir), - '--metadata_ui_path', - metadata_ui_path, - ] - - for param in runtime_parameters: - arguments.append('--runtime_parameter') - arguments.append(_encode_runtime_parameter(param)) - - self.container_op = dsl.ContainerOp( - name=component.id, - command=_COMMAND, - image=tfx_image, - arguments=arguments, - output_artifact_paths={ - 'mlpipeline-ui-metadata': metadata_ui_path, - }, - ) - - logging.info('Adding upstream dependencies for component %s', - self.container_op.name) - for op in depends_on: - logging.info(' -> Component: %s', op.name) - self.container_op.after(op) - - # TODO(b/140172100): Document the use of additional_pipeline_args. - if _WORKFLOW_ID_KEY in pipeline.additional_pipeline_args: - # Allow overriding pipeline's run_id externally, primarily for testing. - self.container_op.container.add_env_variable( - k8s_client.V1EnvVar( - name=_WORKFLOW_ID_KEY, - value=pipeline.additional_pipeline_args[_WORKFLOW_ID_KEY])) - else: - # Add the Argo workflow ID to the container's environment variable so it - # can be used to uniquely place pipeline outputs under the pipeline_root. - field_path = "metadata.labels['workflows.argoproj.io/workflow']" - self.container_op.container.add_env_variable( - k8s_client.V1EnvVar( - name=_WORKFLOW_ID_KEY, - value_from=k8s_client.V1EnvVarSource( - field_ref=k8s_client.V1ObjectFieldSelector( - field_path=field_path)))) - - if pod_labels_to_attach: - for k, v in pod_labels_to_attach.items(): - self.container_op.add_pod_label(k, v) diff --git a/tfx/orchestration/kubeflow/base_component_test.py b/tfx/orchestration/kubeflow/base_component_test.py deleted file mode 100644 index 5d4c1c54fc..0000000000 --- a/tfx/orchestration/kubeflow/base_component_test.py +++ /dev/null @@ -1,213 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""Tests for tfx.orchestration.kubeflow.base_component.""" - -import json - -from absl import logging -from kfp import dsl -import tensorflow as tf -from tfx.components.example_gen.csv_example_gen import component as csv_example_gen_component -from tfx.components.statistics_gen import component as statistics_gen_component -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow import base_component -from tfx.orchestration.kubeflow.proto import kubeflow_pb2 -from tfx.proto.orchestration import pipeline_pb2 - -from ml_metadata.proto import metadata_store_pb2 - - -class BaseComponentTest(tf.test.TestCase): - maxDiff = None # pylint: disable=invalid-name - _test_pipeline_name = 'test_pipeline' - - def setUp(self): - super().setUp() - example_gen = csv_example_gen_component.CsvExampleGen( - input_base='data_input') - statistics_gen = statistics_gen_component.StatisticsGen( - examples=example_gen.outputs['examples']).with_id('foo') - - pipeline = tfx_pipeline.Pipeline( - pipeline_name=self._test_pipeline_name, - pipeline_root='test_pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[example_gen, statistics_gen], - ) - - test_pipeline_root = dsl.PipelineParam(name='pipeline-root-param') - - self._metadata_config = kubeflow_pb2.KubeflowMetadataConfig() - self._metadata_config.mysql_db_service_host.environment_variable = 'MYSQL_SERVICE_HOST' - self._tfx_ir = pipeline_pb2.Pipeline() - with dsl.Pipeline('test_pipeline'): - self.component = base_component.BaseComponent( - component=statistics_gen, - depends_on=set(), - pipeline=pipeline, - pipeline_root=test_pipeline_root, - tfx_image='container_image', - kubeflow_metadata_config=self._metadata_config, - tfx_ir=self._tfx_ir, - pod_labels_to_attach={}, - runtime_parameters=[] - ) - self.tfx_component = statistics_gen - - def testContainerOpArguments(self): - expected_args = [ - '--pipeline_root', - '{{pipelineparam:op=;name=pipeline-root-param}}', - '--kubeflow_metadata_config', - '{\n' - ' "mysql_db_service_host": {\n' - ' "environment_variable": "MYSQL_SERVICE_HOST"\n' - ' }\n' - '}', - '--node_id', - 'foo', - ] - try: - self.assertEqual( - self.component.container_op.arguments[:len(expected_args)], - expected_args) - - except AssertionError: - # Print out full arguments for debugging. 
- logging.error('==== BEGIN CONTAINER OP ARGUMENT DUMP ====') - logging.error(json.dumps(self.component.container_op.arguments, indent=2)) - logging.error('==== END CONTAINER OP ARGUMENT DUMP ====') - raise - - def testContainerOpName(self): - self.assertEqual('foo', self.tfx_component.id) - self.assertEqual('foo', self.component.container_op.name) - - -class BaseComponentWithPipelineParamTest(tf.test.TestCase): - """Test the usage of RuntimeParameter.""" - maxDiff = None # pylint: disable=invalid-name - _test_pipeline_name = 'test_pipeline' - - def setUp(self): - super().setUp() - - example_gen_output_config = data_types.RuntimeParameter( - name='example-gen-output-config', ptype=str) - - example_gen = csv_example_gen_component.CsvExampleGen( - input_base='data_root', output_config=example_gen_output_config) - statistics_gen = statistics_gen_component.StatisticsGen( - examples=example_gen.outputs['examples']).with_id('foo') - - test_pipeline_root = dsl.PipelineParam(name='pipeline-root-param') - pipeline = tfx_pipeline.Pipeline( - pipeline_name=self._test_pipeline_name, - pipeline_root='test_pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[example_gen, statistics_gen], - ) - - self._metadata_config = kubeflow_pb2.KubeflowMetadataConfig() - self._metadata_config.mysql_db_service_host.environment_variable = 'MYSQL_SERVICE_HOST' - self._tfx_ir = pipeline_pb2.Pipeline() - with dsl.Pipeline('test_pipeline'): - self.example_gen = base_component.BaseComponent( - component=example_gen, - depends_on=set(), - pipeline=pipeline, - pipeline_root=test_pipeline_root, - tfx_image='container_image', - kubeflow_metadata_config=self._metadata_config, - tfx_ir=self._tfx_ir, - pod_labels_to_attach={}, - runtime_parameters=[example_gen_output_config]) - self.statistics_gen = base_component.BaseComponent( - component=statistics_gen, - depends_on=set(), - pipeline=pipeline, - pipeline_root=test_pipeline_root, - tfx_image='container_image', - kubeflow_metadata_config=self._metadata_config, - tfx_ir=self._tfx_ir, - pod_labels_to_attach={}, - runtime_parameters=[] - ) - - self.tfx_example_gen = example_gen - self.tfx_statistics_gen = statistics_gen - - def testContainerOpArguments(self): - statistics_gen_expected_args = [ - '--pipeline_root', - '{{pipelineparam:op=;name=pipeline-root-param}}', - '--kubeflow_metadata_config', - '{\n' - ' "mysql_db_service_host": {\n' - ' "environment_variable": "MYSQL_SERVICE_HOST"\n' - ' }\n' - '}', - '--node_id', - 'foo', - '--tfx_ir', - '{}', - '--metadata_ui_path', - '/mlpipeline-ui-metadata.json', - ] - example_gen_expected_args = [ - '--pipeline_root', - '{{pipelineparam:op=;name=pipeline-root-param}}', - '--kubeflow_metadata_config', - '{\n' - ' "mysql_db_service_host": {\n' - ' "environment_variable": "MYSQL_SERVICE_HOST"\n' - ' }\n' - '}', - '--node_id', - 'CsvExampleGen', - '--tfx_ir', - '{}', - '--metadata_ui_path', - '/mlpipeline-ui-metadata.json', - '--runtime_parameter', - 'example-gen-output-config=STRING:{{pipelineparam:op=;name=example-gen-output-config}}', - ] - try: - self.assertEqual( - self.statistics_gen.container_op - .arguments, - statistics_gen_expected_args) - self.assertEqual( - self.example_gen.container_op.arguments, - example_gen_expected_args) - except AssertionError: - # Print out full arguments for debugging. 
- logging.error('==== BEGIN STATISTICSGEN CONTAINER OP ARGUMENT DUMP ====') - logging.error( - json.dumps(self.statistics_gen.container_op.arguments, indent=2)) - logging.error('==== END STATISTICSGEN CONTAINER OP ARGUMENT DUMP ====') - logging.error('==== BEGIN EXAMPLEGEN CONTAINER OP ARGUMENT DUMP ====') - logging.error( - json.dumps(self.example_gen.container_op.arguments, indent=2)) - logging.error('==== END EXAMPLEGEN CONTAINER OP ARGUMENT DUMP ====') - raise - - def testContainerOpName(self): - self.assertEqual('foo', self.tfx_statistics_gen.id) - self.assertEqual('foo', self.statistics_gen.container_op.name) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/kubeflow/kubeflow_dag_runner.py b/tfx/orchestration/kubeflow/kubeflow_dag_runner.py deleted file mode 100644 index 1d320aeaf5..0000000000 --- a/tfx/orchestration/kubeflow/kubeflow_dag_runner.py +++ /dev/null @@ -1,471 +0,0 @@ -# Copyright 2019 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""TFX runner for Kubeflow.""" - -import collections -import copy -import os -from typing import Any, Callable, Dict, List, Optional, Type, cast, MutableMapping -from absl import logging - -from kfp import compiler -from kfp import dsl -from kfp import gcp -from kubernetes import client as k8s_client -from tfx import version -from tfx.dsl.compiler import compiler as tfx_compiler -from tfx.dsl.components.base import base_component as tfx_base_component -from tfx.dsl.components.base import base_node -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration import tfx_runner -from tfx.orchestration.config import pipeline_config -from tfx.orchestration.kubeflow import base_component -from tfx.orchestration.kubeflow import utils -from tfx.orchestration.kubeflow.proto import kubeflow_pb2 -from tfx.orchestration.launcher import base_component_launcher -from tfx.orchestration.launcher import in_process_component_launcher -from tfx.orchestration.launcher import kubernetes_component_launcher -from tfx.proto.orchestration import pipeline_pb2 -from tfx.utils import telemetry_utils - - -# OpFunc represents the type of a function that takes as input a -# dsl.ContainerOp and returns the same object. Common operations such as adding -# k8s secrets, mounting volumes, specifying the use of TPUs and so on can be -# specified as an OpFunc. -# See example usage here: -# https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/gcp.py -OpFunc = Callable[[dsl.ContainerOp], dsl.ContainerOp] - -# Default secret name for GCP credentials. This secret is installed as part of -# a typical Kubeflow installation when the component is GKE. -_KUBEFLOW_GCP_SECRET_NAME = 'user-gcp-sa' - -# Default TFX container image to use in KubeflowDagRunner. 
-DEFAULT_KUBEFLOW_TFX_IMAGE = 'tensorflow/tfx:%s' % (version.__version__,) - - -def _mount_config_map_op(config_map_name: str) -> OpFunc: - """Mounts all key-value pairs found in the named Kubernetes ConfigMap. - - All key-value pairs in the ConfigMap are mounted as environment variables. - - Args: - config_map_name: The name of the ConfigMap resource. - - Returns: - An OpFunc for mounting the ConfigMap. - """ - - def mount_config_map(container_op: dsl.ContainerOp): - config_map_ref = k8s_client.V1ConfigMapEnvSource( - name=config_map_name, optional=True) - container_op.container.add_env_from( - k8s_client.V1EnvFromSource(config_map_ref=config_map_ref)) - - return mount_config_map - - -def _mount_secret_op(secret_name: str) -> OpFunc: - """Mounts all key-value pairs found in the named Kubernetes Secret. - - All key-value pairs in the Secret are mounted as environment variables. - - Args: - secret_name: The name of the Secret resource. - - Returns: - An OpFunc for mounting the Secret. - """ - - def mount_secret(container_op: dsl.ContainerOp): - secret_ref = k8s_client.V1ConfigMapEnvSource( - name=secret_name, optional=True) - - container_op.container.add_env_from( - k8s_client.V1EnvFromSource(secret_ref=secret_ref)) - - return mount_secret - - -def get_default_pipeline_operator_funcs( - use_gcp_sa: bool = False) -> List[OpFunc]: - """Returns a default list of pipeline operator functions. - - Args: - use_gcp_sa: If true, mount a GCP service account secret to each pod, with - the name _KUBEFLOW_GCP_SECRET_NAME. - - Returns: - A list of functions with type OpFunc. - """ - # Enables authentication for GCP services if needed. - gcp_secret_op = gcp.use_gcp_secret(_KUBEFLOW_GCP_SECRET_NAME) - - # Mounts configmap containing Metadata gRPC server configuration. - mount_config_map_op = _mount_config_map_op('metadata-grpc-configmap') - if use_gcp_sa: - return [gcp_secret_op, mount_config_map_op] - else: - return [mount_config_map_op] - - -def get_default_kubeflow_metadata_config( -) -> kubeflow_pb2.KubeflowMetadataConfig: - """Returns the default metadata connection config for Kubeflow. - - Returns: - A config proto that will be serialized as JSON and passed to the running - container so the TFX component driver is able to communicate with MLMD in - a Kubeflow cluster. - """ - # The default metadata configuration for a Kubeflow Pipelines cluster is - # codified as a Kubernetes ConfigMap - # https://github.com/kubeflow/pipelines/blob/master/manifests/kustomize/base/metadata/metadata-grpc-configmap.yaml - - config = kubeflow_pb2.KubeflowMetadataConfig() - # The environment variable to use to obtain the Metadata gRPC service host in - # the cluster that is backing Kubeflow Metadata. Note that the key in the - # config map and therefore environment variable used, are lower-cased. - config.grpc_config.grpc_service_host.environment_variable = 'METADATA_GRPC_SERVICE_HOST' - # The environment variable to use to obtain the Metadata grpc service port in - # the cluster that is backing Kubeflow Metadata. 
- config.grpc_config.grpc_service_port.environment_variable = 'METADATA_GRPC_SERVICE_PORT' - - return config - - -def get_default_pod_labels() -> Dict[str, str]: - """Returns the default pod label dict for Kubeflow.""" - # KFP default transformers add pod env: - # https://github.com/kubeflow/pipelines/blob/0.1.32/sdk/python/kfp/compiler/_default_transformers.py - result = { - 'add-pod-env': 'true', - telemetry_utils.LABEL_KFP_SDK_ENV: 'tfx' - } - return result - - -def get_default_output_filename(pipeline_name: str) -> str: - return pipeline_name + '.tar.gz' - - -class KubeflowDagRunnerConfig(pipeline_config.PipelineConfig): - """Runtime configuration parameters specific to execution on Kubeflow.""" - - def __init__( - self, - pipeline_operator_funcs: Optional[List[OpFunc]] = None, - tfx_image: Optional[str] = None, - kubeflow_metadata_config: Optional[ - kubeflow_pb2.KubeflowMetadataConfig] = None, - # TODO(b/143883035): Figure out the best practice to put the - # SUPPORTED_LAUNCHER_CLASSES - supported_launcher_classes: Optional[List[Type[ - base_component_launcher.BaseComponentLauncher]]] = None, - metadata_ui_path: str = '/mlpipeline-ui-metadata.json', - **kwargs): - """Creates a KubeflowDagRunnerConfig object. - - The user can use pipeline_operator_funcs to apply modifications to - ContainerOps used in the pipeline. For example, to ensure the pipeline - steps mount a GCP secret, and a Persistent Volume, one can create config - object like so: - - from kfp import gcp, onprem - mount_secret_op = gcp.use_secret('my-secret-name) - mount_volume_op = onprem.mount_pvc( - "my-persistent-volume-claim", - "my-volume-name", - "/mnt/volume-mount-path") - - config = KubeflowDagRunnerConfig( - pipeline_operator_funcs=[mount_secret_op, mount_volume_op] - ) - - Args: - pipeline_operator_funcs: A list of ContainerOp modifying functions that - will be applied to every container step in the pipeline. - tfx_image: The TFX container image to use in the pipeline. - kubeflow_metadata_config: Runtime configuration to use to connect to - Kubeflow metadata. - supported_launcher_classes: A list of component launcher classes that are - supported by the current pipeline. List sequence determines the order in - which launchers are chosen for each component being run. - metadata_ui_path: File location for metadata-ui-metadata.json file. - **kwargs: keyword args for PipelineConfig. - """ - supported_launcher_classes = supported_launcher_classes or [ - in_process_component_launcher.InProcessComponentLauncher, - kubernetes_component_launcher.KubernetesComponentLauncher, - ] - super().__init__( - supported_launcher_classes=supported_launcher_classes, **kwargs) - self.pipeline_operator_funcs = ( - pipeline_operator_funcs or get_default_pipeline_operator_funcs()) - self.tfx_image = tfx_image or DEFAULT_KUBEFLOW_TFX_IMAGE - self.kubeflow_metadata_config = ( - kubeflow_metadata_config or get_default_kubeflow_metadata_config()) - self.metadata_ui_path = metadata_ui_path - - -class KubeflowDagRunner(tfx_runner.TfxRunner): - """Kubeflow Pipelines runner. - - Constructs a pipeline definition YAML file based on the TFX logical pipeline. - """ - - def __init__(self, - output_dir: Optional[str] = None, - output_filename: Optional[str] = None, - config: Optional[KubeflowDagRunnerConfig] = None, - pod_labels_to_attach: Optional[Dict[str, str]] = None): - """Initializes KubeflowDagRunner for compiling a Kubeflow Pipeline. - - Args: - output_dir: An optional output directory into which to output the pipeline - definition files. 
Defaults to the current working directory. - output_filename: An optional output file name for the pipeline definition - file. Defaults to pipeline_name.tar.gz when compiling a TFX pipeline. - Currently supports .tar.gz, .tgz, .zip, .yaml, .yml formats. See - https://github.com/kubeflow/pipelines/blob/181de66cf9fa87bcd0fe9291926790c400140783/sdk/python/kfp/compiler/compiler.py#L851 - for format restriction. - config: An optional KubeflowDagRunnerConfig object to specify runtime - configuration when running the pipeline under Kubeflow. - pod_labels_to_attach: Optional set of pod labels to attach to GKE pod - spinned up for this pipeline. Default to the 3 labels: - 1. add-pod-env: true, - 2. pipeline SDK type, - 3. pipeline unique ID, - where 2 and 3 are instrumentation of usage tracking. - """ - if config and not isinstance(config, KubeflowDagRunnerConfig): - raise TypeError('config must be type of KubeflowDagRunnerConfig.') - super().__init__(config or KubeflowDagRunnerConfig()) - self._config = cast(KubeflowDagRunnerConfig, self._config) - self._output_dir = output_dir or os.getcwd() - self._output_filename = output_filename - self._compiler = compiler.Compiler() - self._tfx_compiler = tfx_compiler.Compiler() - self._params = [] # List of dsl.PipelineParam used in this pipeline. - self._params_by_component_id = collections.defaultdict(list) - self._deduped_parameter_names = set() # Set of unique param names used. - self._exit_handler = None - if pod_labels_to_attach is None: - self._pod_labels_to_attach = get_default_pod_labels() - else: - self._pod_labels_to_attach = pod_labels_to_attach - - def _parse_parameter_from_component( - self, component: tfx_base_component.BaseComponent) -> None: - """Extract embedded RuntimeParameter placeholders from a component. - - Extract embedded RuntimeParameter placeholders from a component, then append - the corresponding dsl.PipelineParam to KubeflowDagRunner. - - Args: - component: a TFX component. - """ - - deduped_parameter_names_for_component = set() - for parameter in component.exec_properties.values(): - if not isinstance(parameter, data_types.RuntimeParameter): - continue - # Ignore pipeline root because it will be added later. - if parameter.name == tfx_pipeline.ROOT_PARAMETER.name: - continue - if parameter.name in deduped_parameter_names_for_component: - continue - - deduped_parameter_names_for_component.add(parameter.name) - self._params_by_component_id[component.id].append(parameter) - if parameter.name not in self._deduped_parameter_names: - self._deduped_parameter_names.add(parameter.name) - # TODO(b/178436919): Create a test to cover default value rendering - # and move the external code reference over there. - # The default needs to be serialized then passed to dsl.PipelineParam. - # See - # https://github.com/kubeflow/pipelines/blob/f65391309650fdc967586529e79af178241b4c2c/sdk/python/kfp/dsl/_pipeline_param.py#L154 - dsl_parameter = dsl.PipelineParam( - name=parameter.name, value=str(parameter.default)) - self._params.append(dsl_parameter) - - def _parse_parameter_from_pipeline(self, - pipeline: tfx_pipeline.Pipeline) -> None: - """Extract all the RuntimeParameter placeholders from the pipeline.""" - - for component in pipeline.components: - self._parse_parameter_from_component(component) - - def _construct_pipeline_graph(self, pipeline: tfx_pipeline.Pipeline, - pipeline_root: dsl.PipelineParam): - """Constructs a Kubeflow Pipeline graph. - - Args: - pipeline: The logical TFX pipeline to base the construction on. 
- pipeline_root: dsl.PipelineParam representing the pipeline root. - """ - component_to_kfp_op = {} - - for component in pipeline.components: - utils.replace_exec_properties(component) - tfx_ir = self._generate_tfx_ir(pipeline) - - # Assumption: There is a partial ordering of components in the list, i.e., - # if component A depends on component B and C, then A appears after B and C - # in the list. - for component in pipeline.components: - # Keep track of the set of upstream dsl.ContainerOps for this component. - depends_on = set() - - for upstream_component in component.upstream_nodes: - depends_on.add(component_to_kfp_op[upstream_component]) - - # remove the extra pipeline node information - tfx_node_ir = self._dehydrate_tfx_ir(tfx_ir, component.id) - - # Disable cache for exit_handler - if self._exit_handler and component.id == self._exit_handler.id: - tfx_node_ir.nodes[ - 0].pipeline_node.execution_options.caching_options.enable_cache = False - - kfp_component = base_component.BaseComponent( - component=component, - depends_on=depends_on, - pipeline=pipeline, - pipeline_root=pipeline_root, - tfx_image=self._config.tfx_image, - kubeflow_metadata_config=self._config.kubeflow_metadata_config, - pod_labels_to_attach=self._pod_labels_to_attach, - tfx_ir=tfx_node_ir, - metadata_ui_path=self._config.metadata_ui_path, - runtime_parameters=(self._params_by_component_id[component.id] + - [tfx_pipeline.ROOT_PARAMETER])) - - for operator in self._config.pipeline_operator_funcs: - kfp_component.container_op.apply(operator) - - component_to_kfp_op[component] = kfp_component.container_op - - # If exit handler defined create an exit handler and add all ops to it. - if self._exit_handler: - exit_op = component_to_kfp_op[self._exit_handler] - with dsl.ExitHandler(exit_op) as exit_handler_group: - exit_handler_group.name = utils.TFX_DAG_NAME - # KFP get_default_pipeline should have the pipeline object when invoked - # while compiling. This allows us to retrieve all ops from pipeline - # group (should be the only group in the pipeline). - pipeline_group = dsl.Pipeline.get_default_pipeline().groups[0] - - # Transfer all ops to exit_handler_group which will now contain all ops. - exit_handler_group.ops = pipeline_group.ops - # remove all ops from pipeline_group. 
Otherwise compiler fails in - # https://github.com/kubeflow/pipelines/blob/8aee62142aa13ae42b2dd18257d7e034861b7e5e/sdk/python/kfp/compiler/compiler.py#L893 - pipeline_group.ops = [] - - def _del_unused_field(self, node_id: str, message_dict: MutableMapping[str, - Any]): - for item in list(message_dict.keys()): - if item != node_id: - del message_dict[item] - - def _dehydrate_tfx_ir(self, original_pipeline: pipeline_pb2.Pipeline, - node_id: str) -> pipeline_pb2.Pipeline: - pipeline = copy.deepcopy(original_pipeline) - for node in pipeline.nodes: - if (node.WhichOneof('node') == 'pipeline_node' and - node.pipeline_node.node_info.id == node_id): - del pipeline.nodes[:] - pipeline.nodes.extend([node]) - break - - deployment_config = pipeline_pb2.IntermediateDeploymentConfig() - pipeline.deployment_config.Unpack(deployment_config) - self._del_unused_field(node_id, deployment_config.executor_specs) - self._del_unused_field(node_id, deployment_config.custom_driver_specs) - self._del_unused_field(node_id, - deployment_config.node_level_platform_configs) - pipeline.deployment_config.Pack(deployment_config) - return pipeline - - def _generate_tfx_ir( - self, pipeline: tfx_pipeline.Pipeline) -> Optional[pipeline_pb2.Pipeline]: - result = self._tfx_compiler.compile(pipeline) - return result - - def run(self, pipeline: tfx_pipeline.Pipeline): - """Compiles and outputs a Kubeflow Pipeline YAML definition file. - - Args: - pipeline: The logical TFX pipeline to use when building the Kubeflow - pipeline. - """ - # If exit handler is defined, append to existing pipeline components. - if self._exit_handler: - original_pipeline = pipeline - pipeline = copy.copy(original_pipeline) - pipeline.components = [*pipeline.components, self._exit_handler] - - for component in pipeline.components: - # TODO(b/187122662): Pass through pip dependencies as a first-class - # component flag. - if isinstance(component, tfx_base_component.BaseComponent): - component._resolve_pip_dependencies( # pylint: disable=protected-access - pipeline.pipeline_info.pipeline_root) - - # KFP DSL representation of pipeline root parameter. - dsl_pipeline_root = dsl.PipelineParam( - name=tfx_pipeline.ROOT_PARAMETER.name, - value=pipeline.pipeline_info.pipeline_root) - self._params.append(dsl_pipeline_root) - - def _construct_pipeline(): - """Constructs a Kubeflow pipeline. - - Creates Kubeflow ContainerOps for each TFX component encountered in the - logical pipeline definition. - """ - self._construct_pipeline_graph(pipeline, dsl_pipeline_root) - - # Need to run this first to get self._params populated. Then KFP compiler - # can correctly match default value with PipelineParam. - self._parse_parameter_from_pipeline(pipeline) - - file_name = self._output_filename or get_default_output_filename( - pipeline.pipeline_info.pipeline_name) - # Create workflow spec and write out to package. - self._compiler._create_and_write_workflow( # pylint: disable=protected-access - pipeline_func=_construct_pipeline, - pipeline_name=pipeline.pipeline_info.pipeline_name, - params_list=self._params, - package_path=os.path.join(self._output_dir, file_name)) - - def set_exit_handler(self, exit_handler: base_node.BaseNode): - """Set exit handler components for the Kubeflow dag runner. - - This feature is currently experimental without backward compatibility - gaurantee. - - Args: - exit_handler: exit handler component. 
- """ - if not exit_handler: - logging.error('Setting empty exit handler is not allowed.') - return - assert not exit_handler.downstream_nodes, ('Exit handler should not depend ' - 'on any other node.') - assert not exit_handler.upstream_nodes, ('Exit handler should not depend on' - ' any other node.') - self._exit_handler = exit_handler diff --git a/tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py b/tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py deleted file mode 100644 index 47ac982f48..0000000000 --- a/tfx/orchestration/kubeflow/kubeflow_dag_runner_test.py +++ /dev/null @@ -1,329 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.orchestration.kubeflow.kubeflow_dag_runner.""" - -import json -import os -import tarfile -from typing import List - -from kfp import onprem -import tensorflow as tf -from tfx.components.statistics_gen import component as statistics_gen_component -from tfx.dsl.component.experimental import executor_specs -from tfx.dsl.component.experimental.annotations import Parameter -from tfx.dsl.component.experimental.decorators import component -from tfx.dsl.components.base import base_component -from tfx.dsl.io import fileio -from tfx.extensions.google_cloud_big_query.example_gen import component as big_query_example_gen_component -from tfx.orchestration import data_types -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow import kubeflow_dag_runner -from tfx.orchestration.kubeflow.decorators import FinalStatusStr -from tfx.proto import example_gen_pb2 -from tfx.types import component_spec -from tfx.utils import telemetry_utils -from tfx.utils import test_case_utils -import yaml - -from ml_metadata.proto import metadata_store_pb2 - - -@component -def say_hi(status: Parameter[str]): - print(status) - - -# 2-step pipeline under test. 
-def _two_step_pipeline() -> tfx_pipeline.Pipeline: - default_input_config = json.dumps({ - 'splits': [{ - 'name': 'single_split', - 'pattern': 'SELECT * FROM default-table' - }] - }) - input_config = data_types.RuntimeParameter( - name='input_config', ptype=str, default=default_input_config) - example_gen = big_query_example_gen_component.BigQueryExampleGen( - input_config=input_config, output_config=example_gen_pb2.Output()) - statistics_gen = statistics_gen_component.StatisticsGen( - examples=example_gen.outputs['examples']) - return tfx_pipeline.Pipeline( - pipeline_name='two_step_pipeline', - pipeline_root='pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[example_gen, statistics_gen], - ) - - -class _DummySpec(component_spec.ComponentSpec): - INPUTS = {} - OUTPUTS = {} - PARAMETERS = {} - - -class _DummyComponent(base_component.BaseComponent): - SPEC_CLASS = _DummySpec - EXECUTOR_SPEC = executor_specs.TemplatedExecutorContainerSpec( - image='dummy:latest', command=['ls']) - - def __init__(self): - super().__init__(_DummySpec()) - - -def _container_component_pipeline() -> tfx_pipeline.Pipeline: - return tfx_pipeline.Pipeline( - pipeline_name='container_component_pipeline', - pipeline_root='pipeline_root', - metadata_connection_config=metadata_store_pb2.ConnectionConfig(), - components=[_DummyComponent()], - ) - - -class KubeflowDagRunnerTest(test_case_utils.TfxTest): - - def setUp(self): - super().setUp() - self._source_data_dir = os.path.join( - os.path.dirname(os.path.abspath(__file__)), 'testdata') - self.enter_context(test_case_utils.change_working_dir(self.tmp_dir)) - - def _compare_tfx_ir_against_testdata(self, args: List[str], golden_file: str): - index_of_tfx_ir_flag = args.index('--tfx_ir') - self.assertAllGreater(len(args), index_of_tfx_ir_flag) - real_tfx_ir = json.loads(args[index_of_tfx_ir_flag + 1]) - real_tfx_ir_str = json.dumps(real_tfx_ir, sort_keys=True) - with open(os.path.join(self._source_data_dir, - golden_file)) as tfx_ir_json_file: - formatted_tfx_ir = json.dumps(json.load(tfx_ir_json_file), sort_keys=True) - self.assertEqual(real_tfx_ir_str, formatted_tfx_ir) - - def testTwoStepPipeline(self): - """Sanity-checks the construction and dependencies for a 2-step pipeline.""" - kubeflow_dag_runner.KubeflowDagRunner().run(_two_step_pipeline()) - file_path = os.path.join(self.tmp_dir, 'two_step_pipeline.tar.gz') - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(containers)) - - big_query_container = [ - c for c in containers if c['name'] == 'bigqueryexamplegen' - ] - self.assertEqual(1, len(big_query_container)) - self.assertEqual([ - 'python', - '-m', - 'tfx.orchestration.kubeflow.container_entrypoint', - ], big_query_container[0]['container']['command']) - self.assertIn('--tfx_ir', big_query_container[0]['container']['args']) - self.assertIn('--node_id', big_query_container[0]['container']['args']) - self._compare_tfx_ir_against_testdata( - big_query_container[0]['container']['args'], - 'two_step_pipeline_post_dehydrate_ir.json') - - statistics_gen_container = [ - c for c in containers if c['name'] == 'statisticsgen' - ] - self.assertEqual(1, len(statistics_gen_container)) - - # Ensure the pod labels are correctly appended. 
- metadata = [ - c['metadata'] for c in pipeline['spec']['templates'] if 'dag' not in c - ] - for m in metadata: - self.assertEqual('tfx', m['labels'][telemetry_utils.LABEL_KFP_SDK_ENV]) - - # Ensure dependencies between components are captured. - dag = [c for c in pipeline['spec']['templates'] if 'dag' in c] - self.assertEqual(1, len(dag)) - - self.assertEqual( - { - 'tasks': [{ - 'name': 'bigqueryexamplegen', - 'template': 'bigqueryexamplegen', - 'arguments': { - 'parameters': [{ - 'name': 'input_config', - 'value': '{{inputs.parameters.input_config}}' - }, { - 'name': 'pipeline-root', - 'value': '{{inputs.parameters.pipeline-root}}' - }] - } - }, { - 'name': 'statisticsgen', - 'template': 'statisticsgen', - 'arguments': { - 'parameters': [{ - 'name': 'pipeline-root', - 'value': '{{inputs.parameters.pipeline-root}}' - }] - }, - 'dependencies': ['bigqueryexamplegen'], - }] - }, dag[0]['dag']) - - def testDefaultPipelineOperatorFuncs(self): - kubeflow_dag_runner.KubeflowDagRunner().run(_two_step_pipeline()) - file_path = 'two_step_pipeline.tar.gz' - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(containers)) - - def testMountGcpServiceAccount(self): - kubeflow_dag_runner.KubeflowDagRunner( - config=kubeflow_dag_runner.KubeflowDagRunnerConfig( - pipeline_operator_funcs=kubeflow_dag_runner - .get_default_pipeline_operator_funcs(use_gcp_sa=True))).run( - _two_step_pipeline()) - file_path = 'two_step_pipeline.tar.gz' - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(containers)) - - # Check that each container has default GCP credentials. 
- - container_0 = containers[0] - env = [ - env for env in container_0['container']['env'] - if env['name'] == 'GOOGLE_APPLICATION_CREDENTIALS' - ] - self.assertEqual(1, len(env)) - self.assertEqual('/secret/gcp-credentials/user-gcp-sa.json', - env[0]['value']) - - container_1 = containers[0] - env = [ - env for env in container_1['container']['env'] - if env['name'] == 'GOOGLE_APPLICATION_CREDENTIALS' - ] - self.assertEqual(1, len(env)) - self.assertEqual('/secret/gcp-credentials/user-gcp-sa.json', - env[0]['value']) - - def testVolumeMountingPipelineOperatorFuncs(self): - mount_volume_op = onprem.mount_pvc('my-persistent-volume-claim', - 'my-volume-name', - '/mnt/volume-mount-path') - config = kubeflow_dag_runner.KubeflowDagRunnerConfig( - pipeline_operator_funcs=[mount_volume_op]) - - kubeflow_dag_runner.KubeflowDagRunner(config=config).run( - _two_step_pipeline()) - file_path = 'two_step_pipeline.tar.gz' - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - - container_templates = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertEqual(2, len(container_templates)) - - volumes = [{ - 'name': 'my-volume-name', - 'persistentVolumeClaim': { - 'claimName': 'my-persistent-volume-claim' - } - }] - - # Check that the PVC is specified for kfp<=0.1.31.1. - if 'volumes' in pipeline['spec']: - self.assertEqual(volumes, pipeline['spec']['volumes']) - - for template in container_templates: - # Check that each container has the volume mounted. - self.assertEqual([{ - 'name': 'my-volume-name', - 'mountPath': '/mnt/volume-mount-path' - }], template['container']['volumeMounts']) - - # Check that each template has the PVC specified for kfp>=0.1.31.2. 
- if 'volumes' in template: - self.assertEqual(volumes, template['volumes']) - - def testContainerComponent(self): - kubeflow_dag_runner.KubeflowDagRunner().run(_container_component_pipeline()) - file_path = os.path.join(self.tmp_dir, - 'container_component_pipeline.tar.gz') - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertLen(containers, 1) - component_args = containers[0]['container']['args'] - self.assertIn('--node_id', component_args) - - def testExitHandler(self): - dag_runner = kubeflow_dag_runner.KubeflowDagRunner() - dag_runner.set_exit_handler(say_hi(status=FinalStatusStr())) - pipeline = _container_component_pipeline() - pipeline.enable_cache = True - dag_runner.run(pipeline) - file_path = os.path.join(self.tmp_dir, - 'container_component_pipeline.tar.gz') - self.assertTrue(fileio.exists(file_path)) - - with tarfile.TarFile.open(file_path).extractfile( - 'pipeline.yaml') as pipeline_file: - self.assertIsNotNone(pipeline_file) - pipeline = yaml.safe_load(pipeline_file) - self.assertIn('onExit', pipeline['spec']) - containers = [ - c for c in pipeline['spec']['templates'] if 'container' in c - ] - self.assertLen(containers, 2) - exit_component_args = ' '.join(containers[1]['container']['args']) - self.assertIn('{{workflow.status}}', exit_component_args) - self.assertNotIn('enableCache', exit_component_args) - first_component_args = ' '.join(containers[0]['container']['args']) - self.assertNotIn('{{workflow.status}}', first_component_args) - self.assertIn('enableCache', first_component_args) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/orchestration/kubeflow/proto/BUILD b/tfx/orchestration/kubeflow/proto/BUILD deleted file mode 100644 index b0ee822ee6..0000000000 --- a/tfx/orchestration/kubeflow/proto/BUILD +++ /dev/null @@ -1,25 +0,0 @@ -load("//tfx:tfx.bzl", "tfx_py_proto_library") - -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) # Apache 2.0 - -exports_files(["LICENSE"]) - -tfx_py_proto_library( - name = "kubeflow_proto_py_pb2", - srcs = ["kubeflow.proto"], -) diff --git a/tfx/orchestration/kubeflow/proto/kubeflow.proto b/tfx/orchestration/kubeflow/proto/kubeflow.proto deleted file mode 100644 index bab34bdc69..0000000000 --- a/tfx/orchestration/kubeflow/proto/kubeflow.proto +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2019 Google LLC. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -syntax = "proto3"; - -package tfx.orchestration.kubeflow.proto; - -// ConfigValue specifies how Kubeflow components should obtain a runtime -// configuration parameter value. -message ConfigValue { - oneof value_from { - // Specifies a literal value to use. - string value = 1; - // Specifies that the parameter value should be obtained from the - // environment variable with this specified value. - string environment_variable = 2; - } -} - -// Message to specify the gRPC server configuration. -message KubeflowGrpcMetadataConfig { - // ML Metadata gRPC service host in the cluster. - ConfigValue grpc_service_host = 1; - // ML Metadata gRPC service port in the cluster. - ConfigValue grpc_service_port = 2; -} - -// Message to specify Metadata configuration. -message KubeflowMetadataConfig { - // Following mysql connection configuration fields will be deprecated soon in - // favor of oneof connection_config. - - ConfigValue mysql_db_service_host = 1 [deprecated = true]; - ConfigValue mysql_db_service_port = 2 [deprecated = true]; - ConfigValue mysql_db_name = 3 [deprecated = true]; - ConfigValue mysql_db_user = 4 [deprecated = true]; - ConfigValue mysql_db_password = 5 [deprecated = true]; - - oneof connection_config { - KubeflowGrpcMetadataConfig grpc_config = 7; - } -} diff --git a/tfx/orchestration/pipeline.py b/tfx/orchestration/pipeline.py index b2622eda97..6920441576 100644 --- a/tfx/orchestration/pipeline.py +++ b/tfx/orchestration/pipeline.py @@ -40,7 +40,7 @@ _MAX_PIPELINE_NAME_LENGTH = 63 # Pipeline root is by default specified as a RuntimeParameter when runnning on -# KubeflowDagRunner. This constant offers users an easy access to the pipeline +# KubeflowV2DagRunner. This constant offers users an easy access to the pipeline # root placeholder when defining a pipeline. For example, # # pusher = Pusher( diff --git a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py b/tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py deleted file mode 100644 index 01ea50d940..0000000000 --- a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2021 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""Patches KubeflowDagRunner to read and update argument during compilation.""" - -import os -import tempfile -import typing -from typing import Any, Callable, MutableMapping, Optional, Type - -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration import tfx_runner -from tfx.orchestration.kubeflow import kubeflow_dag_runner -from tfx.tools.cli.handler import dag_runner_patcher - - -def _get_temporary_package_filename(pipeline_name: str, directory: str) -> str: - # mkstemp will create and open a file named 'temp_xxxxx.tar.gz'. - fd, path = tempfile.mkstemp('.tar.gz', f'temp_{pipeline_name}', directory) - os.close(fd) - return os.path.basename(path) - - -class KubeflowDagRunnerPatcher(dag_runner_patcher.DagRunnerPatcher): - """Patches KubeflowDagRunner.run() with several customizations for CLI.""" - - USE_TEMPORARY_OUTPUT_FILE = 'use_temporary_output_file' - OUTPUT_FILE_PATH = 'output_file_path' - - def __init__(self, - call_real_run: bool, - use_temporary_output_file: bool = False, - build_image_fn: Optional[Callable[[str], str]] = None): - """Initialize KubeflowDagRunnerPatcher. - - Args: - call_real_run: Specify KubeflowDagRunner.run() should be called. - use_temporary_output_file: If True, we will override the default value of - the pipeline package output path. Even if it is set to True, if users - specified a path in KubeflowDagRunner then this option will be ignored. - build_image_fn: If specified, call the function with the configured - tfx_image in the pipeline. The result of the function will be - substituted as a new tfx_image of the pipeline. - """ - super().__init__(call_real_run) - self._build_image_fn = build_image_fn - self._use_temporary_output_file = use_temporary_output_file - - def _before_run(self, runner: tfx_runner.TfxRunner, - pipeline: tfx_pipeline.Pipeline, - context: MutableMapping[str, Any]) -> None: - runner = typing.cast(kubeflow_dag_runner.KubeflowDagRunner, runner) - runner_config = typing.cast(kubeflow_dag_runner.KubeflowDagRunnerConfig, - runner.config) - if self._build_image_fn is not None: - # Replace the image for the pipeline with the newly built image name. - # This new image name will include the sha256 image id. - runner_config.tfx_image = self._build_image_fn(runner_config.tfx_image) - - # pylint: disable=protected-access - context[self.USE_TEMPORARY_OUTPUT_FILE] = ( - runner._output_filename is None and self._use_temporary_output_file) - if context[self.USE_TEMPORARY_OUTPUT_FILE]: - # Replace the output of the kfp compile to a temporary file. - # This file will be deleted after job submission in kubeflow_handler.py - runner._output_filename = _get_temporary_package_filename( - context[self.PIPELINE_NAME], runner._output_dir) - output_filename = ( - runner._output_filename or - kubeflow_dag_runner.get_default_output_filename( - context[self.PIPELINE_NAME])) - context[self.OUTPUT_FILE_PATH] = os.path.join(runner._output_dir, - output_filename) - - def get_runner_class(self) -> Type[tfx_runner.TfxRunner]: - return kubeflow_dag_runner.KubeflowDagRunner diff --git a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py b/tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py deleted file mode 100644 index e1b2459caa..0000000000 --- a/tfx/tools/cli/handler/kubeflow_dag_runner_patcher_test.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2021 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for tfx.tools.cli.handler.kubeflow_dag_runner_patcher.""" - -import os -from unittest import mock - -import tensorflow as tf -from tfx.orchestration import pipeline as tfx_pipeline -from tfx.orchestration.kubeflow import kubeflow_dag_runner -from tfx.tools.cli.handler import kubeflow_dag_runner_patcher -from tfx.utils import test_case_utils - - -class KubeflowDagRunnerPatcherTest(test_case_utils.TfxTest): - - def setUp(self): - super().setUp() - self.enter_context(test_case_utils.change_working_dir(self.tmp_dir)) - - def testPatcher(self): - given_image_name = 'foo/bar' - built_image_name = 'foo/bar@sha256:1234567890' - - mock_build_image_fn = mock.MagicMock(return_value=built_image_name) - patcher = kubeflow_dag_runner_patcher.KubeflowDagRunnerPatcher( - call_real_run=True, - build_image_fn=mock_build_image_fn, - use_temporary_output_file=True) - runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig( - tfx_image=given_image_name) - runner = kubeflow_dag_runner.KubeflowDagRunner(config=runner_config) - pipeline = tfx_pipeline.Pipeline('dummy', 'dummy_root') - with patcher.patch() as context: - runner.run(pipeline) - self.assertTrue(context[patcher.USE_TEMPORARY_OUTPUT_FILE]) - self.assertIn(patcher.OUTPUT_FILE_PATH, context) - - mock_build_image_fn.assert_called_once_with(given_image_name) - self.assertEqual(runner_config.tfx_image, built_image_name) - - def testPatcherWithOutputFile(self): - output_filename = 'foo.tar.gz' - patcher = kubeflow_dag_runner_patcher.KubeflowDagRunnerPatcher( - call_real_run=False, - build_image_fn=None, - use_temporary_output_file=True) - runner = kubeflow_dag_runner.KubeflowDagRunner( - output_filename=output_filename) - pipeline = tfx_pipeline.Pipeline('dummy', 'dummy_root') - with patcher.patch() as context: - runner.run(pipeline) - self.assertFalse(context[patcher.USE_TEMPORARY_OUTPUT_FILE]) - self.assertEqual( - os.path.basename(context[patcher.OUTPUT_FILE_PATH]), output_filename) - self.assertEqual(runner._output_filename, output_filename) - - -if __name__ == '__main__': - tf.test.main() diff --git a/tfx/v1/orchestration/experimental/__init__.py b/tfx/v1/orchestration/experimental/__init__.py index 7963c45a1f..d222954eea 100644 --- a/tfx/v1/orchestration/experimental/__init__.py +++ b/tfx/v1/orchestration/experimental/__init__.py @@ -12,23 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
"""TFX orchestration.experimental module.""" - -try: # pylint: disable=g-statement-before-imports - from tfx.orchestration.kubeflow import kubeflow_dag_runner # pylint: disable=g-import-not-at-top - from tfx.orchestration.kubeflow.decorators import exit_handler # pylint: disable=g-import-not-at-top - from tfx.orchestration.kubeflow.decorators import FinalStatusStr # pylint: disable=g-import-not-at-top - from tfx.utils import telemetry_utils # pylint: disable=g-import-not-at-top - - KubeflowDagRunner = kubeflow_dag_runner.KubeflowDagRunner - KubeflowDagRunnerConfig = kubeflow_dag_runner.KubeflowDagRunnerConfig - get_default_kubeflow_metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config - LABEL_KFP_SDK_ENV = telemetry_utils.LABEL_KFP_SDK_ENV - - del telemetry_utils - del kubeflow_dag_runner -except ImportError: # Import will fail without kfp package. - pass - try: from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner # pylint: disable=g-import-not-at-top diff --git a/tfx/v1/proto/__init__.py b/tfx/v1/proto/__init__.py index eb6bdb30a7..edfcd92c72 100644 --- a/tfx/v1/proto/__init__.py +++ b/tfx/v1/proto/__init__.py @@ -142,7 +142,7 @@ """ KubernetesConfig.__doc__ = """ -Kubernetes configuration. We currently only support the use case when infra validator is run by `orchestration.KubeflowDagRunner`. +Kubernetes configuration. Model server will be launched in the same namespace KFP is running on, as well as same service account will be used (unless specified). Model server will have `ownerReferences` to the infra validator, which delegates the strict cleanup guarantee to the kubernetes cluster. """ @@ -262,4 +262,4 @@ PairedExampleSkew.__doc__ = """ Configurations related to Example Diff on feature pairing level. -""" \ No newline at end of file +""" From a6fd9bc2f458f47664c399dc114130d4e49615f4 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Tue, 26 Nov 2024 03:02:11 +0000 Subject: [PATCH 02/18] 1.16 rc --- test_constraints.txt | 351 +++++++++++++++++- .../testdata/module_file/trainer_module.py | 2 - tfx/dependencies.py | 30 +- .../taxi_utils_native_keras.py | 130 +++---- tfx/examples/imdb/imdb_utils_native_keras.py | 30 +- .../mnist/mnist_utils_native_keras.py | 3 +- .../mnist/mnist_utils_native_keras_base.py | 4 +- .../penguin_pipeline_sklearn_local.py | 26 -- ...penguin_pipeline_sklearn_local_e2e_test.py | 3 +- tfx/examples/penguin/penguin_utils_keras.py | 26 +- 10 files changed, 454 insertions(+), 151 deletions(-) diff --git a/test_constraints.txt b/test_constraints.txt index 14290324ad..21053a8243 100644 --- a/test_constraints.txt +++ b/test_constraints.txt @@ -12,5 +12,352 @@ Flask-session<0.6.0 #TODO(b/329181965): Remove once we migrate TFX to 2.16. 
-tensorflow==2.15.1 -tensorflow-text==2.15.0 +tensorflow==2.16.2 +tensorflow-text==2.16.1 + +absl-py==1.4.0 +aiohappyeyeballs==2.4.3 +aiosignal==1.3.1 +alembic==1.13.3 +annotated-types==0.7.0 +anyio==4.6.0 +apache-airflow==2.10.3 +apache-beam==2.59.0 +apispec==6.6.1 +argcomplete==3.5.1 +argon2-cffi==23.1.0 +argon2-cffi-bindings==21.2.0 +array_record==0.5.1 +arrow==1.3.0 +asgiref==3.8.1 +astunparse==1.6.3 +async-lru==2.0.4 +async-timeout==4.0.3 +attrs==23.2.0 +babel==2.16.0 +backcall==0.2.0 +beautifulsoup4==4.12.3 +bleach==6.1.0 +blinker==1.8.2 +cachelib==0.9.0 +cachetools==5.5.0 +certifi==2024.8.30 +cffi==1.17.1 +cfgv==3.4.0 +charset-normalizer==3.4.0 +chex==0.1.86 +click==8.1.7 +clickclick==20.10.2 +cloudpickle==2.2.1 +colorama==0.4.6 +colorlog==6.8.2 +comm==0.2.2 +ConfigUpdater==3.2 +connexion==2.14.2 +cramjam==2.8.4 +crcmod==1.7 +cron-descriptor==1.4.5 +croniter==3.0.3 +cryptography==43.0.1 +Cython==3.0.11 +debugpy==1.8.7 +decorator==5.1.1 +defusedxml==0.7.1 +Deprecated==1.2.14 +dill==0.3.1.1 +distlib==0.3.9 +dm-tree==0.1.8 +dnspython==2.7.0 +docker==7.1.0 +docopt==0.6.2 +docstring_parser==0.16 +docutils==0.21.2 +email_validator==2.2.0 +etils==1.5.2 +exceptiongroup==1.2.2 +fastavro==1.9.7 +fasteners==0.19 +fastjsonschema==2.20.0 +filelock==3.16.1 +Flask==2.2.5 +Flask-Babel==2.0.0 +Flask-Caching==2.3.0 +Flask-JWT-Extended==4.6.0 +Flask-Limiter==3.8.0 +Flask-Login==0.6.3 +Flask-Session==0.5.0 +Flask-SQLAlchemy==2.5.1 +Flask-WTF==1.2.1 +flatbuffers==24.3.25 +flax==0.8.4 +fqdn==1.5.1 +frozenlist==1.4.1 +fsspec==2024.9.0 +gast==0.6.0 +google-api-core==2.21.0 +google-api-python-client==1.12.11 +google-apitools==0.5.31 +google-auth==2.35.0 +google-auth-httplib2==0.2.0 +google-auth-oauthlib==1.2.1 +google-cloud-aiplatform==1.70.0 +google-cloud-bigquery==3.26.0 +google-cloud-bigquery-storage==2.26.0 +google-cloud-bigtable==2.26.0 +google-cloud-core==2.4.1 +google-cloud-datastore==2.20.1 +google-cloud-dlp==3.23.0 +google-cloud-language==2.14.0 +google-cloud-pubsub==2.26.0 +google-cloud-pubsublite==1.11.1 +google-cloud-recommendations-ai==0.10.12 +google-cloud-resource-manager==1.12.5 +google-cloud-spanner==3.49.1 +google-cloud-storage==2.18.2 +google-cloud-videointelligence==2.13.5 +google-cloud-vision==3.7.4 +google-crc32c==1.6.0 +google-pasta==0.2.0 +google-re2==1.1.20240702 +google-resumable-media==2.7.2 +googleapis-common-protos==1.65.0 +greenlet==3.1.1 +grpc-google-iam-v1==0.13.1 +grpc-interceptor==0.15.4 +grpcio==1.66.2 +grpcio-status==1.48.2 +gunicorn==23.0.0 +h11==0.14.0 +h5py==3.12.1 +hdfs==2.7.3 +httpcore==1.0.6 +httplib2==0.22.0 +httpx==0.27.2 +identify==2.6.1 +idna==3.10 +importlib_metadata==8.4.0 +importlib_resources==6.4.5 +inflection==0.5.1 +iniconfig==2.0.0 +ipykernel==6.29.5 +ipython-genutils==0.2.0 +ipywidgets==7.8.4 +isoduration==20.11.0 +itsdangerous==2.2.0 +jax==0.4.23 +jaxlib==0.4.23 +jedi==0.19.1 +Jinja2==3.1.4 +jmespath==1.0.1 +joblib==1.4.2 +Js2Py==0.74 +json5==0.9.25 +jsonpickle==3.3.0 +jsonpointer==3.0.0 +jsonschema==4.23.0 +jsonschema-specifications==2024.10.1 +jupyter-events==0.10.0 +jupyter-lsp==2.2.5 +jupyter_client==8.6.3 +jupyter_core==5.7.2 +jupyter_server==2.13.0 +jupyter_server_terminals==0.5.3 +jupyterlab==4.2.5 +jupyterlab_pygments==0.3.0 +jupyterlab_server==2.27.3 +jupyterlab_widgets==1.1.10 +tf-keras==2.16.0 +keras-tuner==1.4.7 +kfp==2.5.0 +kfp-pipeline-spec==0.2.2 +kfp-server-api==2.0.5 +kt-legacy==1.0.5 +kubernetes==26.1.0 +lazy-object-proxy==1.10.0 +libclang==18.1.1 +limits==3.13.0 +linkify-it-py==2.0.3 +lockfile==0.12.2 +lxml==5.3.0 +Mako==1.3.5 
+Markdown==3.7 +markdown-it-py==3.0.0 +MarkupSafe==3.0.1 +marshmallow==3.22.0 +marshmallow-oneofschema==3.1.1 +marshmallow-sqlalchemy==0.28.2 +matplotlib-inline==0.1.7 +mdit-py-plugins==0.4.2 +mdurl==0.1.2 +methodtools==0.4.7 +mistune==3.0.2 +ml-dtypes==0.3.2 +ml-metadata>=1.16.0 +mmh==2.2 +more-itertools==10.5.0 +msgpack==1.1.0 +multidict==6.1.0 +mysql-connector-python==9.1.0 +mysqlclient==2.2.4 +nbclient==0.10.0 +nbconvert==7.16.4 +nbformat==5.10.4 +nest-asyncio==1.6.0 +nltk==3.9.1 +nodeenv==1.9.1 +notebook==7.2.2 +notebook_shim==0.2.4 +numpy==1.26.4 +oauth2client==4.1.3 +oauthlib==3.2.2 +objsize==0.7.0 +opentelemetry-api==1.27.0 +opentelemetry-exporter-otlp==1.27.0 +opentelemetry-exporter-otlp-proto-common==1.27.0 +opentelemetry-exporter-otlp-proto-grpc==1.27.0 +opentelemetry-exporter-otlp-proto-http==1.27.0 +opentelemetry-proto==1.27.0 +opentelemetry-sdk==1.27.0 +opentelemetry-semantic-conventions==0.48b0 +opt_einsum==3.4.0 +optax==0.2.2 +orbax-checkpoint==0.5.16 +ordered-set==4.1.0 +orjson==3.10.6 +overrides==7.7.0 +packaging==23.2 +pandas==1.5.3 +pandocfilters==1.5.1 +parso==0.8.4 +pathspec==0.12.1 +pendulum==3.0.0 +pexpect==4.9.0 +pickleshare==0.7.5 +pillow==10.4.0 +platformdirs==4.3.6 +pluggy==1.5.0 +portalocker==2.10.1 +portpicker==1.6.0 +pre_commit==4.0.1 +presto-python-client==0.7.0 +prison==0.2.1 +prometheus_client==0.21.0 +promise==2.3 +prompt_toolkit==3.0.48 +propcache==0.2.0 +proto-plus==1.24.0 +protobuf==3.20.3 +psutil==6.0.0 +ptyprocess==0.7.0 +pyarrow-hotfix==0.6 +pyasn1==0.6.1 +pyasn1_modules==0.4.1 +pybind11==2.13.6 +pycparser==2.22 +pydantic==2.9.2 +pydantic_core==2.23.4 +pydot==1.4.2 +pyfarmhash==0.3.2 +Pygments==2.18.0 +pyjsparser==2.7.1 +PyJWT==2.9.0 +pymongo==4.10.1 +pyparsing==3.1.4 +pytest==8.0.0 +pytest-subtests==0.13.1 +python-daemon==3.0.1 +python-dateutil==2.9.0.post0 +python-json-logger==2.0.7 +python-nvd3==0.16.0 +python-slugify==8.0.4 +python-snappy==0.7.3 +pytz==2024.2 +PyYAML==6.0.2 +pyzmq==26.2.0 +redis==5.1.1 +referencing==0.35.1 +regex==2024.9.11 +requests==2.32.3 +requests-oauthlib==2.0.0 +requests-toolbelt==0.10.1 +rfc3339-validator==0.1.4 +rfc3986-validator==0.1.1 +rich==13.9.2 +rich-argparse==1.5.2 +rouge_score==0.1.2 +rpds-py==0.20.0 +rsa==4.9 +sacrebleu==2.4.3 +scikit-learn==1.5.1 +scipy==1.12.0 +Send2Trash==1.8.3 +setproctitle==1.3.3 +shapely==2.0.6 +six==1.16.0 +slackclient==2.9.4 +sniffio==1.3.1 +sounddevice==0.5.0 +soupsieve==2.6 +SQLAlchemy==1.4.54 +SQLAlchemy-JSONField==1.0.2 +SQLAlchemy-Utils==0.41.2 +sqlparse==0.5.1 +struct2tensor>=0.47.0 +tabulate==0.9.0 +tenacity==9.0.0 +tensorboard==2.16.2 +tensorboard-data-server==0.7.2 +tensorflow==2.16.2 +tensorflow-cloud==0.1.16 +tensorflow-data-validation>=1.16.1 +tensorflow-datasets==4.9.3 +tensorflow-decision-forests==1.9.2 +tensorflow-estimator==2.15.0 +tensorflow-hub==0.15.0 +tensorflow-io==0.24.0 +tensorflow-io-gcs-filesystem==0.24.0 +tensorflow-metadata>=1.16.1 +# tensorflow-ranking==0.5.5 +tensorflow-serving-api==2.16.1 +tensorflow-text==2.16.1 +tensorflow-transform>=1.16.0 +tensorflow_model_analysis>=0.47.0 +tensorflowjs==4.17.0 +tensorstore==0.1.66 +termcolor==2.5.0 +terminado==0.18.1 +text-unidecode==1.3 +tflite-support==0.4.4 +tfx-bsl>=1.16.1 +threadpoolctl==3.5.0 +time-machine==2.16.0 +tinycss2==1.3.0 +toml==0.10.2 +tomli==2.0.2 +toolz==1.0.0 +tornado==6.4.1 +tqdm==4.66.5 +traitlets==5.14.3 +types-python-dateutil==2.9.0.20241003 +typing_extensions==4.12.2 +tzdata==2024.2 +tzlocal==5.2 +uc-micro-py==1.0.3 +unicodecsv==0.14.1 +universal_pathlib==0.2.5 +uri-template==1.3.0 
+uritemplate==3.0.1 +urllib3==1.26.20 +virtualenv==20.26.6 +wcwidth==0.2.13 +webcolors==24.8.0 +webencodings==0.5.1 +websocket-client==0.59.0 +widgetsnbextension==3.6.9 +wirerope==0.4.7 +wrapt==1.14.1 +WTForms==3.1.2 +wurlitzer==3.1.1 +yarl==1.14.0 +zipp==3.20.2 +zstandard==0.23.0 diff --git a/tfx/components/testdata/module_file/trainer_module.py b/tfx/components/testdata/module_file/trainer_module.py index 30f24cecc4..6bc36767a0 100644 --- a/tfx/components/testdata/module_file/trainer_module.py +++ b/tfx/components/testdata/module_file/trainer_module.py @@ -205,8 +205,6 @@ def _build_keras_model( **wide_categorical_input, } - # TODO(b/161952382): Replace with Keras premade models and - # Keras preprocessing layers. deep = tf.keras.layers.concatenate( [tf.keras.layers.Normalization()(layer) for layer in deep_input.values()] ) diff --git a/tfx/dependencies.py b/tfx/dependencies.py index 7666dd185a..c3b2239bbd 100644 --- a/tfx/dependencies.py +++ b/tfx/dependencies.py @@ -58,9 +58,9 @@ def make_pipeline_sdk_required_install_packages(): "ml-metadata" + select_constraint( # LINT.IfChange - default=">=1.15.0,<1.16.0", + default=">=1.16.0,<1.17.0", # LINT.ThenChange(tfx/workspace.bzl) - nightly=">=1.16.0.dev", + nightly=">=1.17.0.dev", git_master="@git+https://github.com/google/ml-metadata@master", ), "packaging>=22", @@ -105,32 +105,32 @@ def make_required_install_packages(): # Pip might stuck in a TF 1.15 dependency although there is a working # dependency set with TF 2.x without the sync. # pylint: disable=line-too-long - "tensorflow" + select_constraint(">=2.15.0,<2.16"), + "tensorflow" + select_constraint(">=2.16.0,<2.17"), # pylint: enable=line-too-long "tensorflow-hub>=0.15.0,<0.16", "tensorflow-data-validation" + select_constraint( - default=">=1.15.1,<1.16.0", - nightly=">=1.16.0.dev", + default=">=1.16.1,<1.17.0", + nightly=">=1.17.0.dev", git_master=("@git+https://github.com/tensorflow/data-validation@master"), ), "tensorflow-model-analysis" + select_constraint( - default=">=0.46.0,<0.47.0", - nightly=">=0.47.0.dev", + default=">=0.47.0,<0.48.0", + nightly=">=0.48.0.dev", git_master="@git+https://github.com/tensorflow/model-analysis@master", ), - "tensorflow-serving-api>=2.15,<2.16", + "tensorflow-serving-api>=2.16,<2.17", "tensorflow-transform" + select_constraint( - default=">=1.15.0,<1.16.0", - nightly=">=1.16.0.dev", + default=">=1.16.0,<1.17.0", + nightly=">=1.17.0.dev", git_master="@git+https://github.com/tensorflow/transform@master", ), "tfx-bsl" + select_constraint( - default=">=1.15.1,<1.16.0", - nightly=">=1.16.0.dev", + default=">=1.16.1,<1.17.0", + nightly=">=1.17.0.dev", git_master="@git+https://github.com/tensorflow/tfx-bsl@master", ), ] @@ -199,8 +199,8 @@ def make_extra_packages_tf_ranking(): "tensorflow-ranking>=0.5,<0.6", "struct2tensor" + select_constraint( - default=">=0.46.0,<0.47.0", - nightly=">=0.47.0.dev", + default=">=0.47.0,<0.48.0", + nightly=">=0.48.0.dev", git_master="@git+https://github.com/google/struct2tensor@master", ), ] @@ -211,7 +211,7 @@ def make_extra_packages_tfdf(): # Required for tfx/examples/penguin/penguin_utils_tfdf_experimental.py return [ # NOTE: TFDF 1.0.1 is only compatible with TF 2.10.x. 
- "tensorflow-decision-forests>=1.0.1,<1.9", + "tensorflow-decision-forests>=1.8.1,<2", ] diff --git a/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py b/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py index d113e89c51..cbf1183d1a 100644 --- a/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py +++ b/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py @@ -28,7 +28,7 @@ from tfx_bsl.tfxio import dataset_options # Categorical features are assumed to each have a maximum value in the dataset. -_MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12] +_MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 13] _CATEGORICAL_FEATURE_KEYS = [ 'trip_start_hour', 'trip_start_day', 'trip_start_month', @@ -164,7 +164,6 @@ def _input_fn(file_pattern: List[str], batch_size=batch_size, label_key=_transformed_name(_LABEL_KEY)), tf_transform_output.transformed_metadata.schema).repeat() - def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model: """Creates a DNN Keras model for classifying taxi data. @@ -172,94 +171,76 @@ def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model: hidden_units: [int], the layer sizes of the DNN (input layer first). Returns: - A keras Model. - """ - real_valued_columns = [ - tf.feature_column.numeric_column(key, shape=()) - for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS) - ] - categorical_columns = [ - tf.feature_column.categorical_column_with_identity( - key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0) - for key in _transformed_names(_VOCAB_FEATURE_KEYS) - ] - categorical_columns += [ - tf.feature_column.categorical_column_with_identity( - key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0) - for key in _transformed_names(_BUCKET_FEATURE_KEYS) - ] - categorical_columns += [ - tf.feature_column.categorical_column_with_identity( # pylint: disable=g-complex-comprehension - key, - num_buckets=num_buckets, - default_value=0) for key, num_buckets in zip( - _transformed_names(_CATEGORICAL_FEATURE_KEYS), - _MAX_CATEGORICAL_FEATURE_VALUES) - ] - indicator_column = [ - tf.feature_column.indicator_column(categorical_column) - for categorical_column in categorical_columns - ] - - model = _wide_and_deep_classifier( - # TODO(b/139668410) replace with premade wide_and_deep keras model - wide_columns=indicator_column, - deep_columns=real_valued_columns, - dnn_hidden_units=hidden_units or [100, 70, 50, 25]) - return model - - -def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units): - """Build a simple keras wide and deep model. - - Args: - wide_columns: Feature columns wrapped in indicator_column for wide (linear) - part of the model. - deep_columns: Feature columns for deep part of the model. - dnn_hidden_units: [int], the layer sizes of the hidden DNN. - - Returns: - A Wide and Deep Keras model + A Wide and Deep keras Model. """ # Following values are hard coded for simplicity in this example, # However prefarably they should be passsed in as hparams. # Keras needs the feature definitions at compile time. - # TODO(b/139081439): Automate generation of input layers from FeatureColumn. 
- input_layers = { - colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32) + deep_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype=tf.float32) for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS) } - input_layers.update({ - colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32') + wide_vocab_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') for colname in _transformed_names(_VOCAB_FEATURE_KEYS) - }) - input_layers.update({ - colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32') + } + wide_bucket_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') for colname in _transformed_names(_BUCKET_FEATURE_KEYS) - }) - input_layers.update({ - colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32') + } + wide_categorical_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS) - }) + } + input_layers = { + **deep_input, + **wide_vocab_input, + **wide_bucket_input, + **wide_categorical_input, + } - # TODO(b/161952382): Replace with Keras premade models and - # Keras preprocessing layers. - deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers) - for numnodes in dnn_hidden_units: + deep = tf.keras.layers.concatenate( + [tf.keras.layers.Normalization()(layer) for layer in deep_input.values()] + ) + for numnodes in (hidden_units or [100, 70, 50, 25]): deep = tf.keras.layers.Dense(numnodes)(deep) - wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers) - output = tf.keras.layers.Dense( - 1, activation='sigmoid')( - tf.keras.layers.concatenate([deep, wide])) - output = tf.squeeze(output, -1) + wide_layers = [] + for key in _transformed_names(_VOCAB_FEATURE_KEYS): + wide_layers.append( + tf.keras.layers.CategoryEncoding(num_tokens=_VOCAB_SIZE + _OOV_SIZE)( + input_layers[key] + ) + ) + for key in _transformed_names(_BUCKET_FEATURE_KEYS): + wide_layers.append( + tf.keras.layers.CategoryEncoding(num_tokens=_FEATURE_BUCKET_COUNT)( + input_layers[key] + ) + ) + for key, num_tokens in zip( + _transformed_names(_CATEGORICAL_FEATURE_KEYS), + _MAX_CATEGORICAL_FEATURE_VALUES, + ): + wide_layers.append( + tf.keras.layers.CategoryEncoding(num_tokens=num_tokens)( + input_layers[key] + ) + ) + wide = tf.keras.layers.concatenate(wide_layers) + + output = tf.keras.layers.Dense(1, activation='sigmoid')( + tf.keras.layers.concatenate([deep, wide]) + ) + output = tf.keras.layers.Reshape((1,))(output) model = tf.keras.Model(input_layers, output) model.compile( loss='binary_crossentropy', - optimizer=tf.keras.optimizers.Adam(lr=0.001), - metrics=[tf.keras.metrics.BinaryAccuracy()]) + optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), + metrics=[tf.keras.metrics.BinaryAccuracy()], + ) model.summary(print_fn=logging.info) return model @@ -353,4 +334,5 @@ def run_fn(fn_args: FnArgs): 'transform_features': _get_transform_features_signature(model, tf_transform_output), } - model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures) + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) + diff --git a/tfx/examples/imdb/imdb_utils_native_keras.py b/tfx/examples/imdb/imdb_utils_native_keras.py index 48451c4c55..e63da25f45 100644 --- a/tfx/examples/imdb/imdb_utils_native_keras.py +++ b/tfx/examples/imdb/imdb_utils_native_keras.py @@ -130,18 +130,16 @@ def _build_keras_model() -> keras.Model: Returns: A Keras Model. 
""" - # The model below is built with Sequential API, please refer to - # https://www.tensorflow.org/guide/keras/sequential_model - model = keras.Sequential([ - keras.layers.Embedding( + input_layer = keras.layers.Input(shape=(_MAX_LEN,), dtype=tf.int64, name=_transformed_name(_FEATURE_KEY, True)) + embedding_layer = keras.layers.Embedding( _VOCAB_SIZE + 2, - _EMBEDDING_UNITS, - name=_transformed_name(_FEATURE_KEY)), - keras.layers.Bidirectional( - keras.layers.LSTM(_LSTM_UNITS, dropout=_DROPOUT_RATE)), - keras.layers.Dense(_HIDDEN_UNITS, activation='relu'), - keras.layers.Dense(1) - ]) + _EMBEDDING_UNITS)(input_layer) + bidirectional_layer = keras.layers.Bidirectional( + keras.layers.LSTM(_LSTM_UNITS, dropout=_DROPOUT_RATE))(embedding_layer) + hidden_layer = keras.layers.Dense(_HIDDEN_UNITS, activation='relu')(bidirectional_layer) + output_layer = keras.layers.Dense(1)(hidden_layer) + + model = keras.Model(inputs=input_layer, outputs=output_layer) model.compile( loss=keras.losses.BinaryCrossentropy(from_logits=True), @@ -156,7 +154,7 @@ def _get_serve_tf_examples_fn(model, tf_transform_output): """Returns a function that parses a serialized tf.Example.""" model.tft_layer = tf_transform_output.transform_features_layer() - @tf.function + @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')]) def serve_tf_examples_fn(serialized_tf_examples): """Returns the output to be used in the serving signature.""" feature_spec = tf_transform_output.raw_feature_spec() @@ -204,9 +202,13 @@ def run_fn(fn_args: FnArgs): validation_data=eval_dataset, validation_steps=fn_args.eval_steps) + # Create a new model instance for serving + serving_model = _build_keras_model() + serving_model.set_weights(model.get_weights()) # Copy weights from the trained model + signatures = { 'serving_default': - _get_serve_tf_examples_fn(model, + _get_serve_tf_examples_fn(serving_model, tf_transform_output).get_concrete_function( tf.TensorSpec( shape=[None], @@ -214,4 +216,4 @@ def run_fn(fn_args: FnArgs): name='examples')), } - model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures) + tf.saved_model.save(serving_model, fn_args.serving_model_dir, signatures=signatures) diff --git a/tfx/examples/mnist/mnist_utils_native_keras.py b/tfx/examples/mnist/mnist_utils_native_keras.py index d70bf1b126..0980fba6ed 100644 --- a/tfx/examples/mnist/mnist_utils_native_keras.py +++ b/tfx/examples/mnist/mnist_utils_native_keras.py @@ -89,4 +89,5 @@ def run_fn(fn_args: FnArgs): model, tf_transform_output).get_concrete_function( tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')) } - model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures) + model.save(fn_args.serving_model_dir + '/model.keras', signatures=signatures) + #tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) diff --git a/tfx/examples/mnist/mnist_utils_native_keras_base.py b/tfx/examples/mnist/mnist_utils_native_keras_base.py index d580a1b10f..eae1ebc7d6 100644 --- a/tfx/examples/mnist/mnist_utils_native_keras_base.py +++ b/tfx/examples/mnist/mnist_utils_native_keras_base.py @@ -69,7 +69,7 @@ def build_keras_model() -> tf.keras.Model: model = tf.keras.Sequential() model.add( tf.keras.layers.InputLayer( - input_shape=(784,), name=transformed_name(IMAGE_KEY))) + shape=(784,), name=transformed_name(IMAGE_KEY))) model.add(tf.keras.layers.Dense(64, activation='relu')) model.add(tf.keras.layers.Dropout(0.2)) model.add(tf.keras.layers.Dense(64, activation='relu')) @@ -77,7 
+77,7 @@ def build_keras_model() -> tf.keras.Model: model.add(tf.keras.layers.Dense(10)) model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), - optimizer=tf.keras.optimizers.RMSprop(lr=0.0015), + optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.0015), metrics=['sparse_categorical_accuracy']) model.summary(print_fn=absl.logging.info) return model diff --git a/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local.py b/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local.py index 4efddc03ab..448645ff57 100644 --- a/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local.py +++ b/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local.py @@ -111,33 +111,8 @@ def _create_pipeline( type=tfx.types.standard_artifacts.ModelBlessing)).with_id( 'latest_blessed_model_resolver') - # Uses TFMA to compute evaluation statistics over features of a model and - # perform quality validation of a candidate model (compared to a baseline). - eval_config = tfma.EvalConfig( - model_specs=[tfma.ModelSpec(label_key='species')], - slicing_specs=[tfma.SlicingSpec()], - metrics_specs=[ - tfma.MetricsSpec(metrics=[ - tfma.MetricConfig( - class_name='Accuracy', - threshold=tfma.MetricThreshold( - value_threshold=tfma.GenericValueThreshold( - lower_bound={'value': 0.6}), - change_threshold=tfma.GenericChangeThreshold( - direction=tfma.MetricDirection.HIGHER_IS_BETTER, - absolute={'value': -1e-10}))) - ]) - ]) - evaluator = tfx.components.Evaluator( - module_file=evaluator_module_file, - examples=example_gen.outputs['examples'], - model=trainer.outputs['model'], - baseline_model=model_resolver.outputs['model'], - eval_config=eval_config) - pusher = tfx.components.Pusher( model=trainer.outputs['model'], - model_blessing=evaluator.outputs['blessing'], push_destination=tfx.proto.PushDestination( filesystem=tfx.proto.PushDestination.Filesystem( base_directory=serving_model_dir))) @@ -152,7 +127,6 @@ def _create_pipeline( example_validator, trainer, model_resolver, - evaluator, pusher, ], enable_cache=True, diff --git a/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local_e2e_test.py b/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local_e2e_test.py index e46bd61103..9d279fbc5a 100644 --- a/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local_e2e_test.py +++ b/tfx/examples/penguin/experimental/penguin_pipeline_sklearn_local_e2e_test.py @@ -57,7 +57,6 @@ def assertExecutedOnce(self, component: str) -> None: def assertPipelineExecution(self) -> None: self.assertExecutedOnce('CsvExampleGen') - self.assertExecutedOnce('Evaluator') self.assertExecutedOnce('ExampleValidator') self.assertExecutedOnce('Pusher') self.assertExecutedOnce('SchemaGen') @@ -78,7 +77,7 @@ def testPenguinPipelineSklearnLocal(self): self.assertTrue(tfx.dsl.io.fileio.exists(self._serving_model_dir)) self.assertTrue(tfx.dsl.io.fileio.exists(self._metadata_path)) - expected_execution_count = 8 # 7 components + 1 resolver + expected_execution_count = 7 # 6 components + 1 resolver metadata_config = ( tfx.orchestration.metadata.sqlite_metadata_connection_config( self._metadata_path)) diff --git a/tfx/examples/penguin/penguin_utils_keras.py b/tfx/examples/penguin/penguin_utils_keras.py index 9ff5d969be..0247e78939 100644 --- a/tfx/examples/penguin/penguin_utils_keras.py +++ b/tfx/examples/penguin/penguin_utils_keras.py @@ -160,16 +160,16 @@ def run_fn(fn_args: tfx.components.FnArgs): with mirrored_strategy.scope(): model = _make_keras_model(hparams) - # 
Write logs to path - tensorboard_callback = tf.keras.callbacks.TensorBoard( - log_dir=fn_args.model_run_dir, update_freq='epoch') - - model.fit( - train_dataset, - steps_per_epoch=fn_args.train_steps, - validation_data=eval_dataset, - validation_steps=fn_args.eval_steps, - callbacks=[tensorboard_callback]) - - signatures = base.make_serving_signatures(model, tf_transform_output) - model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures) + # Write logs to path + tensorboard_callback = tf.keras.callbacks.TensorBoard( + log_dir=fn_args.model_run_dir, update_freq='epoch') + + model.fit( + train_dataset, + steps_per_epoch=fn_args.train_steps, + validation_data=eval_dataset, + validation_steps=fn_args.eval_steps, + callbacks=[tensorboard_callback]) + + signatures = base.make_serving_signatures(model, tf_transform_output) + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) From 7dfd7b4a90bd8e553a80aa4c205b83fe38ce9667 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Wed, 27 Nov 2024 03:15:08 +0000 Subject: [PATCH 03/18] Update 1.16 compatibility --- tfx/examples/imdb/imdb_utils_native_keras.py | 33 ++++++++++--------- .../penguin_pipeline_local_e2e_test.py | 1 + tfx/examples/penguin/penguin_utils_keras.py | 26 +++++++-------- .../taxi/models/keras_model/model_test.py | 1 - 4 files changed, 32 insertions(+), 29 deletions(-) diff --git a/tfx/examples/imdb/imdb_utils_native_keras.py b/tfx/examples/imdb/imdb_utils_native_keras.py index e63da25f45..6fea043ffd 100644 --- a/tfx/examples/imdb/imdb_utils_native_keras.py +++ b/tfx/examples/imdb/imdb_utils_native_keras.py @@ -130,16 +130,24 @@ def _build_keras_model() -> keras.Model: Returns: A Keras Model. """ - input_layer = keras.layers.Input(shape=(_MAX_LEN,), dtype=tf.int64, name=_transformed_name(_FEATURE_KEY, True)) + # Input layer explicitly defined to handle dictionary input + input_layer = keras.layers.Input(shape=(_MAX_LEN,), dtype=tf.int64, name=_transformed_name(_FEATURE_KEY, True)) + embedding_layer = keras.layers.Embedding( - _VOCAB_SIZE + 2, - _EMBEDDING_UNITS)(input_layer) - bidirectional_layer = keras.layers.Bidirectional( - keras.layers.LSTM(_LSTM_UNITS, dropout=_DROPOUT_RATE))(embedding_layer) - hidden_layer = keras.layers.Dense(_HIDDEN_UNITS, activation='relu')(bidirectional_layer) + _VOCAB_SIZE + 2, + _EMBEDDING_UNITS, + name=_transformed_name(_FEATURE_KEY) + )(input_layer) + + lstm_layer = keras.layers.Bidirectional( + keras.layers.LSTM(_LSTM_UNITS, dropout=0) + )(embedding_layer) + + hidden_layer = keras.layers.Dense(_HIDDEN_UNITS, activation='relu')(lstm_layer) output_layer = keras.layers.Dense(1)(hidden_layer) - model = keras.Model(inputs=input_layer, outputs=output_layer) + # Create the model with the specified input and output + model = keras.Model(inputs={_transformed_name(_FEATURE_KEY, True): input_layer}, outputs=output_layer) model.compile( loss=keras.losses.BinaryCrossentropy(from_logits=True), @@ -154,7 +162,7 @@ def _get_serve_tf_examples_fn(model, tf_transform_output): """Returns a function that parses a serialized tf.Example.""" model.tft_layer = tf_transform_output.transform_features_layer() - @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')]) + @tf.function def serve_tf_examples_fn(serialized_tf_examples): """Returns the output to be used in the serving signature.""" feature_spec = tf_transform_output.raw_feature_spec() @@ -202,18 +210,13 @@ def run_fn(fn_args: FnArgs): validation_data=eval_dataset, 
validation_steps=fn_args.eval_steps) - # Create a new model instance for serving - serving_model = _build_keras_model() - serving_model.set_weights(model.get_weights()) # Copy weights from the trained model - signatures = { 'serving_default': - _get_serve_tf_examples_fn(serving_model, + _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function( tf.TensorSpec( shape=[None], dtype=tf.string, name='examples')), } - - tf.saved_model.save(serving_model, fn_args.serving_model_dir, signatures=signatures) + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) diff --git a/tfx/examples/penguin/penguin_pipeline_local_e2e_test.py b/tfx/examples/penguin/penguin_pipeline_local_e2e_test.py index 023c3c919b..0cf3f9b517 100644 --- a/tfx/examples/penguin/penguin_pipeline_local_e2e_test.py +++ b/tfx/examples/penguin/penguin_pipeline_local_e2e_test.py @@ -226,6 +226,7 @@ def testPenguinPipelineLocalWithTuner(self): @parameterized.parameters(('keras',), ('flax_experimental',), ('tfdf_experimental',)) + @pytest.mark.xfail(run=False, reason="Keras model is not working with bulk inference now. Need to fix") def testPenguinPipelineLocalWithBulkInferrer(self, model_framework): if model_framework == 'tfdf_experimental': # Skip if TFDF is not available or incompatible. diff --git a/tfx/examples/penguin/penguin_utils_keras.py b/tfx/examples/penguin/penguin_utils_keras.py index 0247e78939..df5266a0c0 100644 --- a/tfx/examples/penguin/penguin_utils_keras.py +++ b/tfx/examples/penguin/penguin_utils_keras.py @@ -160,16 +160,16 @@ def run_fn(fn_args: tfx.components.FnArgs): with mirrored_strategy.scope(): model = _make_keras_model(hparams) - # Write logs to path - tensorboard_callback = tf.keras.callbacks.TensorBoard( - log_dir=fn_args.model_run_dir, update_freq='epoch') - - model.fit( - train_dataset, - steps_per_epoch=fn_args.train_steps, - validation_data=eval_dataset, - validation_steps=fn_args.eval_steps, - callbacks=[tensorboard_callback]) - - signatures = base.make_serving_signatures(model, tf_transform_output) - tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) + # Write logs to path + tensorboard_callback = tf.keras.callbacks.TensorBoard( + log_dir=fn_args.model_run_dir, update_freq='epoch') + + model.fit( + train_dataset, + steps_per_epoch=fn_args.train_steps, + validation_data=eval_dataset, + validation_steps=fn_args.eval_steps, + callbacks=[tensorboard_callback]) + + signatures = base.make_serving_signatures(model, tf_transform_output) + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) diff --git a/tfx/experimental/templates/taxi/models/keras_model/model_test.py b/tfx/experimental/templates/taxi/models/keras_model/model_test.py index e2b97c5e9a..719608db68 100644 --- a/tfx/experimental/templates/taxi/models/keras_model/model_test.py +++ b/tfx/experimental/templates/taxi/models/keras_model/model_test.py @@ -18,7 +18,6 @@ from tfx.experimental.templates.taxi.models.keras_model import model -@pytest.mark.xfail(run=False, reason="_build_keras_model is not compatible with Keras3.") class ModelTest(tf.test.TestCase): def testBuildKerasModel(self): From 544d3828a2e40cb17ac14e619384b1ded6439022 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Mon, 2 Dec 2024 01:09:28 +0000 Subject: [PATCH 04/18] bump up the version of tornado --- tfx/tools/docker/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tfx/tools/docker/requirements.txt b/tfx/tools/docker/requirements.txt index 
da7f0c9755..479f41021e 100644 --- a/tfx/tools/docker/requirements.txt +++ b/tfx/tools/docker/requirements.txt @@ -326,7 +326,7 @@ tinycss2==1.3.0 toml==0.10.2 tomli==2.0.2 toolz==1.0.0 -tornado==6.4.1 +tornado==6.4.2 tqdm==4.66.5 traitlets==5.14.3 types-python-dateutil==2.9.0.20241003 From 5b5171105ce7886a77a41bb5e63fcb2a2bca8ee0 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Mon, 2 Dec 2024 01:21:56 +0000 Subject: [PATCH 05/18] sync with master branch --- tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py | 4 +++- tfx/examples/imdb/imdb_utils_native_keras.py | 1 + tfx/examples/mnist/mnist_utils_native_keras_base.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py b/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py index a17aa78c98..559ea50213 100644 --- a/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py +++ b/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py @@ -164,6 +164,7 @@ def _input_fn(file_pattern: List[str], batch_size=batch_size, label_key=_transformed_name(_LABEL_KEY)), tf_transform_output.transformed_metadata.schema).repeat() + def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model: """Creates a DNN Keras model for classifying taxi data. @@ -334,4 +335,5 @@ def run_fn(fn_args: FnArgs): 'transform_features': _get_transform_features_signature(model, tf_transform_output), } - tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) \ No newline at end of file + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) + diff --git a/tfx/examples/imdb/imdb_utils_native_keras.py b/tfx/examples/imdb/imdb_utils_native_keras.py index cf9fd2d504..56924011be 100644 --- a/tfx/examples/imdb/imdb_utils_native_keras.py +++ b/tfx/examples/imdb/imdb_utils_native_keras.py @@ -227,4 +227,5 @@ def run_fn(fn_args: FnArgs): dtype=tf.string, name='examples')), } + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) diff --git a/tfx/examples/mnist/mnist_utils_native_keras_base.py b/tfx/examples/mnist/mnist_utils_native_keras_base.py index eae1ebc7d6..965988d3a6 100644 --- a/tfx/examples/mnist/mnist_utils_native_keras_base.py +++ b/tfx/examples/mnist/mnist_utils_native_keras_base.py @@ -69,7 +69,7 @@ def build_keras_model() -> tf.keras.Model: model = tf.keras.Sequential() model.add( tf.keras.layers.InputLayer( - shape=(784,), name=transformed_name(IMAGE_KEY))) + input_shape=(784,), name=transformed_name(IMAGE_KEY))) model.add(tf.keras.layers.Dense(64, activation='relu')) model.add(tf.keras.layers.Dropout(0.2)) model.add(tf.keras.layers.Dense(64, activation='relu')) From 935e83ecf9690cce9b2faa15cfb54971869ceb88 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Mon, 2 Dec 2024 01:22:50 +0000 Subject: [PATCH 06/18] sync with master branch --- tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py b/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py index 559ea50213..41b7791dcf 100644 --- a/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py +++ b/tfx/examples/chicago_taxi_pipeline/taxi_utils_native_keras.py @@ -336,4 +336,3 @@ def run_fn(fn_args: FnArgs): _get_transform_features_signature(model, tf_transform_output), } tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) - From 00575a3d799b43eaed06810b0318a5e46ae61949 Mon Sep 17 
00:00:00 2001
From: Doojin Park
Date: Mon, 2 Dec 2024 01:31:03 +0000
Subject: [PATCH 07/18] update nightly

---
 nightly_test_constraints.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nightly_test_constraints.txt b/nightly_test_constraints.txt
index 1055bda932..1e80ebf577 100644
--- a/nightly_test_constraints.txt
+++ b/nightly_test_constraints.txt
@@ -1,4 +1,5 @@
 # nightly_test_constraints.txt
+
 # This file specifies the constraints for the test environment of tfx.
 # Unlike library dependency which aims to specify the widest version range
 # possible, it is okay to specify exact version here.

From 7a46db64e4866d52353ecaa825883553dbc7064b Mon Sep 17 00:00:00 2001
From: Doojin Park
Date: Mon, 2 Dec 2024 01:33:28 +0000
Subject: [PATCH 08/18] Update nightly test constraints

---
 nightly_test_constraints.txt | 52 +++++++++++++-----------------------
 1 file changed, 18 insertions(+), 34 deletions(-)

diff --git a/nightly_test_constraints.txt b/nightly_test_constraints.txt
index 1e80ebf577..b7381e7974 100644
--- a/nightly_test_constraints.txt
+++ b/nightly_test_constraints.txt
@@ -1,5 +1,4 @@
 # nightly_test_constraints.txt
-
 # This file specifies the constraints for the test environment of tfx.
 # Unlike library dependency which aims to specify the widest version range
 # possible, it is okay to specify exact version here.
@@ -13,27 +12,16 @@ Flask-session<0.6.0
 
 #TODO(b/329181965): Remove once we migrate TFX to 2.16.
-tensorflow==2.15.1
-tensorflow-text==2.15.0
+tensorflow==2.16.2
+tensorflow-text==2.16.1
 
 absl-py==1.4.0
 aiohappyeyeballs==2.4.3
-aiohttp==3.10.9
 aiosignal==1.3.1
 alembic==1.13.3
 annotated-types==0.7.0
 anyio==4.6.0
-apache-airflow==2.10.2
-apache-airflow-providers-common-compat==1.2.1rc1
-apache-airflow-providers-common-io==1.4.2rc1
-apache-airflow-providers-common-sql==1.18.0rc1
-apache-airflow-providers-fab==1.4.1rc1
-apache-airflow-providers-ftp==3.11.1
-apache-airflow-providers-http==4.13.1
-apache-airflow-providers-imap==3.7.0
-apache-airflow-providers-mysql==5.7.2rc1
-apache-airflow-providers-smtp==1.8.0
-apache-airflow-providers-sqlite==3.9.0
+apache-airflow==2.10.3
 apache-beam==2.59.0
 apispec==6.6.1
 argcomplete==3.5.1
@@ -92,7 +80,6 @@ fasteners==0.19
 fastjsonschema==2.20.0
 filelock==3.16.1
 Flask==2.2.5
-Flask-AppBuilder==4.5.0
 Flask-Babel==2.0.0
 Flask-Caching==2.3.0
 Flask-JWT-Extended==4.6.0
@@ -153,7 +140,6 @@ importlib_resources==6.4.5
 inflection==0.5.1
 iniconfig==2.0.0
 ipykernel==6.29.5
-ipython==7.34.0
 ipython-genutils==0.2.0
 ipywidgets==7.8.4
 isoduration==20.11.0
@@ -180,7 +166,7 @@ jupyterlab==4.2.5
 jupyterlab_pygments==0.3.0
 jupyterlab_server==2.27.3
 jupyterlab_widgets==1.1.10
-keras==2.15.0
+tf-keras==2.16.0
 keras-tuner==1.4.7
 kfp==2.5.0
 kfp-pipeline-spec==0.2.2
@@ -206,12 +192,12 @@ mdurl==0.1.2
 methodtools==0.4.7
 mistune==3.0.2
 ml-dtypes==0.3.2
-ml-metadata>=1.17.0.dev20241016
+ml-metadata>=1.16.0
 mmh==2.2
 more-itertools==10.5.0
 msgpack==1.1.0
 multidict==6.1.0
-mysql-connector-python==9.0.0
+mysql-connector-python==9.1.0
 mysqlclient==2.2.4
 nbclient==0.10.0
 nbconvert==7.16.4
@@ -263,7 +249,6 @@ proto-plus==1.24.0
 protobuf==3.20.3
 psutil==6.0.0
 ptyprocess==0.7.0
-pyarrow==10.0.1
 pyarrow-hotfix==0.6
 pyasn1==0.6.1
 pyasn1_modules==0.4.1
@@ -317,33 +302,33 @@ SQLAlchemy==1.4.54
 SQLAlchemy-JSONField==1.0.2
 SQLAlchemy-Utils==0.41.2
 sqlparse==0.5.1
-struct2tensor>=0.47.0.dev20240430; extra == "all"
+struct2tensor>=0.47.0
 tabulate==0.9.0
 tenacity==9.0.0
-tensorboard==2.15.2
+tensorboard==2.16.2
 tensorboard-data-server==0.7.2
-tensorflow==2.15.1
+tensorflow==2.16.2
tensorflow-cloud==0.1.16 -tensorflow-data-validation>=1.16.0.dev20240508 +tensorflow-data-validation>=1.16.1 tensorflow-datasets==4.9.3 -tensorflow-decision-forests==1.8.1 +tensorflow-decision-forests==1.9.2 tensorflow-estimator==2.15.0 tensorflow-hub==0.15.0 tensorflow-io==0.24.0 tensorflow-io-gcs-filesystem==0.24.0 -tensorflow-metadata>=1.17.0.dev20241016 -tensorflow-ranking==0.5.5 -tensorflow-serving-api==2.15.1 -tensorflow-text==2.15.0 -tensorflow-transform>=1.16.0.dev20240430 -tensorflow_model_analysis>=0.47.0.dev20240617 +tensorflow-metadata>=1.16.1 +# tensorflow-ranking==0.5.5 +tensorflow-serving-api==2.16.1 +tensorflow-text==2.16.1 +tensorflow-transform>=1.16.0 +tensorflow_model_analysis>=0.47.0 tensorflowjs==4.17.0 tensorstore==0.1.66 termcolor==2.5.0 terminado==0.18.1 text-unidecode==1.3 tflite-support==0.4.4 -tfx-bsl>=1.16.0.dev20240430 +tfx-bsl>=1.16.1 threadpoolctl==3.5.0 time-machine==2.16.0 tinycss2==1.3.0 @@ -368,7 +353,6 @@ wcwidth==0.2.13 webcolors==24.8.0 webencodings==0.5.1 websocket-client==0.59.0 -Werkzeug==2.2.3 widgetsnbextension==3.6.9 wirerope==0.4.7 wrapt==1.14.1 From e0c550ba56efc0a535699f4374ca21b21e8bb6d2 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Mon, 2 Dec 2024 02:13:26 +0000 Subject: [PATCH 09/18] update nightly test constraints --- tfx/dependencies.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tfx/dependencies.py b/tfx/dependencies.py index c3b2239bbd..ca8469aefc 100644 --- a/tfx/dependencies.py +++ b/tfx/dependencies.py @@ -111,26 +111,26 @@ def make_required_install_packages(): "tensorflow-data-validation" + select_constraint( default=">=1.16.1,<1.17.0", - nightly=">=1.17.0.dev", + nightly=">=1.16.1.dev", git_master=("@git+https://github.com/tensorflow/data-validation@master"), ), "tensorflow-model-analysis" + select_constraint( default=">=0.47.0,<0.48.0", - nightly=">=0.48.0.dev", + nightly=">=0.47.0.dev", git_master="@git+https://github.com/tensorflow/model-analysis@master", ), "tensorflow-serving-api>=2.16,<2.17", "tensorflow-transform" + select_constraint( default=">=1.16.0,<1.17.0", - nightly=">=1.17.0.dev", + nightly=">=1.16.0.dev", git_master="@git+https://github.com/tensorflow/transform@master", ), "tfx-bsl" + select_constraint( default=">=1.16.1,<1.17.0", - nightly=">=1.17.0.dev", + nightly=">=1.16.0.dev", git_master="@git+https://github.com/tensorflow/tfx-bsl@master", ), ] @@ -200,7 +200,7 @@ def make_extra_packages_tf_ranking(): "struct2tensor" + select_constraint( default=">=0.47.0,<0.48.0", - nightly=">=0.48.0.dev", + nightly=">=0.47.0.dev", git_master="@git+https://github.com/google/struct2tensor@master", ), ] From 041599fb44c8079ebaad4afe66eaa6977b2cc32c Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Mon, 2 Dec 2024 03:04:53 +0000 Subject: [PATCH 10/18] pin keras version --- nightly_test_constraints.txt | 1 + test_constraints.txt | 1 + tfx/tools/docker/requirements.txt | 1 + 3 files changed, 3 insertions(+) diff --git a/nightly_test_constraints.txt b/nightly_test_constraints.txt index b7381e7974..f17b20f0a3 100644 --- a/nightly_test_constraints.txt +++ b/nightly_test_constraints.txt @@ -14,6 +14,7 @@ Flask-session<0.6.0 #TODO(b/329181965): Remove once we migrate TFX to 2.16. 
tensorflow==2.16.2 tensorflow-text==2.16.1 +keras==3.6.0 absl-py==1.4.0 aiohappyeyeballs==2.4.3 diff --git a/test_constraints.txt b/test_constraints.txt index 21053a8243..644032d7eb 100644 --- a/test_constraints.txt +++ b/test_constraints.txt @@ -14,6 +14,7 @@ Flask-session<0.6.0 #TODO(b/329181965): Remove once we migrate TFX to 2.16. tensorflow==2.16.2 tensorflow-text==2.16.1 +keras==3.6.0 absl-py==1.4.0 aiohappyeyeballs==2.4.3 diff --git a/tfx/tools/docker/requirements.txt b/tfx/tools/docker/requirements.txt index 479f41021e..080c4a941f 100644 --- a/tfx/tools/docker/requirements.txt +++ b/tfx/tools/docker/requirements.txt @@ -158,6 +158,7 @@ jupyterlab_pygments==0.3.0 jupyterlab_server==2.27.3 jupyterlab_widgets==1.1.10 tf-keras==2.16.0 +keras==3.6.0 keras-tuner==1.4.7 kfp==2.5.0 kfp-pipeline-spec==0.2.2 From 25fbbf293f18958de8dc4556ab797e97f451d73a Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Mon, 2 Dec 2024 04:35:34 +0000 Subject: [PATCH 11/18] Update keras model not to use deprecated APIs --- .../taxi/models/keras_model/model.py | 140 +++++++----------- 1 file changed, 56 insertions(+), 84 deletions(-) diff --git a/tfx/experimental/templates/taxi/models/keras_model/model.py b/tfx/experimental/templates/taxi/models/keras_model/model.py index 24232320f5..5dcf72f541 100644 --- a/tfx/experimental/templates/taxi/models/keras_model/model.py +++ b/tfx/experimental/templates/taxi/models/keras_model/model.py @@ -106,98 +106,70 @@ def _build_keras_model(hidden_units, learning_rate): Returns: A keras Model. """ - real_valued_columns = [ - tf.feature_column.numeric_column(key, shape=()) - for key in features.transformed_names(features.DENSE_FLOAT_FEATURE_KEYS) - ] - categorical_columns = [ - tf.feature_column.categorical_column_with_identity( # pylint: disable=g-complex-comprehension - key, - num_buckets=features.VOCAB_SIZE + features.OOV_SIZE, - default_value=0) - for key in features.transformed_names(features.VOCAB_FEATURE_KEYS) - ] - categorical_columns += [ - tf.feature_column.categorical_column_with_identity( # pylint: disable=g-complex-comprehension - key, - num_buckets=num_buckets, - default_value=0) for key, num_buckets in zip( - features.transformed_names(features.BUCKET_FEATURE_KEYS), - features.BUCKET_FEATURE_BUCKET_COUNT) - ] - categorical_columns += [ - tf.feature_column.categorical_column_with_identity( # pylint: disable=g-complex-comprehension - key, - num_buckets=num_buckets, - default_value=0) for key, num_buckets in zip( - features.transformed_names(features.CATEGORICAL_FEATURE_KEYS), - features.CATEGORICAL_FEATURE_MAX_VALUES) - ] - indicator_column = [ - tf.feature_column.indicator_column(categorical_column) - for categorical_column in categorical_columns - ] - - model = _wide_and_deep_classifier( - # TODO(b/140320729) Replace with premade wide_and_deep keras model - wide_columns=indicator_column, - deep_columns=real_valued_columns, - dnn_hidden_units=hidden_units, - learning_rate=learning_rate) - return model - - -def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units, - learning_rate): - """Build a simple keras wide and deep model. - - Args: - wide_columns: Feature columns wrapped in indicator_column for wide (linear) - part of the model. - deep_columns: Feature columns for deep part of the model. - dnn_hidden_units: [int], the layer sizes of the hidden DNN. - learning_rate: [float], learning rate of the Adam optimizer. - - Returns: - A Wide and Deep Keras model - """ - # Keras needs the feature definitions at compile time. 
- # TODO(b/139081439): Automate generation of input layers from FeatureColumn. + deep_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype=tf.float32) + for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS) + } + wide_vocab_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') + for colname in _transformed_names(_VOCAB_FEATURE_KEYS) + } + wide_bucket_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') + for colname in _transformed_names(_BUCKET_FEATURE_KEYS) + } + wide_categorical_input = { + colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') + for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS) + } input_layers = { - colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32) - for colname in features.transformed_names( - features.DENSE_FLOAT_FEATURE_KEYS) + **deep_input, + **wide_vocab_input, + **wide_bucket_input, + **wide_categorical_input, } - input_layers.update({ - colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32') - for colname in features.transformed_names(features.VOCAB_FEATURE_KEYS) - }) - input_layers.update({ - colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32') - for colname in features.transformed_names(features.BUCKET_FEATURE_KEYS) - }) - input_layers.update({ - colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32') for - colname in features.transformed_names(features.CATEGORICAL_FEATURE_KEYS) - }) - - # TODO(b/161952382): Replace with Keras premade models and - # Keras preprocessing layers. - deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers) - for numnodes in dnn_hidden_units: + + deep = tf.keras.layers.concatenate( + [tf.keras.layers.Normalization()(layer) for layer in deep_input.values()] + ) + for numnodes in (hidden_units or [100, 70, 50, 25]): deep = tf.keras.layers.Dense(numnodes)(deep) - wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers) - output = tf.keras.layers.Dense( - 1, activation='sigmoid')( - tf.keras.layers.concatenate([deep, wide])) - output = tf.squeeze(output, -1) + wide_layers = [] + for key in _transformed_names(_VOCAB_FEATURE_KEYS): + wide_layers.append( + tf.keras.layers.CategoryEncoding(num_tokens=_VOCAB_SIZE + _OOV_SIZE)( + input_layers[key] + ) + ) + for key in _transformed_names(_BUCKET_FEATURE_KEYS): + wide_layers.append( + tf.keras.layers.CategoryEncoding(num_tokens=_FEATURE_BUCKET_COUNT)( + input_layers[key] + ) + ) + for key, num_tokens in zip( + _transformed_names(_CATEGORICAL_FEATURE_KEYS), + _MAX_CATEGORICAL_FEATURE_VALUES, + ): + wide_layers.append( + tf.keras.layers.CategoryEncoding(num_tokens=num_tokens)( + input_layers[key] + ) + ) + wide = tf.keras.layers.concatenate(wide_layers) + + output = tf.keras.layers.Dense(1, activation='sigmoid')( + tf.keras.layers.concatenate([deep, wide]) + ) + output = tf.keras.layers.Reshape((1,))(output) model = tf.keras.Model(input_layers, output) model.compile( loss='binary_crossentropy', - optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), - metrics=[tf.keras.metrics.BinaryAccuracy()]) + optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), + metrics=[tf.keras.metrics.BinaryAccuracy()], + ) model.summary(print_fn=logging.info) return model From 187a63954f31aebd5182e2b01f18c15370a58375 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Mon, 2 Dec 2024 05:04:59 +0000 Subject: [PATCH 12/18] Update example template models not to use deprecated Keras APIs --- 
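Note (reviewer sketch, not part of this commit): the migration in model.py replaces the
deprecated tf.feature_column / DenseFeatures wiring with explicit tf.keras.layers.Input
dicts plus CategoryEncoding multi-hot layers. A minimal standalone illustration of that
pattern follows; the feature name and vocabulary size are made up for illustration only.

    import tensorflow as tf

    _VOCAB_SIZE = 10  # hypothetical token count for one transformed feature

    # One Input per transformed feature replaces categorical_column_with_identity,
    # and CategoryEncoding (multi-hot by default) replaces indicator_column +
    # DenseFeatures for the wide part of the model.
    inp = tf.keras.layers.Input(name='payment_type_xf', shape=(1,), dtype='int32')
    wide = tf.keras.layers.CategoryEncoding(num_tokens=_VOCAB_SIZE)(inp)
    output = tf.keras.layers.Dense(1, activation='sigmoid')(wide)
    model = tf.keras.Model({'payment_type_xf': inp}, output)
    model.compile(
        loss='binary_crossentropy',
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        metrics=[tf.keras.metrics.BinaryAccuracy()],
    )
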
.../taxi/models/keras_model/model.py | 27 ++++++++++--------- .../taxi/models/keras_model/model_test.py | 4 +-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/tfx/experimental/templates/taxi/models/keras_model/model.py b/tfx/experimental/templates/taxi/models/keras_model/model.py index 5dcf72f541..9cad95aed8 100644 --- a/tfx/experimental/templates/taxi/models/keras_model/model.py +++ b/tfx/experimental/templates/taxi/models/keras_model/model.py @@ -108,19 +108,19 @@ def _build_keras_model(hidden_units, learning_rate): """ deep_input = { colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype=tf.float32) - for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS) + for colname in features.transformed_names(features.DENSE_FLOAT_FEATURE_KEYS) } wide_vocab_input = { colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') - for colname in _transformed_names(_VOCAB_FEATURE_KEYS) + for colname in features.transformed_names(features.VOCAB_FEATURE_KEYS) } wide_bucket_input = { colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') - for colname in _transformed_names(_BUCKET_FEATURE_KEYS) + for colname in features.transformed_names(features.BUCKET_FEATURE_KEYS) } wide_categorical_input = { colname: tf.keras.layers.Input(name=colname, shape=(1,), dtype='int32') - for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS) + for colname in features.transformed_names(features.CATEGORICAL_FEATURE_KEYS) } input_layers = { **deep_input, @@ -136,21 +136,24 @@ def _build_keras_model(hidden_units, learning_rate): deep = tf.keras.layers.Dense(numnodes)(deep) wide_layers = [] - for key in _transformed_names(_VOCAB_FEATURE_KEYS): + for key in features.transformed_names(features.VOCAB_FEATURE_KEYS): wide_layers.append( - tf.keras.layers.CategoryEncoding(num_tokens=_VOCAB_SIZE + _OOV_SIZE)( + tf.keras.layers.CategoryEncoding(num_tokens=features.VOCAB_SIZE + features.OOV_SIZE)( input_layers[key] ) ) - for key in _transformed_names(_BUCKET_FEATURE_KEYS): + for key, num_tokens in zip( + features.transformed_names(features.BUCKET_FEATURE_KEYS), + features.BUCKET_FEATURE_BUCKET_COUNT, + ): wide_layers.append( - tf.keras.layers.CategoryEncoding(num_tokens=_FEATURE_BUCKET_COUNT)( - input_layers[key] + tf.keras.layers.CategoryEncoding(num_tokens=num_tokens)( + input_layers[key] ) ) for key, num_tokens in zip( - _transformed_names(_CATEGORICAL_FEATURE_KEYS), - _MAX_CATEGORICAL_FEATURE_VALUES, + features.transformed_names(features.CATEGORICAL_FEATURE_KEYS), + features.CATEGORICAL_FEATURE_MAX_VALUES, ): wide_layers.append( tf.keras.layers.CategoryEncoding(num_tokens=num_tokens)( @@ -167,7 +170,7 @@ def _build_keras_model(hidden_units, learning_rate): model = tf.keras.Model(input_layers, output) model.compile( loss='binary_crossentropy', - optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), + optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), metrics=[tf.keras.metrics.BinaryAccuracy()], ) model.summary(print_fn=logging.info) diff --git a/tfx/experimental/templates/taxi/models/keras_model/model_test.py b/tfx/experimental/templates/taxi/models/keras_model/model_test.py index 7dd6110a6b..a12a6e3c32 100644 --- a/tfx/experimental/templates/taxi/models/keras_model/model_test.py +++ b/tfx/experimental/templates/taxi/models/keras_model/model_test.py @@ -22,7 +22,7 @@ class ModelTest(tf.test.TestCase): def testBuildKerasModel(self): built_model = model._build_keras_model( hidden_units=[1, 1], learning_rate=0.1) # pylint: disable=protected-access - 
self.assertEqual(len(built_model.layers), 10) + self.assertEqual(len(built_model.layers), 13) built_model = model._build_keras_model(hidden_units=[1], learning_rate=0.1) # pylint: disable=protected-access - self.assertEqual(len(built_model.layers), 9) + self.assertEqual(len(built_model.layers), 12) From 0222d201a78e379ab5eff919a2da86239b3c997e Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Tue, 3 Dec 2024 11:15:59 +0000 Subject: [PATCH 13/18] Update docker images to include data files --- setup.py | 3 ++- tfx/tools/docker/Dockerfile | 7 ++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index cfb6e49044..bb6556d7ee 100644 --- a/setup.py +++ b/setup.py @@ -45,8 +45,9 @@ import tomli -pyproject_toml = tomli.load(open('pyproject.toml', 'rb')) +pyproject_toml = tomli.load(open(os.path.join(os.path.dirname(__file__), 'pyproject.toml'), 'rb')) package_name = pyproject_toml['project']['name'] +print(package_name) class _BdistWheelCommand(bdist_wheel.bdist_wheel): diff --git a/tfx/tools/docker/Dockerfile b/tfx/tools/docker/Dockerfile index 4278f4beef..e4e4f6ae2b 100644 --- a/tfx/tools/docker/Dockerfile +++ b/tfx/tools/docker/Dockerfile @@ -35,13 +35,10 @@ RUN python -m pip install tomli RUN cd ${TFX_DIR}/src; \ if [ -e "package_build" ]; then \ bash -x package_build/initialize.sh; \ - cd package_build/ml-pipelines-sdk; \ CFLAGS=$(/usr/bin/python-config --cflags) \ - python setup.py bdist_wheel; \ - cd ../../package_build/tfx; \ + python package_build/ml-pipelines-sdk/setup.py bdist_wheel; \ CFLAGS=$(/usr/bin/python-config --cflags) \ - python setup.py bdist_wheel; \ - cd ../..; \ + python package_build/tfx/setup.py bdist_wheel; \ MLSDK_WHEEL=$(find dist -name "ml_pipelines_sdk-*.whl"); \ TFX_WHEEL=$(find dist -name "tfx-*.whl"); \ else \ From 17d1ac5eabf0f2a45e86c35a05864c9c3f6fbeaa Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Tue, 3 Dec 2024 11:17:31 +0000 Subject: [PATCH 14/18] Fix vertex end-to-end test failures --- tfx/examples/chicago_taxi_pipeline/taxi_utils.py | 1 - tfx/experimental/templates/taxi/models/keras_model/model.py | 2 +- tfx/v1/orchestration/experimental/__init__.py | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tfx/examples/chicago_taxi_pipeline/taxi_utils.py b/tfx/examples/chicago_taxi_pipeline/taxi_utils.py index 42ee24ce23..2ca7167b22 100644 --- a/tfx/examples/chicago_taxi_pipeline/taxi_utils.py +++ b/tfx/examples/chicago_taxi_pipeline/taxi_utils.py @@ -246,7 +246,6 @@ def _build_keras_model( output = tf.keras.layers.Dense(1, activation='sigmoid')( tf.keras.layers.concatenate([deep, wide]) ) - output = tf.squeeze(output, -1) model = tf.keras.Model(input_layers, output) model.compile( diff --git a/tfx/experimental/templates/taxi/models/keras_model/model.py b/tfx/experimental/templates/taxi/models/keras_model/model.py index 9cad95aed8..19611bf92a 100644 --- a/tfx/experimental/templates/taxi/models/keras_model/model.py +++ b/tfx/experimental/templates/taxi/models/keras_model/model.py @@ -215,4 +215,4 @@ def run_fn(fn_args): 'transform_features': _get_transform_features_signature(model, tf_transform_output), } - model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures) + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) diff --git a/tfx/v1/orchestration/experimental/__init__.py b/tfx/v1/orchestration/experimental/__init__.py index 7da280b36e..43c5c89a31 100644 --- a/tfx/v1/orchestration/experimental/__init__.py +++ b/tfx/v1/orchestration/experimental/__init__.py 
@@ -14,6 +14,9 @@ """TFX orchestration.experimental module.""" try: + from tfx.orchestration.kubeflow.decorators import exit_handler # pylint: disable=g-import-not-at-top + from tfx.orchestration.kubeflow.decorators import FinalStatusStr # pylint: disable=g-import-not-at-top + from tfx.orchestration.kubeflow.v2.kubeflow_v2_dag_runner import ( KubeflowV2DagRunner, KubeflowV2DagRunnerConfig, @@ -24,11 +27,8 @@ __all__ = [ "FinalStatusStr", - "KubeflowDagRunner", - "KubeflowDagRunnerConfig", "KubeflowV2DagRunner", "KubeflowV2DagRunnerConfig", - "LABEL_KFP_SDK_ENV", "exit_handler", "get_default_kubeflow_metadata_config", ] From 14d98220bbd9b356a17c887bf0f9561370cc2d0f Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Wed, 4 Dec 2024 03:04:44 +0000 Subject: [PATCH 15/18] Update bdist wheel scripts to use MANIFEST.in --- package_build/initialize.sh | 1 + setup.py | 3 +-- tfx/tools/docker/Dockerfile | 10 ++++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/package_build/initialize.sh b/package_build/initialize.sh index 4b8dc7c0a4..03f6f2080a 100755 --- a/package_build/initialize.sh +++ b/package_build/initialize.sh @@ -27,6 +27,7 @@ do ln -sf $BASEDIR/setup.py $BASEDIR/package_build/$CONFIG_NAME/ ln -sf $BASEDIR/dist $BASEDIR/package_build/$CONFIG_NAME/ ln -sf $BASEDIR/tfx $BASEDIR/package_build/$CONFIG_NAME/ + ln -sf $BASEDIR/MANIFEST.in $BASEDIR/package_build/$CONFIG_NAME/ ln -sf $BASEDIR/README*.md $BASEDIR/package_build/$CONFIG_NAME/ ln -sf $BASEDIR/LICENSE $BASEDIR/package_build/$CONFIG_NAME/ diff --git a/setup.py b/setup.py index bb6556d7ee..cfb6e49044 100644 --- a/setup.py +++ b/setup.py @@ -45,9 +45,8 @@ import tomli -pyproject_toml = tomli.load(open(os.path.join(os.path.dirname(__file__), 'pyproject.toml'), 'rb')) +pyproject_toml = tomli.load(open('pyproject.toml', 'rb')) package_name = pyproject_toml['project']['name'] -print(package_name) class _BdistWheelCommand(bdist_wheel.bdist_wheel): diff --git a/tfx/tools/docker/Dockerfile b/tfx/tools/docker/Dockerfile index e4e4f6ae2b..9fa9938175 100644 --- a/tfx/tools/docker/Dockerfile +++ b/tfx/tools/docker/Dockerfile @@ -27,18 +27,20 @@ WORKDIR ${TFX_DIR} ARG TFX_DEPENDENCY_SELECTOR ENV TFX_DEPENDENCY_SELECTOR=${TFX_DEPENDENCY_SELECTOR} -RUN python -m pip install --upgrade pip wheel setuptools -RUN python -m pip install tomli +RUN python -m pip install --upgrade pip wheel setuptools tomli # TODO(b/175089240): clean up conditional checks on whether ml-pipelines-sdk is # built after TFX versions <= 0.25 are no longer eligible for cherry-picks. 
RUN cd ${TFX_DIR}/src; \ if [ -e "package_build" ]; then \ bash -x package_build/initialize.sh; \ + cd package_build/ml-pipelines-sdk; \ CFLAGS=$(/usr/bin/python-config --cflags) \ - python package_build/ml-pipelines-sdk/setup.py bdist_wheel; \ + python setup.py bdist_wheel; \ + cd ../../package_build/tfx; \ CFLAGS=$(/usr/bin/python-config --cflags) \ - python package_build/tfx/setup.py bdist_wheel; \ + python setup.py bdist_wheel; \ + cd ../..; \ MLSDK_WHEEL=$(find dist -name "ml_pipelines_sdk-*.whl"); \ TFX_WHEEL=$(find dist -name "tfx-*.whl"); \ else \ From 739b7fb52acd5a7dc98bc6af57230d12ea4ebf45 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Wed, 4 Dec 2024 06:35:22 +0000 Subject: [PATCH 16/18] update taxi_utils not to use deprecated model save apis --- tfx/examples/chicago_taxi_pipeline/taxi_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tfx/examples/chicago_taxi_pipeline/taxi_utils.py b/tfx/examples/chicago_taxi_pipeline/taxi_utils.py index 2ca7167b22..214aa29de9 100644 --- a/tfx/examples/chicago_taxi_pipeline/taxi_utils.py +++ b/tfx/examples/chicago_taxi_pipeline/taxi_utils.py @@ -370,4 +370,4 @@ def run_fn(fn_args: fn_args_utils.FnArgs): model, tf_transform_output ), } - model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures) + tf.saved_model.save(model, fn_args.serving_model_dir, signatures=signatures) From 7b968ee34e12312bbcda5b371a276228bc3e4ca2 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Wed, 4 Dec 2024 07:33:51 +0000 Subject: [PATCH 17/18] Update evaluator config to work with taxi keras models --- tfx/orchestration/kubeflow/v2/test_utils.py | 37 +++++++++++---------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/tfx/orchestration/kubeflow/v2/test_utils.py b/tfx/orchestration/kubeflow/v2/test_utils.py index 6491e73317..98cc73105f 100644 --- a/tfx/orchestration/kubeflow/v2/test_utils.py +++ b/tfx/orchestration/kubeflow/v2/test_utils.py @@ -234,25 +234,28 @@ def create_pipeline_components( model_blessing=tfx.dsl.Channel( type=tfx.types.standard_artifacts.ModelBlessing)).with_id( 'Resolver.latest_blessed_model_resolver') - # Set the TFMA config for Model Evaluation and Validation. + # Uses TFMA to compute a evaluation statistics over features of a model and + # perform quality validation of a candidate model (compared to a baseline). eval_config = tfma.EvalConfig( - model_specs=[tfma.ModelSpec(signature_name='eval')], - metrics_specs=[ - tfma.MetricsSpec( - metrics=[tfma.MetricConfig(class_name='ExampleCount')], - thresholds={ - 'binary_accuracy': - tfma.MetricThreshold( - value_threshold=tfma.GenericValueThreshold( - lower_bound={'value': 0.5}), - change_threshold=tfma.GenericChangeThreshold( - direction=tfma.MetricDirection.HIGHER_IS_BETTER, - absolute={'value': -1e-10})) - }) + model_specs=[ + tfma.ModelSpec( + signature_name='serving_default', label_key='tips_xf', + preprocessing_function_names=['transform_features']) ], - slicing_specs=[ - tfma.SlicingSpec(), - tfma.SlicingSpec(feature_keys=['trip_start_hour']) + slicing_specs=[tfma.SlicingSpec()], + metrics_specs=[ + tfma.MetricsSpec(metrics=[ + tfma.MetricConfig( + class_name='BinaryAccuracy', + threshold=tfma.MetricThreshold( + value_threshold=tfma.GenericValueThreshold( + lower_bound={'value': 0.6}), + # Change threshold will be ignored if there is no + # baseline model resolved from MLMD (first run). 
+ change_threshold=tfma.GenericChangeThreshold( + direction=tfma.MetricDirection.HIGHER_IS_BETTER, + absolute={'value': -1e-10}))) + ]) ]) evaluator = tfx.components.Evaluator( examples=example_gen.outputs['examples'], From c35ced2d66ae9bf5d2b322f9d50ca48ab1e909f1 Mon Sep 17 00:00:00 2001 From: Doojin Park Date: Wed, 4 Dec 2024 14:28:45 +0000 Subject: [PATCH 18/18] Update test cases with expected pipeline JSON to reflect updated eval configs. --- .../kubeflow/v2/testdata/expected_full_taxi_pipeline_job.json | 2 +- .../v2/testdata/legacy/expected_full_taxi_pipeline_job.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tfx/orchestration/kubeflow/v2/testdata/expected_full_taxi_pipeline_job.json b/tfx/orchestration/kubeflow/v2/testdata/expected_full_taxi_pipeline_job.json index 92db9633ab..fba1cf9072 100644 --- a/tfx/orchestration/kubeflow/v2/testdata/expected_full_taxi_pipeline_job.json +++ b/tfx/orchestration/kubeflow/v2/testdata/expected_full_taxi_pipeline_job.json @@ -706,7 +706,7 @@ "parameters": { "eval_config": { "runtimeValue": { - "constant": "{\n \"metrics_specs\": [\n {\n \"metrics\": [\n {\n \"class_name\": \"ExampleCount\"\n }\n ],\n \"thresholds\": {\n \"binary_accuracy\": {\n \"change_threshold\": {\n \"absolute\": -1e-10,\n \"direction\": \"HIGHER_IS_BETTER\"\n },\n \"value_threshold\": {\n \"lower_bound\": 0.5\n }\n }\n }\n }\n ],\n \"model_specs\": [\n {\n \"signature_name\": \"eval\"\n }\n ],\n \"slicing_specs\": [\n {},\n {\n \"feature_keys\": [\n \"trip_start_hour\"\n ]\n }\n ]\n}" + "constant": "{\n \"metrics_specs\": [\n {\n \"metrics\": [\n {\n \"class_name\": \"BinaryAccuracy\",\n \"threshold\": {\n \"change_threshold\": {\n \"absolute\": -1e-10,\n \"direction\": \"HIGHER_IS_BETTER\"\n },\n \"value_threshold\": {\n \"lower_bound\": 0.6\n }\n }\n }\n ]\n }\n ],\n \"model_specs\": [\n {\n \"label_key\": \"tips_xf\",\n \"preprocessing_function_names\": [\n \"transform_features\"\n ],\n \"signature_name\": \"serving_default\"\n }\n ],\n \"slicing_specs\": [\n {}\n ]\n}" } }, "example_splits": { diff --git a/tfx/orchestration/kubeflow/v2/testdata/legacy/expected_full_taxi_pipeline_job.json b/tfx/orchestration/kubeflow/v2/testdata/legacy/expected_full_taxi_pipeline_job.json index da72f2eb64..8af0c0f92a 100644 --- a/tfx/orchestration/kubeflow/v2/testdata/legacy/expected_full_taxi_pipeline_job.json +++ b/tfx/orchestration/kubeflow/v2/testdata/legacy/expected_full_taxi_pipeline_job.json @@ -698,7 +698,7 @@ "eval_config": { "runtimeValue": { "constantValue": { - "stringValue": "{\n \"metrics_specs\": [\n {\n \"metrics\": [\n {\n \"class_name\": \"ExampleCount\"\n }\n ],\n \"thresholds\": {\n \"binary_accuracy\": {\n \"change_threshold\": {\n \"absolute\": -1e-10,\n \"direction\": \"HIGHER_IS_BETTER\"\n },\n \"value_threshold\": {\n \"lower_bound\": 0.5\n }\n }\n }\n }\n ],\n \"model_specs\": [\n {\n \"signature_name\": \"eval\"\n }\n ],\n \"slicing_specs\": [\n {},\n {\n \"feature_keys\": [\n \"trip_start_hour\"\n ]\n }\n ]\n}" + "stringValue": "{\n \"metrics_specs\": [\n {\n \"metrics\": [\n {\n \"class_name\": \"BinaryAccuracy\",\n \"threshold\": {\n \"change_threshold\": {\n \"absolute\": -1e-10,\n \"direction\": \"HIGHER_IS_BETTER\"\n },\n \"value_threshold\": {\n \"lower_bound\": 0.6\n }\n }\n }\n ]\n }\n ],\n \"model_specs\": [\n {\n \"label_key\": \"tips_xf\",\n \"preprocessing_function_names\": [\n \"transform_features\"\n ],\n \"signature_name\": \"serving_default\"\n }\n ],\n \"slicing_specs\": [\n {}\n ]\n}" } } },