Chapter 6, Example 6-3: watermark timestamp overflows when the job recovers from a failure #5

Open
dispensable opened this issue Jul 17, 2020 · 0 comments

class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
  // 1 min in ms
  val bound: Long = 60 * 1000
  // the maximum observed timestamp
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }
}

When a task recovers from a cluster failure, maxTs is re-initialized to Long.MinValue. If the task manager then calls getCurrentWatermark before any record has been processed, the subtraction maxTs - bound overflows: Long.MinValue - 60 * 1000 wraps around to 9223372036854715808, which is close to Long.MaxValue. This invalid watermark stalls the stream: the web dashboard shows everything green, but the watermark is never pushed forward. I know this is just an example, but maybe a short warning about how it behaves on job recovery would make readers less frustrated with their own code?
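To make the arithmetic concrete, here is a minimal sketch in plain Scala with no Flink dependency. It reproduces the wrap-around and shows one possible guard. The names WatermarkOverflowSketch, naiveWatermark, and guardedWatermark are hypothetical and only for illustration; the guard is an assumption about how one might avoid the overflow, not the book authors' fix.

```scala
object WatermarkOverflowSketch {
  // 1 min in ms, same value as `bound` in the example
  val bound: Long = 60 * 1000L

  // Same arithmetic as getCurrentWatermark in the example:
  // wraps around when maxTs is still Long.MinValue.
  def naiveWatermark(maxTs: Long): Long = maxTs - bound

  // Hypothetical guard: if subtracting `bound` would go below
  // Long.MinValue, return Long.MinValue instead of wrapping.
  def guardedWatermark(maxTs: Long): Long =
    if (maxTs < Long.MinValue + bound) Long.MinValue
    else maxTs - bound

  def main(args: Array[String]): Unit = {
    // No record seen yet, e.g. right after recovery
    println(naiveWatermark(Long.MinValue))   // 9223372036854715808
    println(guardedWatermark(Long.MinValue)) // -9223372036854775808
  }
}
```

An alternative with the same effect would be to initialize maxTs to Long.MinValue + bound, so that the subtraction in getCurrentWatermark can never go below Long.MinValue.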
