From c188681d8dc2f1ca4945e9bf234a92011c2e292b Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Wed, 24 Jul 2019 15:08:02 +0200 Subject: [PATCH] Improve ProcessFunctionTimers example (Chapter 6) --- .../chapter6/ProcessFunctionTimers.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/scala/io/github/streamingwithflink/chapter6/ProcessFunctionTimers.scala b/src/main/scala/io/github/streamingwithflink/chapter6/ProcessFunctionTimers.scala index f67c1ff..7a02ee0 100644 --- a/src/main/scala/io/github/streamingwithflink/chapter6/ProcessFunctionTimers.scala +++ b/src/main/scala/io/github/streamingwithflink/chapter6/ProcessFunctionTimers.scala @@ -64,8 +64,12 @@ class TempIncreaseAlertFunction // update last temperature lastTemp.update(r.temperature) - val curTimerTimestamp = currentTimer.value(); - if (prevTemp == 0.0 || r.temperature < prevTemp) { + val curTimerTimestamp = currentTimer.value() + if (prevTemp == 0.0) { + // first sensor reading for this key. + // we cannot compare it with a previous value. + } + else if (r.temperature < prevTemp) { // temperature decreased. Delete current timer. ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp) currentTimer.clear()