diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutionEnvironment.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutionEnvironment.java index bae682f8..9ca158b2 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutionEnvironment.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutionEnvironment.java @@ -21,6 +21,8 @@ import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.flink.batch.BatchElement; +import cz.seznam.euphoria.flink.streaming.StreamingElement; import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -138,7 +140,8 @@ private Set> getClassesToRegister(Set> registeredClasses) { ret.add(TimeInterval.class); ret.add(Pair.class); - ret.add(FlinkElement.class); + ret.add(StreamingElement.class); + ret.add(BatchElement.class); ret.add(KeyedMultiWindowedElement.class); return ret; } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java index afe1d53c..50971d4f 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java @@ -193,6 +193,10 @@ protected FlowTranslator createStreamTranslator(Settings settings, /** * See {@link ExecutionConfig#disableObjectReuse()} * and {@link ExecutionConfig#disableObjectReuse()}. + * + * @param reuse set TRUE for enabling object reuse + * + * @return this instance (for method chaining purposes) */ public FlinkExecutor setObjectReuse(boolean reuse){ this.objectReuse = reuse; diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkElement.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchElement.java similarity index 85% rename from euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkElement.java rename to euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchElement.java index 22f21975..8fdb14f8 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkElement.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchElement.java @@ -13,28 +13,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.flink; +package cz.seznam.euphoria.flink.batch; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; /** - * Single element flowing through Flink pipeline. Every such element + * Single element flowing through Flink batch pipeline. Every such element * is associated with a window identifier and timestamp. * @param type of the assigned window * @param type of the data element */ -public class FlinkElement implements WindowedElement { +public class BatchElement implements WindowedElement { private Window window; private long timestamp; private T element; // This class needs to ne POJO for effective serialization - public FlinkElement() { + public BatchElement() { } - public FlinkElement(W window, long timestamp, T element) { + public BatchElement(W window, long timestamp, T element) { this.window = window; this.timestamp = timestamp; this.element = element; @@ -51,20 +51,20 @@ public void setWindow(W window) { } @Override - public long getTimestamp() { - return timestamp; + public T getElement() { + return element; } - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; + public void setElement(T element) { + this.element = element; } @Override - public T getElement() { - return element; + public long getTimestamp() { + return timestamp; } - public void setElement(T element) { - this.element = element; + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/functions/UnaryFunctorWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchUnaryFunctorWrapper.java similarity index 70% rename from euphoria-flink/src/main/java/cz/seznam/euphoria/flink/functions/UnaryFunctorWrapper.java rename to euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchUnaryFunctorWrapper.java index 8edf5888..416c493f 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/functions/UnaryFunctorWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchUnaryFunctorWrapper.java @@ -13,12 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.flink.functions; +package cz.seznam.euphoria.flink.batch; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.Context; -import cz.seznam.euphoria.flink.FlinkElement; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -26,26 +25,26 @@ import java.util.Objects; -public class UnaryFunctorWrapper - implements FlatMapFunction, - FlinkElement>, - ResultTypeQueryable> { +public class BatchUnaryFunctorWrapper + implements FlatMapFunction, + BatchElement>, + ResultTypeQueryable> { private final UnaryFunctor f; - public UnaryFunctorWrapper(UnaryFunctor f) { + public BatchUnaryFunctorWrapper(UnaryFunctor f) { this.f = Objects.requireNonNull(f); } @Override - public void flatMap(FlinkElement value, - Collector> out) + public void flatMap(BatchElement value, + Collector> out) throws Exception { f.apply(value.getElement(), new Context() { @Override public void collect(OUT elem) { - out.collect(new FlinkElement<>( + out.collect(new BatchElement<>( value.getWindow(), value.getTimestamp(), elem)); } @Override @@ -57,7 +56,7 @@ public Object getWindow() { @SuppressWarnings("unchecked") @Override - public TypeInformation> getProducedType() { - return TypeInformation.of((Class) FlinkElement.class); + public TypeInformation> getProducedType() { + return TypeInformation.of((Class) BatchElement.class); } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java index 10f17907..33ba016b 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java @@ -17,9 +17,7 @@ import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.operator.FlatMap; -import cz.seznam.euphoria.flink.FlinkElement; import cz.seznam.euphoria.flink.FlinkOperator; -import cz.seznam.euphoria.flink.functions.UnaryFunctorWrapper; import org.apache.flink.api.java.DataSet; class FlatMapTranslator implements BatchOperatorTranslator { @@ -31,8 +29,8 @@ public DataSet translate(FlinkOperator operator, DataSet input = context.getSingleInputStream(operator); UnaryFunctor mapper = operator.getOriginalOperator().getFunctor(); return input - .flatMap(new UnaryFunctorWrapper<>(mapper)) - .returns((Class) FlinkElement.class) + .flatMap(new BatchUnaryFunctorWrapper<>(mapper)) + .returns((Class) BatchElement.class) .setParallelism(operator.getParallelism()) .name(operator.getName()); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator.java index a9092ac4..093ec141 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator.java @@ -24,7 +24,6 @@ import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.util.Pair; -import cz.seznam.euphoria.flink.FlinkElement; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.Utils; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; @@ -78,14 +77,14 @@ public DataSet translate(FlinkOperator operator, final UnaryFunction udfValue = origOperator.getValueExtractor(); // ~ extract key/value from input elements and assign windows - DataSet> tuples; + DataSet> tuples; { // FIXME require keyExtractor to deliver `Comparable`s UnaryFunction timeAssigner = origOperator.getEventTimeAssigner(); - FlatMapOperator> wAssigned = + FlatMapOperator> wAssigned = input.flatMap((i, c) -> { - FlinkElement wel = (FlinkElement) i; + BatchElement wel = (BatchElement) i; if (timeAssigner != null) { long stamp = timeAssigner.apply(wel.getElement()); wel.setTimestamp(stamp); @@ -96,18 +95,18 @@ public DataSet translate(FlinkOperator operator, long stamp = (wid instanceof TimedWindow) ? ((TimedWindow) wid).maxTimestamp() : wel.getTimestamp(); - c.collect(new FlinkElement<>( + c.collect(new BatchElement<>( wid, stamp, Pair.of(udfKey.apply(el), udfValue.apply(el)))); } }); tuples = wAssigned .name(operator.getName() + "::map-input") .setParallelism(operator.getParallelism()) - .returns(new TypeHint>() {}); + .returns(new TypeHint>() {}); } // ~ reduce the data now - Operator, ?> reduced; + Operator, ?> reduced; reduced = tuples .groupBy(new RBKKeySelector()) .reduce(new RBKReducer(reducer)); @@ -125,8 +124,8 @@ public DataSet translate(FlinkOperator operator, .partitionCustom( new PartitionerWrapper<>(origOperator.getPartitioning().getPartitioner()), Utils.wrapQueryable( - (KeySelector, Comparable>) - (FlinkElement we) -> (Comparable) we.getElement().getKey(), + (KeySelector, Comparable>) + (BatchElement we) -> (Comparable) we.getElement().getKey(), Comparable.class)) .setParallelism(operator.getParallelism()); } @@ -141,18 +140,18 @@ public DataSet translate(FlinkOperator operator, */ @SuppressWarnings("unchecked") static class RBKKeySelector - implements KeySelector, Tuple2> { + implements KeySelector, Tuple2> { @Override public Tuple2 getKey( - FlinkElement value) { + BatchElement value) { return new Tuple2(value.getWindow(), value.getElement().getKey()); } } static class RBKReducer - implements ReduceFunction> { + implements ReduceFunction> { final UnaryFunction reducer; @@ -161,11 +160,11 @@ static class RBKReducer } @Override - public FlinkElement - reduce(FlinkElement p1, FlinkElement p2) { + public BatchElement + reduce(BatchElement p1, BatchElement p2) { Window wid = p1.getWindow(); - return new FlinkElement<>( + return new BatchElement<>( wid, Math.max(p1.getTimestamp(), p2.getTimestamp()), Pair.of( diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java index 4ce03576..0ac1227c 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java @@ -17,7 +17,6 @@ import com.google.common.collect.Iterables; import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; import cz.seznam.euphoria.core.client.functional.StateFactory; @@ -29,7 +28,6 @@ import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.greduce.GroupReducer; import cz.seznam.euphoria.core.util.Settings; -import cz.seznam.euphoria.flink.FlinkElement; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.Utils; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; @@ -75,9 +73,9 @@ public DataSet translate(FlinkOperator operator, UnaryFunction timeAssigner = origOperator.getEventTimeAssigner(); // FIXME require keyExtractor to deliver `Comparable`s - DataSet wAssigned = + DataSet wAssigned = input.flatMap((i, c) -> { - FlinkElement wel = (FlinkElement) i; + BatchElement wel = (BatchElement) i; // assign timestamp if timeAssigner defined if (timeAssigner != null) { @@ -86,27 +84,27 @@ public DataSet translate(FlinkOperator operator, Set assigned = windowing.assignWindowsToElement(wel); for (Window wid : assigned) { Object el = wel.getElement(); - c.collect(new FlinkElement<>( + c.collect(new BatchElement<>( wid, wel.getTimestamp(), Pair.of(udfKey.apply(el), udfValue.apply(el)))); } }) - .returns(FlinkElement.class) + .returns(BatchElement.class) .name(operator.getName() + "::map-input") .setParallelism(operator.getParallelism()); // ~ reduce the data now - DataSet> reduced = + DataSet> reduced = wAssigned.groupBy((KeySelector) Utils.wrapQueryable( // ~ FIXME if the underlying windowing is "non merging" we can group by // "key _and_ window", thus, better utilizing the available resources - (FlinkElement we) -> (Comparable) we.getElement().getFirst(), + (BatchElement we) -> (Comparable) we.getElement().getFirst(), Comparable.class)) .sortGroup(Utils.wrapQueryable( - (KeySelector, Long>) - FlinkElement::getTimestamp, Long.class), + (KeySelector, Long>) + BatchElement::getTimestamp, Long.class), Order.ASCENDING) .reduceGroup(new RSBKReducer(origOperator, stateStorageProvider, windowing)) .setParallelism(operator.getParallelism()) @@ -118,8 +116,8 @@ public DataSet translate(FlinkOperator operator, .partitionCustom(new PartitionerWrapper<>( origOperator.getPartitioning().getPartitioner()), Utils.wrapQueryable( - (KeySelector, Comparable>) - (FlinkElement we) -> (Comparable) we.getElement().getKey(), + (KeySelector, Comparable>) + (BatchElement we) -> (Comparable) we.getElement().getKey(), Comparable.class)) .setParallelism(operator.getParallelism()); } @@ -128,8 +126,8 @@ public DataSet translate(FlinkOperator operator, } static class RSBKReducer - implements GroupReduceFunction, FlinkElement>, - ResultTypeQueryable> + implements GroupReduceFunction, BatchElement>, + ResultTypeQueryable> { private final StateFactory stateFactory; private final CombinableReduceFunction stateCombiner; @@ -152,18 +150,18 @@ static class RSBKReducer @Override @SuppressWarnings("unchecked") - public void reduce(Iterable> values, - org.apache.flink.util.Collector> out) + public void reduce(Iterable> values, + org.apache.flink.util.Collector> out) { GroupReducer reducer = new GroupReducer<>( stateFactory, - FlinkElement::new, + BatchElement::new, stateCombiner, stateStorageProvider, windowing, trigger, - elem -> out.collect((FlinkElement) elem)); - for (FlinkElement value : values) { + elem -> out.collect((BatchElement) elem)); + for (BatchElement value : values) { reducer.process(value); } reducer.close(); @@ -171,8 +169,8 @@ public void reduce(Iterable> values, @Override @SuppressWarnings("unchecked") - public TypeInformation> getProducedType() { - return TypeInformation.of((Class) FlinkElement.class); + public TypeInformation> getProducedType() { + return TypeInformation.of((Class) BatchElement.class); } } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java index 342ff325..393e195c 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java @@ -17,7 +17,6 @@ import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.operator.Repartition; -import cz.seznam.euphoria.flink.FlinkElement; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.Utils; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; @@ -30,8 +29,8 @@ class RepartitionTranslator implements BatchOperatorTranslator { public DataSet translate(FlinkOperator operator, BatchExecutorContext context) { - DataSet input = - (DataSet)context.getSingleInputStream(operator); + DataSet input = + (DataSet)context.getSingleInputStream(operator); Partitioning partitioning = operator.getOriginalOperator().getPartitioning(); PartitionerWrapper flinkPartitioner = @@ -39,7 +38,7 @@ public DataSet translate(FlinkOperator operator, return input.partitionCustom( flinkPartitioner, - Utils.wrapQueryable((FlinkElement we) -> (Comparable) we.getElement(), Comparable.class)) + Utils.wrapQueryable((BatchElement we) -> (Comparable) we.getElement(), Comparable.class)) .setParallelism(operator.getParallelism()); } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSinkWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSinkWrapper.java index 05d58bed..8b95152d 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSinkWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSinkWrapper.java @@ -17,14 +17,14 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.Writer; -import cz.seznam.euphoria.flink.FlinkElement; +import cz.seznam.euphoria.flink.batch.BatchElement; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; import java.io.IOException; public class DataSinkWrapper - implements OutputFormat> + implements OutputFormat> { private final DataSink dataSink; @@ -45,7 +45,7 @@ public void open(int taskNumber, int numTasks) throws IOException { } @Override - public void writeRecord(FlinkElement record) throws IOException { + public void writeRecord(BatchElement record) throws IOException { writer.write(record.getElement()); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSourceWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSourceWrapper.java index bb222b13..d3965739 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSourceWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/io/DataSourceWrapper.java @@ -20,7 +20,7 @@ import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.io.Partition; import cz.seznam.euphoria.core.client.io.Reader; -import cz.seznam.euphoria.flink.FlinkElement; +import cz.seznam.euphoria.flink.batch.BatchElement; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -34,7 +34,7 @@ import java.util.function.BiFunction; public class DataSourceWrapper - implements InputFormat, + implements InputFormat, PartitionWrapper>, ResultTypeQueryable { @@ -92,10 +92,10 @@ public boolean reachedEnd() throws IOException { } @Override - public FlinkElement nextRecord( - FlinkElement reuse) + public BatchElement nextRecord( + BatchElement reuse) throws IOException { - return new FlinkElement<>(Batch.BatchWindow.get(), 0L, reader.next()); + return new BatchElement<>(Batch.BatchWindow.get(), 0L, reader.next()); } @Override @@ -106,6 +106,6 @@ public void close() throws IOException { @Override @SuppressWarnings("unchecked") public TypeInformation getProducedType() { - return TypeInformation.of((Class) FlinkElement.class); + return TypeInformation.of((Class) BatchElement.class); } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java index c662504b..27bafce6 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java @@ -18,7 +18,6 @@ import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.flink.FlinkOperator; -import cz.seznam.euphoria.flink.FlinkElement; import org.apache.flink.streaming.api.datastream.DataStream; class FlatMapTranslator implements StreamingOperatorTranslator { @@ -32,7 +31,7 @@ public DataStream translate(FlinkOperator operator, UnaryFunctor mapper = operator.getOriginalOperator().getFunctor(); return input .flatMap(new StreamingUnaryFunctorWrapper<>(mapper)) - .returns((Class) FlinkElement.class) + .returns((Class) StreamingElement.class) .name(operator.getName()) .setParallelism(operator.getParallelism()); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/InputTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/InputTranslator.java index de3675e8..63d044ff 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/InputTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/InputTranslator.java @@ -20,7 +20,6 @@ import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.streaming.io.DataSourceWrapper; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; class InputTranslator implements StreamingOperatorTranslator { @@ -33,7 +32,6 @@ public DataStream translate(FlinkOperator operato return context.getExecutionEnvironment() .addSource(new DataSourceWrapper<>(ds)) - .setParallelism(operator.getParallelism()) - .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); + .setParallelism(operator.getParallelism()); } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index b8e71bdf..a9634486 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -25,22 +25,24 @@ import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.FlinkOperator; -import cz.seznam.euphoria.flink.FlinkElement; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; import cz.seznam.euphoria.flink.streaming.windowing.AttachedWindowing; import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement; import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElementWindowOperator; -import cz.seznam.euphoria.flink.streaming.windowing.WindowedElementWindowOperator; -import org.apache.flink.api.common.functions.MapFunction; +import cz.seznam.euphoria.flink.streaming.windowing.StreamingElementWindowOperator; +import cz.seznam.euphoria.flink.streaming.windowing.WindowAssigner; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.time.Duration; import java.util.Objects; -import java.util.Set; class ReduceStateByKeyTranslator implements StreamingOperatorTranslator { @@ -82,22 +84,25 @@ public DataStream translate(FlinkOperator operator, new EventTimeAssigner(context.getAllowedLateness(), eventTimeAssigner)); } - DataStream> reduced; + DataStream> reduced; WindowAssigner elMapper = - new WindowAssigner(windowing, keyExtractor, valueExtractor, eventTimeAssigner); + new WindowAssigner(windowing, keyExtractor, valueExtractor); if (valueOfAfterShuffle) { reduced = input.keyBy(new UnaryFunctionKeyExtractor(keyExtractor)) - .transform(operator.getName(), TypeInformation.of(FlinkElement.class), - new WindowedElementWindowOperator(elMapper, windowing, stateFactory, stateCombiner, context.isLocalMode())) + .transform(operator.getName(), TypeInformation.of(StreamingElement.class), + new StreamingElementWindowOperator(elMapper, windowing, stateFactory, stateCombiner, context.isLocalMode())) .setParallelism(operator.getParallelism()); } else { // assign windows - DataStream windowed = input.map(elMapper) + DataStream windowed = input.transform( + operator.getName() + "::window-assigner", + TypeInformation.of(KeyedMultiWindowedElement.class), + new WindowAssignerOperator(elMapper)) // ~ execute in the same chain of the input's processing // so far, thereby, avoiding an unnecessary shuffle .setParallelism(input.getParallelism()); reduced = (DataStream) windowed.keyBy(new KeyedMultiWindowedElementKeyExtractor()) - .transform(operator.getName(), TypeInformation.of(FlinkElement.class), + .transform(operator.getName(), TypeInformation.of(StreamingElement.class), new KeyedMultiWindowedElementWindowOperator<>(windowing, stateFactory, stateCombiner, context.isLocalMode())) .setParallelism(operator.getParallelism()); } @@ -117,7 +122,7 @@ public DataStream translate(FlinkOperator operator, } private static class EventTimeAssigner - extends BoundedOutOfOrdernessTimestampExtractor + extends BoundedOutOfOrdernessTimestampExtractor { private final UnaryFunction eventTimeFn; @@ -127,7 +132,7 @@ private static class EventTimeAssigner } @Override - public long extractTimestamp(FlinkElement element) { + public long extractTimestamp(StreamingElement element) { return eventTimeFn.apply(element.getElement()); } @@ -137,47 +142,31 @@ public long extractTimestamp(FlinkElement element) { } } - private static class WindowAssigner implements MapFunction, - ResultTypeQueryable { + private static class WindowAssignerOperator + extends AbstractStreamOperator + implements OneInputStreamOperator { - private final Windowing windowing; - private final UnaryFunction keyExtractor; - private final UnaryFunction valueExtractor; - private final UnaryFunction eventTimeAssigner; - - public WindowAssigner(Windowing windowing, - UnaryFunction keyExtractor, - UnaryFunction valueExtractor, - UnaryFunction eventTimeAssigner) { - this.windowing = windowing; - this.keyExtractor = keyExtractor; - this.valueExtractor = valueExtractor; - this.eventTimeAssigner = eventTimeAssigner; + private final WindowAssigner windowAssigner; + + private WindowAssignerOperator(WindowAssigner windowAssigner) { + this.windowAssigner = windowAssigner; + + // allow chaining to optimize performance + this.chainingStrategy = ChainingStrategy.ALWAYS; } @Override @SuppressWarnings("unchecked") - public KeyedMultiWindowedElement map(FlinkElement el) throws Exception { - if (eventTimeAssigner != null) { - el.setTimestamp((long) eventTimeAssigner.apply(el.getElement())); - } - Set windows = windowing.assignWindowsToElement(el); - - return new KeyedMultiWindowedElement<>( - keyExtractor.apply(el.getElement()), - valueExtractor.apply(el.getElement()), - el.getTimestamp(), - windows); - } + public void processElement(StreamRecord record) throws Exception { + KeyedMultiWindowedElement assigned = windowAssigner.apply(record); + record.replace(assigned); - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(KeyedMultiWindowedElement.class); + output.collect((StreamRecord) record); } } private static class UnaryFunctionKeyExtractor - implements KeySelector, + implements KeySelector, ResultTypeQueryable { private final UnaryFunction keyExtractor; @@ -187,7 +176,7 @@ public UnaryFunctionKeyExtractor(UnaryFunction keyExtractor) { @Override @SuppressWarnings("unchecked") - public Object getKey(FlinkElement value) throws Exception { + public Object getKey(StreamingElement value) throws Exception { return keyExtractor.apply(value.getElement()); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java index 9c84c175..f613ae4b 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java @@ -18,7 +18,6 @@ import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.operator.Repartition; import cz.seznam.euphoria.flink.FlinkOperator; -import cz.seznam.euphoria.flink.FlinkElement; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; import org.apache.flink.streaming.api.datastream.DataStream; @@ -29,8 +28,8 @@ class RepartitionTranslator implements StreamingOperatorTranslator public DataStream translate(FlinkOperator operator, StreamingExecutorContext context) { - DataStream input = - (DataStream) context.getSingleInputStream(operator); + DataStream input = + (DataStream) context.getSingleInputStream(operator); Partitioning partitioning = operator.getOriginalOperator().getPartitioning(); PartitionerWrapper flinkPartitioner = diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingElement.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingElement.java new file mode 100644 index 00000000..9f5bba76 --- /dev/null +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingElement.java @@ -0,0 +1,56 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming; + +import cz.seznam.euphoria.core.client.dataset.windowing.Window; + +/** + * Single element flowing through Flink streaming pipeline. Every such element + * is associated with a window identifier and timestamp. + * @param type of the assigned window + * @param type of the data element + */ +public class StreamingElement { + + private Window window; + private T element; + + // This class needs to ne POJO for effective serialization + public StreamingElement() { + } + + public StreamingElement(W window, T element) { + this.window = window; + this.element = element; + } + + @SuppressWarnings("unchecked") + public W getWindow() { + return (W) window; + } + + public void setWindow(W window) { + this.window = window; + } + + public T getElement() { + return element; + } + + public void setElement(T element) { + this.element = element; + } +} diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java index 025befe0..1cab2259 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java @@ -84,8 +84,8 @@ public List> translateInto(Flow flow) { // transform flow to acyclic graph of supported operators DAG>> dag = flowToDag(flow); - // we're running exclusively on event time - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // we're running exclusively on ingestion time + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.getConfig().setAutoWatermarkInterval(autoWatermarkInterval.toMillis()); StreamingExecutorContext executorContext = diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java index bc021336..bfe720a3 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java @@ -18,7 +18,6 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.Context; -import cz.seznam.euphoria.flink.FlinkElement; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -27,9 +26,9 @@ import java.util.Objects; public class StreamingUnaryFunctorWrapper - implements FlatMapFunction, - FlinkElement>, - ResultTypeQueryable> { + implements FlatMapFunction, + StreamingElement>, + ResultTypeQueryable> { private final UnaryFunctor f; @@ -38,26 +37,25 @@ public StreamingUnaryFunctorWrapper(UnaryFunctor f) { } @Override - public void flatMap(FlinkElement value, - Collector> out) + public void flatMap(StreamingElement element, + Collector> out) throws Exception { - f.apply(value.getElement(), new Context() { + f.apply(element.getElement(), new Context() { @Override public void collect(OUT elem) { - out.collect(new FlinkElement<>( - value.getWindow(), value.getTimestamp(), elem)); + out.collect(new StreamingElement<>(element.getWindow(), elem)); } @Override public Object getWindow() { - return value.getWindow(); + return element.getWindow(); } }); } @SuppressWarnings("unchecked") @Override - public TypeInformation> getProducedType() { - return TypeInformation.of((Class) FlinkElement.class); + public TypeInformation> getProducedType() { + return TypeInformation.of((Class) StreamingElement.class); } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java index 4c272fb1..5309c213 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.java @@ -17,7 +17,7 @@ import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.Writer; -import cz.seznam.euphoria.flink.FlinkElement; +import cz.seznam.euphoria.flink.streaming.StreamingElement; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; @@ -26,7 +26,7 @@ import java.io.Serializable; public class DataSinkWrapper - extends RichSinkFunction> + extends RichSinkFunction> implements Checkpointed { private DataSink dataSink; @@ -56,7 +56,7 @@ public void close() throws Exception { } @Override - public void invoke(FlinkElement elem) throws Exception { + public void invoke(StreamingElement elem) throws Exception { writer.write(elem.getElement()); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java index 8c59ecc1..257608d6 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java @@ -20,7 +20,7 @@ import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.io.Partition; import cz.seznam.euphoria.core.client.io.Reader; -import cz.seznam.euphoria.flink.FlinkElement; +import cz.seznam.euphoria.flink.streaming.StreamingElement; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit; public class DataSourceWrapper - extends RichParallelSourceFunction> - implements ResultTypeQueryable> + extends RichParallelSourceFunction> + implements ResultTypeQueryable> { private final DataSource dataSource; private volatile boolean isRunning = true; @@ -49,7 +49,7 @@ public DataSourceWrapper(DataSource dataSource) { } @Override - public void run(SourceContext> ctx) + public void run(SourceContext> ctx) throws Exception { StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); @@ -106,9 +106,9 @@ public void run(SourceContext> ctx) } } - private FlinkElement toStreamingElement(T elem) { + private StreamingElement toStreamingElement(T elem) { // assign ingestion timestamp to elements - return new FlinkElement<>(Batch.BatchWindow.get(), System.currentTimeMillis(), elem); + return new StreamingElement<>(Batch.BatchWindow.get(), elem); } @Override @@ -121,8 +121,8 @@ public void cancel() { @Override @SuppressWarnings("unchecked") - public TypeInformation> getProducedType() { - return TypeInformation.of((Class) FlinkElement.class); + public TypeInformation> getProducedType() { + return TypeInformation.of((Class) StreamingElement.class); } private ThreadPoolExecutor createThreadPool() { diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index 0be9c1ba..ea4d776f 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -36,7 +36,7 @@ import cz.seznam.euphoria.core.client.triggers.TriggerContext; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.flink.storage.Descriptors; -import cz.seznam.euphoria.flink.FlinkElement; +import cz.seznam.euphoria.flink.streaming.StreamingElement; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -61,8 +61,8 @@ import java.util.Objects; public abstract class AbstractWindowOperator - extends AbstractStreamOperator>> - implements OneInputStreamOperator>>, + extends AbstractStreamOperator>> + implements OneInputStreamOperator>>, Triggerable { private final Windowing windowing; @@ -139,7 +139,13 @@ private void setupEnvironment(Object key, WID window) { storageProvider.setWindow(window); } - /** Extracts the data element from the input stream record. */ + /** + * Extracts the data element from the input stream record. + * + * @param record input stream record + * + * @return extracted data element fro Flink stream record + */ protected abstract KeyedMultiWindowedElement recordValue(StreamRecord record) throws Exception; @Override @@ -147,13 +153,13 @@ private void setupEnvironment(Object key, WID window) { public void processElement(StreamRecord record) throws Exception { - KeyedMultiWindowedElement element = recordValue(record); - // drop late-comers immediately - if (element.getTimestamp() < timerService.currentWatermark()) { + if (record.getTimestamp() < timerService.currentWatermark()) { return; } + KeyedMultiWindowedElement element = recordValue(record); + if (windowing instanceof MergingWindowing) { MergingWindowSet mergingWindowSet = getMergingWindowSet(); @@ -203,7 +209,7 @@ public void processElement(StreamRecord record) // process trigger Trigger.TriggerResult triggerResult = trigger.onElement( - element.getTimestamp(), + record.getTimestamp(), currentWindow, triggerContext); @@ -228,7 +234,7 @@ public void processElement(StreamRecord record) // process trigger Trigger.TriggerResult triggerResult = trigger.onElement( - element.getTimestamp(), + record.getTimestamp(), window, triggerContext); @@ -449,9 +455,8 @@ public void collect(Object elem) { ? ((TimedWindow) window).maxTimestamp() : timerService.currentWatermark(); - // FIXME timestamp is duplicated here output.collect(reuse.replace( - new FlinkElement<>(window, stamp, Pair.of(key, elem)), + new StreamingElement<>(window, Pair.of(key, elem)), stamp)); } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElement.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElement.java index eb802f6a..0ae7d272 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElement.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElement.java @@ -16,14 +16,14 @@ package cz.seznam.euphoria.flink.streaming.windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.flink.FlinkElement; +import cz.seznam.euphoria.flink.streaming.StreamingElement; import java.util.Set; /** * Single data element associated with multiple windows. It's rather used * for performance optimization as the same thing could be expressed - * as a set of {@link FlinkElement} instances. + * as a set of {@link StreamingElement} instances. * * @param Type of used window * @param Type of element's key @@ -33,7 +33,6 @@ public class KeyedMultiWindowedElement { private KEY key; private VALUE value; - private long timestamp; private Set windows; public KeyedMultiWindowedElement() { @@ -41,11 +40,9 @@ public KeyedMultiWindowedElement() { public KeyedMultiWindowedElement(KEY key, VALUE value, - long timestamp, Set windows) { this.key = key; this.value = value; - this.timestamp = timestamp; this.windows = windows; } @@ -65,14 +62,6 @@ public void setValue(VALUE value) { this.value = value; } - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - public Set getWindows() { return windows; } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedElementWindowOperator.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java similarity index 54% rename from euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedElementWindowOperator.java rename to euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java index a4ec6d56..e04d3f8e 100644 --- a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedElementWindowOperator.java +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java @@ -20,35 +20,35 @@ import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; import cz.seznam.euphoria.core.client.functional.StateFactory; import cz.seznam.euphoria.core.client.operator.state.State; -import cz.seznam.euphoria.flink.FlinkElement; -import org.apache.flink.api.common.functions.MapFunction; +import cz.seznam.euphoria.flink.streaming.StreamingElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Objects; /** * An {@link AbstractWindowOperator} implementation which expects input - * elements to be of type {@link FlinkElement} and transforms these + * elements to be of type {@link StreamingElement} and transforms these * into {@link KeyedMultiWindowedElement} on the fly. */ -public class WindowedElementWindowOperator - extends AbstractWindowOperator, KEY, WID> { +public class StreamingElementWindowOperator + extends AbstractWindowOperator, KEY, WID> { - MapFunction, KeyedMultiWindowedElement> mapper; + WindowAssigner windowAssigner; - public WindowedElementWindowOperator( - MapFunction, KeyedMultiWindowedElement> mapper, - Windowing windowing, - StateFactory stateFactory, - CombinableReduceFunction stateCombiner, - boolean localMode) { - super(windowing, stateFactory, stateCombiner, localMode); - this.mapper = Objects.requireNonNull(mapper); - } + public StreamingElementWindowOperator( + WindowAssigner windowAssigner, + Windowing windowing, + StateFactory stateFactory, + CombinableReduceFunction stateCombiner, + boolean localMode) { + super(windowing, stateFactory, stateCombiner, localMode); + this.windowAssigner = Objects.requireNonNull(windowAssigner); + } - @Override - protected KeyedMultiWindowedElement - recordValue(StreamRecord> record) throws Exception { - return mapper.map(record.getValue()); - } + @Override + @SuppressWarnings("unchecked") + protected KeyedMultiWindowedElement + recordValue(StreamRecord> record) throws Exception { + return windowAssigner.apply((StreamRecord) record); + } } diff --git a/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowAssigner.java b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowAssigner.java new file mode 100644 index 00000000..7f75c6ed --- /dev/null +++ b/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowAssigner.java @@ -0,0 +1,102 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming.windowing; + +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.flink.streaming.StreamingElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.Serializable; +import java.util.Set; +import java.util.function.Function; + +/** + * Assigns windows to element and extracts key and value. + * + * @param type of input data + * @param type of output key + * @param type of output value + * @param type of output window + */ +public class WindowAssigner + implements Function>, KeyedMultiWindowedElement>, + Serializable { + + private final Windowing windowing; + private final UnaryFunction keyExtractor; + private final UnaryFunction valueExtractor; + + private transient TimestampedElement reuse; + + public WindowAssigner(Windowing windowing, + UnaryFunction keyExtractor, + UnaryFunction valueExtractor) { + this.windowing = windowing; + this.keyExtractor = keyExtractor; + this.valueExtractor = valueExtractor; + } + + @Override + @SuppressWarnings("unchecked") + public KeyedMultiWindowedElement apply(StreamRecord> record) { + StreamingElement el = record.getValue(); + + if (reuse == null) { + reuse = new TimestampedElement(); + } + reuse.setTimestamp(record.getTimestamp()); + reuse.setStreamingElement(el); + Set windows = windowing.assignWindowsToElement(reuse); + + return new KeyedMultiWindowedElement<>( + keyExtractor.apply(el.getElement()), + valueExtractor.apply(el.getElement()), + windows); + } + + private static class TimestampedElement + implements WindowedElement { + + private long timestamp; + private StreamingElement element; + + @Override + public W getWindow() { + return element.getWindow(); + } + + @Override + public long getTimestamp() { + return timestamp; + } + + private void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public T getElement() { + return element.getElement(); + } + + private void setStreamingElement(StreamingElement element) { + this.element = element; + } + } +} diff --git a/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/FlinkElementTest.java b/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/batch/BatchElementTest.java similarity index 86% rename from euphoria-flink/src/test/java/cz/seznam/euphoria/flink/FlinkElementTest.java rename to euphoria-flink/src/test/java/cz/seznam/euphoria/flink/batch/BatchElementTest.java index 059010a7..c5de045e 100644 --- a/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/FlinkElementTest.java +++ b/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/batch/BatchElementTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.flink; +package cz.seznam.euphoria.flink.batch; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -23,12 +23,12 @@ import static org.junit.Assert.*; -public class FlinkElementTest { +public class BatchElementTest { @Test public void testSerializer() { - TypeSerializer serializer = - TypeInformation.of(FlinkElement.class) + TypeSerializer serializer = + TypeInformation.of(BatchElement.class) .createSerializer(new ExecutionConfig()); // must be POJO serializer for performance reasons diff --git a/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/StreamingElementTest.java b/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/StreamingElementTest.java new file mode 100644 index 00000000..25ead0fc --- /dev/null +++ b/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/StreamingElementTest.java @@ -0,0 +1,38 @@ +/** + * Copyright 2016 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.streaming; + +import cz.seznam.euphoria.flink.streaming.StreamingElement; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class StreamingElementTest { + + @Test + public void testSerializer() { + TypeSerializer serializer = + TypeInformation.of(StreamingElement.class) + .createSerializer(new ExecutionConfig()); + + // must be POJO serializer for performance reasons + assertTrue(serializer instanceof PojoSerializer); + } +} \ No newline at end of file