Skip to content

Commit

Permalink
Merge pull request #59 from seznam/vanekjar/FlinkRemoveTimestamp
Browse files Browse the repository at this point in the history
#14 [euphoria-flink] Don't send timestamp along with each element.
  • Loading branch information
xitep authored Mar 27, 2017
2 parents 2a766d3 + 29538db commit 7e8cf21
Show file tree
Hide file tree
Showing 25 changed files with 377 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.flink.batch.BatchElement;
import cz.seznam.euphoria.flink.streaming.StreamingElement;
import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -138,7 +140,8 @@ private Set<Class<?>> getClassesToRegister(Set<Class<?>> registeredClasses) {
ret.add(TimeInterval.class);

ret.add(Pair.class);
ret.add(FlinkElement.class);
ret.add(StreamingElement.class);
ret.add(BatchElement.class);
ret.add(KeyedMultiWindowedElement.class);
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ protected FlowTranslator createStreamTranslator(Settings settings,
/**
* See {@link ExecutionConfig#enableObjectReuse()}
* and {@link ExecutionConfig#disableObjectReuse()}.
*
* @param reuse {@code true} to enable object reuse
*
* @return this instance (for method chaining purposes)
*/
public FlinkExecutor setObjectReuse(boolean reuse){
this.objectReuse = reuse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.flink;
package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;

/**
* Single element flowing through Flink pipeline. Every such element
* Single element flowing through Flink batch pipeline. Every such element
* is associated with a window identifier and timestamp.
* @param <W> type of the assigned window
* @param <T> type of the data element
*/
public class FlinkElement<W extends Window, T> implements WindowedElement<W, T> {
public class BatchElement<W extends Window, T> implements WindowedElement<W, T> {

private Window window;
private long timestamp;
private T element;

// This class needs to be a POJO for effective serialization
public FlinkElement() {
public BatchElement() {
}

public FlinkElement(W window, long timestamp, T element) {
public BatchElement(W window, long timestamp, T element) {
this.window = window;
this.timestamp = timestamp;
this.element = element;
Expand All @@ -51,20 +51,20 @@ public void setWindow(W window) {
}

@Override
public long getTimestamp() {
return timestamp;
public T getElement() {
return element;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
public void setElement(T element) {
this.element = element;
}

@Override
public T getElement() {
return element;
public long getTimestamp() {
return timestamp;
}

public void setElement(T element) {
this.element = element;
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.flink.functions;
package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.flink.FlinkElement;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

import java.util.Objects;

public class UnaryFunctorWrapper<WID extends Window, IN, OUT>
implements FlatMapFunction<FlinkElement<WID, IN>,
FlinkElement<WID, OUT>>,
ResultTypeQueryable<FlinkElement<WID, OUT>> {
public class BatchUnaryFunctorWrapper<WID extends Window, IN, OUT>
implements FlatMapFunction<BatchElement<WID, IN>,
BatchElement<WID, OUT>>,
ResultTypeQueryable<BatchElement<WID, OUT>> {

private final UnaryFunctor<IN, OUT> f;

public UnaryFunctorWrapper(UnaryFunctor<IN, OUT> f) {
public BatchUnaryFunctorWrapper(UnaryFunctor<IN, OUT> f) {
this.f = Objects.requireNonNull(f);
}

@Override
public void flatMap(FlinkElement<WID, IN> value,
Collector<FlinkElement<WID, OUT>> out)
public void flatMap(BatchElement<WID, IN> value,
Collector<BatchElement<WID, OUT>> out)
throws Exception
{
f.apply(value.getElement(), new Context<OUT>() {
@Override
public void collect(OUT elem) {
out.collect(new FlinkElement<>(
out.collect(new BatchElement<>(
value.getWindow(), value.getTimestamp(), elem));
}
@Override
Expand All @@ -57,7 +56,7 @@ public Object getWindow() {

@SuppressWarnings("unchecked")
@Override
public TypeInformation<FlinkElement<WID, OUT>> getProducedType() {
return TypeInformation.of((Class) FlinkElement.class);
public TypeInformation<BatchElement<WID, OUT>> getProducedType() {
return TypeInformation.of((Class) BatchElement.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.functions.UnaryFunctorWrapper;
import org.apache.flink.api.java.DataSet;

class FlatMapTranslator implements BatchOperatorTranslator<FlatMap> {
Expand All @@ -31,8 +29,8 @@ public DataSet<?> translate(FlinkOperator<FlatMap> operator,
DataSet<?> input = context.getSingleInputStream(operator);
UnaryFunctor mapper = operator.getOriginalOperator().getFunctor();
return input
.flatMap(new UnaryFunctorWrapper<>(mapper))
.returns((Class) FlinkElement.class)
.flatMap(new BatchUnaryFunctorWrapper<>(mapper))
.returns((Class) BatchElement.class)
.setParallelism(operator.getParallelism())
.name(operator.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.Utils;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
Expand Down Expand Up @@ -78,14 +77,14 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
final UnaryFunction udfValue = origOperator.getValueExtractor();

// ~ extract key/value from input elements and assign windows
DataSet<FlinkElement<Window, Pair>> tuples;
DataSet<BatchElement<Window, Pair>> tuples;
{
// FIXME require keyExtractor to deliver `Comparable`s

UnaryFunction<Object, Long> timeAssigner = origOperator.getEventTimeAssigner();
FlatMapOperator<Object, FlinkElement<Window, Pair>> wAssigned =
FlatMapOperator<Object, BatchElement<Window, Pair>> wAssigned =
input.flatMap((i, c) -> {
FlinkElement wel = (FlinkElement) i;
BatchElement wel = (BatchElement) i;
if (timeAssigner != null) {
long stamp = timeAssigner.apply(wel.getElement());
wel.setTimestamp(stamp);
Expand All @@ -96,18 +95,18 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
long stamp = (wid instanceof TimedWindow)
? ((TimedWindow) wid).maxTimestamp()
: wel.getTimestamp();
c.collect(new FlinkElement<>(
c.collect(new BatchElement<>(
wid, stamp, Pair.of(udfKey.apply(el), udfValue.apply(el))));
}
});
tuples = wAssigned
.name(operator.getName() + "::map-input")
.setParallelism(operator.getParallelism())
.returns(new TypeHint<FlinkElement<Window, Pair>>() {});
.returns(new TypeHint<BatchElement<Window, Pair>>() {});
}

// ~ reduce the data now
Operator<FlinkElement<Window, Pair>, ?> reduced;
Operator<BatchElement<Window, Pair>, ?> reduced;
reduced = tuples
.groupBy(new RBKKeySelector())
.reduce(new RBKReducer(reducer));
Expand All @@ -125,8 +124,8 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
.partitionCustom(
new PartitionerWrapper<>(origOperator.getPartitioning().getPartitioner()),
Utils.wrapQueryable(
(KeySelector<FlinkElement<Window, Pair>, Comparable>)
(FlinkElement<Window, Pair> we) -> (Comparable) we.getElement().getKey(),
(KeySelector<BatchElement<Window, Pair>, Comparable>)
(BatchElement<Window, Pair> we) -> (Comparable) we.getElement().getKey(),
Comparable.class))
.setParallelism(operator.getParallelism());
}
Expand All @@ -141,18 +140,18 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
*/
@SuppressWarnings("unchecked")
static class RBKKeySelector
implements KeySelector<FlinkElement<Window, Pair>, Tuple2<Comparable, Comparable>> {
implements KeySelector<BatchElement<Window, Pair>, Tuple2<Comparable, Comparable>> {

@Override
public Tuple2<Comparable, Comparable> getKey(
FlinkElement<Window, Pair> value) {
BatchElement<Window, Pair> value) {

return new Tuple2(value.getWindow(), value.getElement().getKey());
}
}

static class RBKReducer
implements ReduceFunction<FlinkElement<Window, Pair>> {
implements ReduceFunction<BatchElement<Window, Pair>> {

final UnaryFunction<Iterable, Object> reducer;

Expand All @@ -161,11 +160,11 @@ static class RBKReducer
}

@Override
public FlinkElement<Window, Pair>
reduce(FlinkElement<Window, Pair> p1, FlinkElement<Window, Pair> p2) {
public BatchElement<Window, Pair>
reduce(BatchElement<Window, Pair> p1, BatchElement<Window, Pair> p2) {

Window wid = p1.getWindow();
return new FlinkElement<>(
return new BatchElement<>(
wid,
Math.max(p1.getTimestamp(), p2.getTimestamp()),
Pair.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.collect.Iterables;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.StateFactory;
Expand All @@ -29,7 +28,6 @@
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.greduce.GroupReducer;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkElement;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.Utils;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
Expand Down Expand Up @@ -75,9 +73,9 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
UnaryFunction<Object, Long> timeAssigner = origOperator.getEventTimeAssigner();

// FIXME require keyExtractor to deliver `Comparable`s
DataSet<FlinkElement> wAssigned =
DataSet<BatchElement> wAssigned =
input.flatMap((i, c) -> {
FlinkElement wel = (FlinkElement) i;
BatchElement wel = (BatchElement) i;

// assign timestamp if timeAssigner defined
if (timeAssigner != null) {
Expand All @@ -86,27 +84,27 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
Set<Window> assigned = windowing.assignWindowsToElement(wel);
for (Window wid : assigned) {
Object el = wel.getElement();
c.collect(new FlinkElement<>(
c.collect(new BatchElement<>(
wid,
wel.getTimestamp(),
Pair.of(udfKey.apply(el), udfValue.apply(el))));
}
})
.returns(FlinkElement.class)
.returns(BatchElement.class)
.name(operator.getName() + "::map-input")
.setParallelism(operator.getParallelism());

// ~ reduce the data now
DataSet<FlinkElement<?, Pair>> reduced =
DataSet<BatchElement<?, Pair>> reduced =
wAssigned.groupBy((KeySelector)
Utils.wrapQueryable(
// ~ FIXME if the underlying windowing is "non merging" we can group by
// "key _and_ window", thus, better utilizing the available resources
(FlinkElement<?, Pair> we) -> (Comparable) we.getElement().getFirst(),
(BatchElement<?, Pair> we) -> (Comparable) we.getElement().getFirst(),
Comparable.class))
.sortGroup(Utils.wrapQueryable(
(KeySelector<FlinkElement<?, ?>, Long>)
FlinkElement::getTimestamp, Long.class),
(KeySelector<BatchElement<?, ?>, Long>)
BatchElement::getTimestamp, Long.class),
Order.ASCENDING)
.reduceGroup(new RSBKReducer(origOperator, stateStorageProvider, windowing))
.setParallelism(operator.getParallelism())
Expand All @@ -118,8 +116,8 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
.partitionCustom(new PartitionerWrapper<>(
origOperator.getPartitioning().getPartitioner()),
Utils.wrapQueryable(
(KeySelector<FlinkElement<?, Pair>, Comparable>)
(FlinkElement<?, Pair> we) -> (Comparable) we.getElement().getKey(),
(KeySelector<BatchElement<?, Pair>, Comparable>)
(BatchElement<?, Pair> we) -> (Comparable) we.getElement().getKey(),
Comparable.class))
.setParallelism(operator.getParallelism());
}
Expand All @@ -128,8 +126,8 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
}

static class RSBKReducer
implements GroupReduceFunction<FlinkElement<?, Pair>, FlinkElement<?, Pair>>,
ResultTypeQueryable<FlinkElement<?, Pair>>
implements GroupReduceFunction<BatchElement<?, Pair>, BatchElement<?, Pair>>,
ResultTypeQueryable<BatchElement<?, Pair>>
{
private final StateFactory<?, State> stateFactory;
private final CombinableReduceFunction<State> stateCombiner;
Expand All @@ -152,27 +150,27 @@ static class RSBKReducer

@Override
@SuppressWarnings("unchecked")
public void reduce(Iterable<FlinkElement<?, Pair>> values,
org.apache.flink.util.Collector<FlinkElement<?, Pair>> out)
public void reduce(Iterable<BatchElement<?, Pair>> values,
org.apache.flink.util.Collector<BatchElement<?, Pair>> out)
{
GroupReducer reducer = new GroupReducer<>(
stateFactory,
FlinkElement::new,
BatchElement::new,
stateCombiner,
stateStorageProvider,
windowing,
trigger,
elem -> out.collect((FlinkElement) elem));
for (FlinkElement value : values) {
elem -> out.collect((BatchElement) elem));
for (BatchElement value : values) {
reducer.process(value);
}
reducer.close();
}

@Override
@SuppressWarnings("unchecked")
public TypeInformation<FlinkElement<?, Pair>> getProducedType() {
return TypeInformation.of((Class) FlinkElement.class);
public TypeInformation<BatchElement<?, Pair>> getProducedType() {
return TypeInformation.of((Class) BatchElement.class);
}
}
}
Loading

0 comments on commit 7e8cf21

Please sign in to comment.