From 1d5f44710270d1c615537f0d05ab49e699d3a6e5 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 25 Mar 2020 08:53:39 +0100 Subject: [PATCH] [FLINK-16317][operators] return this from (Keyed)MultipleInputTransformation#addInput --- .../KeyedMultipleInputTransformation.java | 3 ++- .../MultipleInputTransformation.java | 3 ++- .../api/graph/StreamGraphGeneratorTest.java | 8 ++++---- .../streaming/runtime/MultipleInputITCase.java | 17 ++++++++--------- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java index 9d15c972827bd..f9cd900ae2364 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedMultipleInputTransformation.java @@ -45,9 +45,10 @@ public KeyedMultipleInputTransformation( this.stateKeyType = stateKeyType; } - public void addInput(Transformation input, KeySelector keySelector) { + public KeyedMultipleInputTransformation addInput(Transformation input, KeySelector keySelector) { inputs.add(input); getStateKeySelectors().add(keySelector); + return this; } public TypeInformation getStateKeyType() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/MultipleInputTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/MultipleInputTransformation.java index 9acc8938a4610..cd650adcb7623 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/MultipleInputTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/MultipleInputTransformation.java @@ -36,7 +36,8 @@ public MultipleInputTransformation( super(name, operatorFactory, outputType, parallelism); } - public void addInput(Transformation input) { + public MultipleInputTransformation addInput(Transformation input) { inputs.add(input); + return this; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 91ea7dd62573a..3d0f502b0d7ed 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -331,11 +331,11 @@ public void testMultipleInputTransformation() throws Exception { BasicTypeInfo.STRING_TYPE_INFO, 3); - transform.addInput(source1.getTransformation()); - transform.addInput(source2.getTransformation()); - transform.addInput(source3.getTransformation()); + env.addOperator(transform + .addInput(source1.getTransformation()) + .addInput(source2.getTransformation()) + .addInput(source3.getTransformation())); - env.addOperator(transform); StreamGraph streamGraph = env.getStreamGraph(); assertEquals(4, streamGraph.getStreamNodes().size()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java index d90ea4649a6cb..28c23f65ce236 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java @@ -72,11 +72,10 @@ public void test() throws Exception { BasicTypeInfo.LONG_TYPE_INFO, 1); - transform.addInput(source1.getTransformation()); - transform.addInput(source2.getTransformation()); - transform.addInput(source3.getTransformation()); - - env.addOperator(transform); + env.addOperator(transform + .addInput(source1.getTransformation()) + .addInput(source2.getTransformation()) + .addInput(source3.getTransformation())); new MultipleConnectedStreams(env) .transform(transform) @@ -109,10 +108,10 @@ public void testKeyedState() throws Exception { BasicTypeInfo.LONG_TYPE_INFO); KeySelector keySelector = (KeySelector) value -> value % 3; - transform.addInput(source1.getTransformation(), keySelector); - transform.addInput(source2.getTransformation(), keySelector); - transform.addInput(source3.getTransformation(), keySelector); - env.addOperator(transform); + env.addOperator(transform + .addInput(source1.getTransformation(), keySelector) + .addInput(source2.getTransformation(), keySelector) + .addInput(source3.getTransformation(), keySelector)); new MultipleConnectedStreams(env) .transform(transform)