From de7c771b3f81092483605c2a1975f2ba0dd9b73c Mon Sep 17 00:00:00 2001 From: kmonte Date: Wed, 27 Mar 2024 14:56:57 -0700 Subject: [PATCH] Encode producer component id and output key when CWP is created from an OutputChannel PiperOrigin-RevId: 619667393 --- tfx/dsl/compiler/node_inputs_compiler.py | 3 +- tfx/dsl/compiler/node_inputs_compiler_test.py | 9 ++-- tfx/dsl/compiler/placeholder_utils.py | 1 + ...omposable_pipeline_async_input_v2_ir.pbtxt | 4 +- .../composable_pipeline_input_v2_ir.pbtxt | 4 +- .../conditional_pipeline_input_v2_ir.pbtxt | 8 ++-- ...exec_properties_pipeline_input_v2_ir.pbtxt | 2 +- ...ipeline_with_annotations_input_v2_ir.pbtxt | 2 +- tfx/types/channel.py | 7 ++- tfx/types/channel_utils.py | 4 +- tfx/types/channel_wrapped_placeholder_test.py | 44 ++++++++++--------- ...to_placeholder_future_value_operator.pbtxt | 2 +- 12 files changed, 50 insertions(+), 40 deletions(-) diff --git a/tfx/dsl/compiler/node_inputs_compiler.py b/tfx/dsl/compiler/node_inputs_compiler.py index bd6423ecae..8551263154 100644 --- a/tfx/dsl/compiler/node_inputs_compiler.py +++ b/tfx/dsl/compiler/node_inputs_compiler.py @@ -268,7 +268,8 @@ def _compile_input_spec( name=channel.pipeline_name, ) result_input_channel.metadata_connection_config.Pack(config) - + elif isinstance(channel, channel_types.ChannelWrappedPlaceholder): + print(f'Channel: {tfx_node.id}.{input_key} was a CWP!') # Note that this path is *usually* not taken, as most output channels already # exist in pipeline_ctx.channels, as they are added in after # compiler._generate_input_spec_for_outputs is called. diff --git a/tfx/dsl/compiler/node_inputs_compiler_test.py b/tfx/dsl/compiler/node_inputs_compiler_test.py index 5bb2844e4f..973b8259af 100644 --- a/tfx/dsl/compiler/node_inputs_compiler_test.py +++ b/tfx/dsl/compiler/node_inputs_compiler_test.py @@ -326,7 +326,8 @@ def testCompileConditionals(self): self.assertEqual(result.inputs[cond_input_key].min_count, 1) self.assertLen(result.conditionals, 1) cond = list(result.conditionals.values())[0] - self.assertProtoEquals(""" + self.assertProtoEquals( + """ operator { compare_op { op: EQUAL @@ -343,7 +344,7 @@ def testCompileConditionals(self): index_op { expression { placeholder { - key: "%s" + key: "CondNode_x" } } } @@ -354,7 +355,9 @@ def testCompileConditionals(self): } } } - """ % cond_input_key, cond.placeholder_expression) + """, + cond.placeholder_expression, + ) def testCompileInputsForDynamicProperties(self): producer = DummyNode('Producer') diff --git a/tfx/dsl/compiler/placeholder_utils.py b/tfx/dsl/compiler/placeholder_utils.py index 979301bd51..31356467ec 100644 --- a/tfx/dsl/compiler/placeholder_utils.py +++ b/tfx/dsl/compiler/placeholder_utils.py @@ -107,6 +107,7 @@ def resolve_placeholder_expression( debug_str(expression), err.placeholder, ) + logging.warning("Context: %s", context) return None except Exception as e: raise ValueError( diff --git a/tfx/dsl/compiler/testdata/composable_pipeline_async_input_v2_ir.pbtxt b/tfx/dsl/compiler/testdata/composable_pipeline_async_input_v2_ir.pbtxt index 4ddfe7f4b4..9098f48af3 100644 --- a/tfx/dsl/compiler/testdata/composable_pipeline_async_input_v2_ir.pbtxt +++ b/tfx/dsl/compiler/testdata/composable_pipeline_async_input_v2_ir.pbtxt @@ -2109,7 +2109,7 @@ nodes { index_op { expression { placeholder { - key: "blessing" + key: "Evaluator_blessing" } } } @@ -3318,7 +3318,7 @@ nodes { index_op { expression { placeholder { - key: "_infra-validator-pipeline.blessing" + key: "infra-validator-pipeline_blessing" } } } diff --git a/tfx/dsl/compiler/testdata/composable_pipeline_input_v2_ir.pbtxt b/tfx/dsl/compiler/testdata/composable_pipeline_input_v2_ir.pbtxt index 2a4b8c1c44..25606c96e8 100644 --- a/tfx/dsl/compiler/testdata/composable_pipeline_input_v2_ir.pbtxt +++ b/tfx/dsl/compiler/testdata/composable_pipeline_input_v2_ir.pbtxt @@ -2368,7 +2368,7 @@ nodes { index_op { expression { placeholder { - key: "_Evaluator.blessing" + key: "Evaluator_blessing" } } } @@ -3686,7 +3686,7 @@ nodes { index_op { expression { placeholder { - key: "_infra-validator-pipeline.blessing" + key: "infra-validator-pipeline_blessing" } } } diff --git a/tfx/dsl/compiler/testdata/conditional_pipeline_input_v2_ir.pbtxt b/tfx/dsl/compiler/testdata/conditional_pipeline_input_v2_ir.pbtxt index 34bd7e9a89..999bd5f99e 100644 --- a/tfx/dsl/compiler/testdata/conditional_pipeline_input_v2_ir.pbtxt +++ b/tfx/dsl/compiler/testdata/conditional_pipeline_input_v2_ir.pbtxt @@ -1001,7 +1001,7 @@ nodes { index_op { expression { placeholder { - key: "_Evaluator.blessing" + key: "Evaluator_blessing" } } } @@ -1264,7 +1264,7 @@ nodes { index_op { expression { placeholder { - key: "_Evaluator.blessing" + key: "Evaluator_blessing" } } } @@ -1301,7 +1301,7 @@ nodes { index_op { expression { placeholder { - key: "_InfraValidator.blessing" + key: "InfraValidator_blessing" } } } @@ -1333,7 +1333,7 @@ nodes { index_op { expression { placeholder { - key: "model" + key: "Trainer_model" } } } diff --git a/tfx/dsl/compiler/testdata/dynamic_exec_properties_pipeline_input_v2_ir.pbtxt b/tfx/dsl/compiler/testdata/dynamic_exec_properties_pipeline_input_v2_ir.pbtxt index 549dbfecb2..ebfa13e432 100644 --- a/tfx/dsl/compiler/testdata/dynamic_exec_properties_pipeline_input_v2_ir.pbtxt +++ b/tfx/dsl/compiler/testdata/dynamic_exec_properties_pipeline_input_v2_ir.pbtxt @@ -180,7 +180,7 @@ nodes { index_op { expression { placeholder { - key: "_UpstreamComponent.num" + key: "UpstreamComponent_num" } } } diff --git a/tfx/dsl/compiler/testdata/pipeline_with_annotations_input_v2_ir.pbtxt b/tfx/dsl/compiler/testdata/pipeline_with_annotations_input_v2_ir.pbtxt index 02346d1514..c1d5f170b9 100644 --- a/tfx/dsl/compiler/testdata/pipeline_with_annotations_input_v2_ir.pbtxt +++ b/tfx/dsl/compiler/testdata/pipeline_with_annotations_input_v2_ir.pbtxt @@ -221,7 +221,7 @@ nodes { index_op { expression { placeholder { - key: "_UpstreamComponent.num" + key: "UpstreamComponent_num" } } } diff --git a/tfx/types/channel.py b/tfx/types/channel.py index 9c79ea7e4b..195c840248 100644 --- a/tfx/types/channel.py +++ b/tfx/types/channel.py @@ -558,7 +558,9 @@ def set_as_async_channel(self) -> None: self._is_async = True def future(self) -> ChannelWrappedPlaceholder: - return ChannelWrappedPlaceholder(self) + return ChannelWrappedPlaceholder( + self, key=f'{self.producer_component_id}_{self.output_key}' + ) @doc_controls.do_not_generate_docs @@ -793,7 +795,8 @@ def set_key(self, key: Optional[str]): Args: key: The new key for the channel. """ - self._key = key + del key # unused. + return def __getitem__(self, index: int) -> ChannelWrappedPlaceholder: if self._index is not None: diff --git a/tfx/types/channel_utils.py b/tfx/types/channel_utils.py index 3712553833..a866199f61 100644 --- a/tfx/types/channel_utils.py +++ b/tfx/types/channel_utils.py @@ -214,10 +214,8 @@ def unwrap_simple_channel_placeholder( # proto paths above and been getting default messages all along. If this # sub-message is present, then the whole chain was correct. not index_op.expression.HasField('placeholder') - # ChannelWrappedPlaceholder uses INPUT_ARTIFACT for some reason, and has - # no key when encoded with encode(). + # ChannelWrappedPlaceholder uses INPUT_ARTIFACT for some reason. or cwp.type != placeholder_pb2.Placeholder.Type.INPUT_ARTIFACT - or cwp.key # For the `[0]` part of the desired shape. or index_op.index != 0 ): diff --git a/tfx/types/channel_wrapped_placeholder_test.py b/tfx/types/channel_wrapped_placeholder_test.py index 781e86fe72..feb87e921c 100644 --- a/tfx/types/channel_wrapped_placeholder_test.py +++ b/tfx/types/channel_wrapped_placeholder_test.py @@ -61,7 +61,7 @@ def testProtoFutureValueOperator(self): output_key='num', ) placeholder = output_channel.future()[0].value - channel_to_key = {output_channel: '_component.num'} + channel_to_key = {output_channel: 'producer_num'} self.assertProtoEquals( channel_utils.encode_placeholder_with_channels( placeholder, lambda k: channel_to_key[k] @@ -161,7 +161,7 @@ def testEncodeWithKeys(self): index_op { expression { placeholder { - key: "MyTypeName" + key: "producer_foo" } } } @@ -351,7 +351,9 @@ def testEncode(self): operator { index_op { expression { - placeholder {} + placeholder { + key: "a_foo" + } } } } @@ -366,7 +368,9 @@ def testEncode(self): operator { index_op { expression { - placeholder {} + placeholder { + key: "b_bar" + } } } } @@ -413,7 +417,7 @@ def testEncodeWithKeys(self): index_op { expression { placeholder { - key: "channel_1_key" + key: "a_foo" } } } @@ -430,7 +434,7 @@ def testEncodeWithKeys(self): index_op { expression { placeholder { - key: "channel_2_key" + key: "b_bar" } } } @@ -482,7 +486,7 @@ def testNegation(self): index_op { expression { placeholder { - key: "channel_1_key" + key: "a_foo" } } } @@ -499,7 +503,7 @@ def testNegation(self): index_op { expression { placeholder { - key: "channel_2_key" + key: "b_bar" } } } @@ -553,7 +557,7 @@ def testDoubleNegation(self): index_op { expression { placeholder { - key: "channel_1_key" + key: "a_foo" } } } @@ -570,7 +574,7 @@ def testDoubleNegation(self): index_op { expression { placeholder { - key: "channel_2_key" + key: "b_bar" } } } @@ -622,7 +626,7 @@ def testComparison_notEqual(self): index_op { expression { placeholder { - key: "channel_1_key" + key: "a_foo" } } } @@ -639,7 +643,7 @@ def testComparison_notEqual(self): index_op { expression { placeholder { - key: "channel_2_key" + key: "b_bar" } } } @@ -695,7 +699,7 @@ def testComparison_lessThanOrEqual(self): index_op { expression { placeholder { - key: "channel_1_key" + key: "a_foo" } } } @@ -712,7 +716,7 @@ def testComparison_lessThanOrEqual(self): index_op { expression { placeholder { - key: "channel_2_key" + key: "b_bar" } } } @@ -768,7 +772,7 @@ def testComparison_greaterThanOrEqual(self): index_op { expression { placeholder { - key: "channel_1_key" + key: "a_foo" } } } @@ -785,7 +789,7 @@ def testComparison_greaterThanOrEqual(self): index_op { expression { placeholder { - key: "channel_2_key" + key: "b_bar" } } } @@ -868,7 +872,7 @@ def testNestedLogicalOps(self): index_op { expression { placeholder { - key: "channel_11_key" + key: "a_1" } } } @@ -885,7 +889,7 @@ def testNestedLogicalOps(self): index_op { expression { placeholder { - key: "channel_12_key" + key: "b_2" } } } @@ -913,7 +917,7 @@ def testNestedLogicalOps(self): index_op { expression { placeholder { - key: "channel_21_key" + key: "c_3" } } } @@ -930,7 +934,7 @@ def testNestedLogicalOps(self): index_op { expression { placeholder { - key: "channel_22_key" + key: "d_4" } } } diff --git a/tfx/types/testdata/proto_placeholder_future_value_operator.pbtxt b/tfx/types/testdata/proto_placeholder_future_value_operator.pbtxt index 6b260aec6a..f3dbfaa56a 100644 --- a/tfx/types/testdata/proto_placeholder_future_value_operator.pbtxt +++ b/tfx/types/testdata/proto_placeholder_future_value_operator.pbtxt @@ -8,7 +8,7 @@ operator { index_op { expression { placeholder { - key: "_component.num" + key: "producer_num" } } }