-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#14 [euphoria-flink] Don't send timestamp along with each element. #59
Conversation
…e rather internal Flink record's timestamp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Superb! Apart from reducing the amount of data carried along each element we also avoid the second (and redundant) call the event-time-assigner! Very 🆒 👍 As usually I've left some remarks behind - hopefully relevant.
* | ||
* @param reuse set TRUE for enabling object reuse | ||
* | ||
* @return this instance (for method chaining purposes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
public TypeInformation<FlinkElement<WID, OUT>> getProducedType() { | ||
return TypeInformation.of((Class) FlinkElement.class); | ||
public TypeInformation<BatchElement<WID, OUT>> getProducedType() { | ||
return TypeInformation.of((Class) BatchElement.class); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest to rename the whole class to BatchUnaryFunctorWrapper
(similarly to StreamingUnaryFunctorWrapper
) and move it directly into the "batch" package.
@@ -33,7 +32,6 @@ | |||
|
|||
return context.getExecutionEnvironment() | |||
.addSource(new DataSourceWrapper<>(ds)) | |||
.setParallelism(operator.getParallelism()) | |||
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); | |||
.setParallelism(operator.getParallelism()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this resolves #37, does it? If so, do we have a test covering a flow purely based on ingestion time, i.e. a flow with a stateful operator without custom event time function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am afraid it doesn't. We're still running using ingestion
time characteristics, not processing
. So the periodic watermarks will be send.
Also I think testing of ingestion
or processing
may be quite tricky because of flipping tests.
} | ||
reuse.setTimestamp(record.getTimestamp()); | ||
reuse.setElement(el); | ||
Set windows = windowing.assignWindowsToElement(reuse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wonder if we shouldn't call reuse.setElement(null)
after the call to assignWindowsToElement
to avoid referencing the last processed streaming element. but it probably makes no difference.
|
||
public void setElement(StreamingElement<W, T> element) { | ||
this.element = element; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't look good to me :/ at least it's surprising from a code perspective. the getter returns some type T
while the setter takes a StreamElement<W, T>
... can we maybe call the setter explicitly setStreamingElement
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good observation! This appeared as a result of some name refactoring. Thanks 👍
private StreamingElement<W, T> element; | ||
|
||
public TimestampedElement() { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess we can drop this and thus shorten the code a little bit.
private final WindowAssigner windowAssigner; | ||
|
||
private WindowAssignerOperator(WindowAssigner windowAssigner) { | ||
this.windowAssigner = windowAssigner; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've seen Flink's StreamMap
(which was previously used for the work done in this new WindowAssiginerOperator
) uses chainingStrategy = ChainingStrategy.ALWAYS;
while the default value is .HEAD
) Can you try that out and see if it has any effect on the executed DAG?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool! This indeed helped to remove one stage/shuffle from the DAG resulting in even better performance. Thanks 👍
Thanks! |
#14 [euphoria-flink] Don't send timestamp along with each element.
In Flink streaming executor a timestamp is not sent with each element anymore. Instead it relies on internal Flink timestamps.
Seems it introduces noticeable performance benefits.