From 29538dbee12e6df48bfa92ecd7ea28935579c9a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1ra=20Van=C4=9Bk?= Date: Fri, 24 Mar 2017 17:41:41 +0100 Subject: [PATCH] #14 [euphoria-flink] Allow WindowAssigner operator chaining --- .../euphoria/flink/streaming/ReduceStateByKeyTranslator.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index 7c0f265f..a9634486 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -149,6 +150,9 @@ private static class WindowAssignerOperator private WindowAssignerOperator(WindowAssigner windowAssigner) { this.windowAssigner = windowAssigner; + + // allow chaining to optimize performance + this.chainingStrategy = ChainingStrategy.ALWAYS; } @Override