From 0ed3a67b116a3960fccc639adb726a89988c54da Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 10 Jun 2024 21:24:59 +0200 Subject: [PATCH] A lost partition is no longer fatal Before 2.7.0 a lost partition was treated as a revoked partition. Since the partition is already assigned to another node, this potentially leads to duplicate processing of records. Zio-kafka 2.7.0 assumes that a lost partition is a fatal event. It leads to an interrupt in the stream that handles the partition. The other streams are ended, and the consumer closes with an error. Usually, a full program restart is needed to resume consuming. It should be noted that stream processing is not interrupted immediately. Only when the stream requests new records, the interrupt is observed. Unfortunately, we have not found a clean way to interrupt the stream consumer directly. Meanwhile, from bug reports, we understand that partitions are usually lost when no records have been received for a long time. In conclusion, 1) it is not possible to immediately interrupt user stream processing, and 2) it most likely not needed anyway because the stream is awaiting new records. With this change, a lost partition no longer leads to an interrupt. Instead, we first drain the stream's internal queue (just to be sure, it is probably already empty), and then we end it gracefully (that is, without error). Other streams are not affected, the consumer will continue to work. When `rebalanceSafeCommits` is enabled, lost partitions do _not_ participate like revoked partitions do. So lost partitions cannot hold up a rebalance. Fixes #1233 and #1250. --- .../consumer/internal/PartitionStreamControl.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 550bc4349..8f3262e35 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -82,12 +82,6 @@ final class PartitionStreamControl private ( private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] = queueInfoRef.get.map(_.deadlineExceeded(now)) - /** To be invoked when the partition was lost. */ - private[internal] def lost: UIO[Boolean] = { - val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace - interruptionPromise.fail(lostException) - } - /** To be invoked when the stream is no longer processing. */ private[internal] def halt: UIO[Boolean] = { val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " + @@ -97,6 +91,14 @@ final class PartitionStreamControl private ( interruptionPromise.fail(consumeTimeout) } + /** To be invoked when the partition was lost. It clears the queue end ends the stream. */ + private[internal] def lost: UIO[Unit] = + logAnnotate { + ZIO.logDebug(s"Partition ${tp.toString} lost") *> + dataQueue.takeAll *> + dataQueue.offer(Take.end).unit + } + /** To be invoked when the partition was revoked or otherwise needs to be ended. */ private[internal] def end: ZIO[Any, Nothing, Unit] = logAnnotate {