
#27 [euphoria-flink] Rewrite windowing to native implementation of StreamOperator #50

Merged
3 commits merged into master from 27/ProcessFunction on Mar 20, 2017

Conversation

vanekjar (Contributor) commented Mar 19, 2017

Finally, my PR is ready.

The code for handling windows is still over-complicated, but I deleted more code than I actually added, which is always a good sign.

Instead of using ProcessFunction, I directly implemented a custom StreamOperator. It turned out during implementation that the ProcessFunction API is not powerful enough for our use case; for example, it's not possible to access the current key in callbacks.

There are a few pitfalls in the current PR that I am not very satisfied with:

  • Rewriting the windowing hasn't brought any performance benefit. Runtime is more or less the same as with the previous implementation. On the other hand, it now opens new opportunities to optimize, for example removing the duplicate timestamp from WindowedElement flowing through the pipeline and using the built-in Flink timestamp instead. This will come in another PR soon.

  • I had to remove the functionality of flushing remaining windows when a bounded stream ends. So far, all remaining windows were flushed to the output on EOS. It turned out that tracking all existing windows had a huge performance drawback, since the registered window set must be persisted for fault tolerance. I left that functionality in only for unit testing (it works for TestFlinkExecutor in local mode). This opens a discussion about whether we need this bounded-stream functionality in production.
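The flush-on-EOS behavior described in the second point can be sketched in plain Java (a hypothetical in-memory illustration, not the actual executor code): every open window must be tracked so it can be emitted when the bounded input ends, and persisting that registered window set for fault tolerance is exactly the overhead being removed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of flush-on-end-of-stream: every open window is
// tracked so that on EOS its (possibly incomplete) contents can still be
// emitted. In a fault-tolerant streaming job this whole map would have to
// be persisted, which is the performance drawback discussed above.
class EosFlushSketch<W, V> {
    private final Map<W, List<V>> openWindows = new LinkedHashMap<>();

    void add(W window, V element) {
        openWindows.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }

    // Called once when the bounded stream ends: emit all remaining windows.
    void endOfStream(BiConsumer<W, List<V>> emit) {
        openWindows.forEach(emit);
        openWindows.clear();
    }
}
```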

Thanks for the review.


ArrayList<Long> assignerTimes = new ArrayList<>(TETETS_SEEN_TIMES_ASSIGNER);
assignerTimes.sort(Comparator.naturalOrder());
assertEquals(asList(15_000L, 19_999L, 25_000L, 29_999L), assignerTimes);
assertEquals(asList(19_999L, 19_999L, 29_999L, 29_999L), assignerTimes);
Contributor

not sure we need this test anymore. i think testElementTimestamp should suffice.

.createSerializer(new ExecutionConfig());

// must be POJO serializer for performance reasons
assertTrue(serializer instanceof PojoSerializer);
Contributor

thanks! 👍

import java.util.Iterator;
import java.util.Set;

class ReduceByKeyTranslator implements StreamingOperatorTranslator<ReduceByKey> {
Contributor

no specialization for the ReduceByKey operator in the streaming executor anymore?

Contributor Author

It's easier to maintain just one implementation. Flink internally does the same job with an incremental reducer as we now do in WindowOperator.
Anyway, the performance of the benchmark using ReduceByKey is the same with the new implementation.
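The incremental-reducer idea mentioned above can be sketched in plain Java (names are illustrative, not Flink's or Euphoria's actual API): each element is folded into a single accumulator per key as it arrives, so per-key state stays constant-size instead of buffering all elements until the window fires.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Minimal sketch of incremental (combining) reduce-by-key: each incoming
// element is merged into one accumulator per key, so state per key is O(1)
// rather than O(n) buffered elements.
class IncrementalReduce<K, V> {
    private final Map<K, V> accumulators = new HashMap<>();
    private final BinaryOperator<V> reducer;

    IncrementalReduce(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    void add(K key, V value) {
        // combine eagerly instead of buffering
        accumulators.merge(key, value, reducer);
    }

    V result(K key) {
        return accumulators.get(key);
    }
}
```

For example, `new IncrementalReduce<String, Integer>(Integer::sum)` keeps a single running sum per key.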

Contributor

i agree it's easier to maintain only one implementation. no doubt about that. what i worry about a bit is the difference between serializing the state (into the backend storage) of a combining RBK (a single value) vs. serializing the same value in a list (of size one). my guess is, we cannot tell this difference now, since other factors are likely to hide this overhead. let's stick with one implementation and optimize later, if necessary.

Contributor Author (@vanekjar, Mar 20, 2017)

I agree with your concern about the difference between ValueStorage and ListStorage. That can really make a difference.
There is a chance to optimize the internals of ReduceByKey.ReduceState so that it uses ValueStorage when the function is combinable. Also, the storage is currently accessed (serialized) multiple times when adding to the state; this can be avoided as well.
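The concern can be illustrated with a hypothetical sketch (names are illustrative, not Euphoria's actual ValueStorage/ListStorage API): a combinable reduce only ever needs a single accumulated value per key, while the generic path stores a list that stays at size one, paying list bookkeeping and serializer overhead on every state access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Value-shaped state: one slot per key/window, updated in place.
class ValueStorageSketch<V> {
    private V value;

    void combine(V next, BinaryOperator<V> fn) {
        value = (value == null) ? next : fn.apply(value, next);
    }

    V get() { return value; }
}

// List-shaped state: the generic path keeps a list even when the reduce
// function is combinable, so the list always has exactly one element and
// each access pays list (de)serialization overhead for no benefit.
class ListStorageSketch<V> {
    private final List<V> values = new ArrayList<>();

    void add(V next) { values.add(next); }

    List<V> get() { return values; }
}
```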

Contributor

if necessary, i think we can even optimize this later directly in the api layer in #getBasicOps of the RBK operator. this would allow us to specialize the case without having to maintain separate impls in the executors.

public StreamingWindowedElement(W window, long timestamp, T element) {
super(window, timestamp, element);
}
}
Contributor

many thanks for the clean-up!

DataStream<WindowedElement<?, Pair>> reduced = (DataStream) windowed.keyBy(new KeyExtractor())
.transform(operator.getName(), TypeInformation.of(WindowedElement.class), new WindowOperator<>(
windowing, stateFactory, stateCombiner, context.isLocalMode()))
.setParallelism(operator.getParallelism());
Contributor

i would like to see support for running this "as is", as well as with the value extraction and window assignment functionality executing only after the shuffle (maybe via some global parameter to the translation layer). at least for our benchmarking this will be necessary. what do you think it would take to support both?

Contributor Author

Definitely agree it would be useful. But I am not sure whether this isn't already part of #47

Contributor

oh yeah, right. let's introduce that with the mentioned ticket later.

List<State> states = new ArrayList<>();
states.add(getWindowState(stateResultWindow));
mergedStateWindows.forEach(sw -> states.add(getWindowState(sw)));
stateCombiner.apply(states);
Contributor (@xitep, Mar 20, 2017)

in regards to the above FIXME, i wanted to suggest to change the type of the state-combiner. however, now i see it's a CombinableReduceFunction which does have a return value that is supposed to replace the merged states. in the inmem as well as in the spark executor (e.g. GroupedReducer) we are doing so. if i'm not mistaken applying the same technique here should resolve the above FIXME.

Contributor Author

I am not sure about that. The state returned from CombinableReduceFunction doesn't matter at all. Since our state is basically "stateless", what matters is whether the resulting state stored the merged value to the appropriate persistent storage using the correct namespace.
In this case we need the state combiner to store the result in the stateResultWindow namespace, and I can't see any way to ensure that.

Contributor

ah, thanks for the explanation! now i see it. we'll follow up on this later - combinable-reduce-function then doesn't seem right to me at this place. may i ask you to set up a ticket for that FIXME?

Contributor Author

Issue created #51

*/
class MergingWindowSet<W extends Window> {

private final MergingWindowing windowing;
Contributor

this is quite cool 😎! do you think we can re-use it to replace parts of GroupedReducer in euphoria-core?

Contributor

ah, github is playing tricks on me ;) ... that comment was meant to address the class as a whole.

Contributor Author

I don't think reusing this class would bring any benefit. It's too complicated because the window set must be persisted after each step in streaming. Most of the code is a performance optimization to avoid costly persistent state allocation. This is not the case in GroupReducer, where everything is processed in a plain in-memory HashMap.
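The core idea behind such a window set can be sketched in plain Java (a simplified, hypothetical illustration, not the actual MergingWindowSet API): each logical window maps to a "state window" whose namespace physically holds the state, and when windows merge, the merged windows are redirected to one surviving state window so persisted state does not have to be moved on every merge.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of the merging-window-set idea: a mapping from each
// logical window to the "state window" that physically holds its state.
class WindowSetSketch<W> {
    private final Map<W, W> windowToStateWindow = new LinkedHashMap<>();

    void add(W window) {
        // a fresh window is its own state window
        windowToStateWindow.putIfAbsent(window, window);
    }

    // Merge `merged` windows into `result`, keeping the state window of the
    // first pre-existing window so its persisted state stays in place.
    void merge(Collection<W> merged, W result) {
        W keep = null;
        for (W w : merged) {
            W sw = windowToStateWindow.remove(w);
            if (keep == null) {
                keep = sw;
            }
        }
        windowToStateWindow.put(result, keep != null ? keep : result);
    }

    W stateWindow(W window) {
        return windowToStateWindow.get(window);
    }
}
```

The real implementation additionally has to persist this mapping after each step and avoid allocating state eagerly, which is where most of the complexity comes from.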

public TriggerResult onElement(long time, WID window, TriggerContext ctx) {
// FIXME batch window shouldn't be used in stream flow in the future
// issue #38 on GitHub
if (window instanceof Batch.BatchWindow) return TriggerResult.NOOP;
Contributor

what about throwing an exception here?

Contributor Author

Unfortunately there still exists a unit test with an unbounded source and no windowing. I think this needs to be resolved together with issue #38

Contributor

ok, fine with me.

@@ -22,6 +22,7 @@
import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement;
Contributor

looks like an unused import

windowing, keyExtractor, valueExtractor, eventTimeAssigner))
.setParallelism(operator.getParallelism());

DataStream<WindowedElement<?, Pair>> reduced = (DataStream) windowed.keyBy(new KeyExtractor())
Contributor

i think we're better off with .keyBy("key"); flink will then derive the type information from the input data stream automatically; it'll still be "object" at this moment since we don't supply enough type information through the WindowAssigner, but that's about to come in some future.

Contributor Author (@vanekjar, Mar 20, 2017)

Great idea, but unfortunately it doesn't work. Or at least I don't know how to make it work; Object is not a key type.

org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key.

Contributor

hm ... we'll need more type information :/ anyway, thanks for having a try!

windowState = getWindowState(stateWindow);
} else {
windowState = getWindowState(window);
}
Contributor

potential micro-optimization possibility here: don't look up the window state from the state table when it is not necessary (e.g. tr == NOOP)
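The suggested micro-optimization can be sketched as follows (hypothetical names, not the actual operator code): defer the window-state lookup until the trigger result shows the state is actually needed, so a NOOP trigger never touches the state table.

```java
import java.util.function.Supplier;

// Illustrative trigger result, mirroring the NOOP case discussed above.
enum TriggerResult { NOOP, FLUSH }

// Sketch of lazy state lookup: the (potentially costly) state-table access
// happens only when the trigger result requires it.
class LazyStateLookup {
    static <S> S stateIfNeeded(TriggerResult tr, Supplier<S> lookup) {
        return tr == TriggerResult.NOOP ? null : lookup.get();
    }
}
```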

xitep (Contributor) commented Mar 20, 2017

thank you so much for cleaning up old stuff! feel free to merge into master. it looks very good to me.

the fact that the performance didn't get better means we didn't get worse! :) (our benchmark is just a single scenario and it happens to hit a particular bottleneck which is present in both versions.) it would be interesting to compare them with the bottleneck eliminated.

@vanekjar vanekjar merged commit 3f67275 into master Mar 20, 2017
@vanekjar vanekjar deleted the 27/ProcessFunction branch March 20, 2017 16:04