Skip to content

Commit

Permalink
[FLINK-16317][operators] return this from (Keyed)MultipleInputTransfo…
Browse files Browse the repository at this point in the history
…rmation#addInput
  • Loading branch information
pnowojski committed Mar 26, 2020
1 parent 05883b7 commit 1d5f447
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ public KeyedMultipleInputTransformation(
this.stateKeyType = stateKeyType;
}

public void addInput(Transformation<?> input, KeySelector<?, ?> keySelector) {
public KeyedMultipleInputTransformation<OUT> addInput(Transformation<?> input, KeySelector<?, ?> keySelector) {
inputs.add(input);
getStateKeySelectors().add(keySelector);
return this;
}

public TypeInformation<?> getStateKeyType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public MultipleInputTransformation(
super(name, operatorFactory, outputType, parallelism);
}

public void addInput(Transformation<?> input) {
public MultipleInputTransformation<OUT> addInput(Transformation<?> input) {
inputs.add(input);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,11 @@ public void testMultipleInputTransformation() throws Exception {
BasicTypeInfo.STRING_TYPE_INFO,
3);

transform.addInput(source1.getTransformation());
transform.addInput(source2.getTransformation());
transform.addInput(source3.getTransformation());
env.addOperator(transform
.addInput(source1.getTransformation())
.addInput(source2.getTransformation())
.addInput(source3.getTransformation()));

env.addOperator(transform);
StreamGraph streamGraph = env.getStreamGraph();
assertEquals(4, streamGraph.getStreamNodes().size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@ public void test() throws Exception {
BasicTypeInfo.LONG_TYPE_INFO,
1);

transform.addInput(source1.getTransformation());
transform.addInput(source2.getTransformation());
transform.addInput(source3.getTransformation());

env.addOperator(transform);
env.addOperator(transform
.addInput(source1.getTransformation())
.addInput(source2.getTransformation())
.addInput(source3.getTransformation()));

new MultipleConnectedStreams(env)
.transform(transform)
Expand Down Expand Up @@ -109,10 +108,10 @@ public void testKeyedState() throws Exception {
BasicTypeInfo.LONG_TYPE_INFO);
KeySelector<Long, Long> keySelector = (KeySelector<Long, Long>) value -> value % 3;

transform.addInput(source1.getTransformation(), keySelector);
transform.addInput(source2.getTransformation(), keySelector);
transform.addInput(source3.getTransformation(), keySelector);
env.addOperator(transform);
env.addOperator(transform
.addInput(source1.getTransformation(), keySelector)
.addInput(source2.getTransformation(), keySelector)
.addInput(source3.getTransformation(), keySelector));

new MultipleConnectedStreams(env)
.transform(transform)
Expand Down

0 comments on commit 1d5f447

Please sign in to comment.