diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
index 126ed41fa..e5a4ac71d 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
@@ -1,20 +1,27 @@
 package zio.kafka.consumer.internal
 
-import org.apache.kafka.clients.consumer.{ ConsumerRecord, MockConsumer, OffsetResetStrategy }
+import org.apache.kafka.clients.consumer.{
+  ConsumerRebalanceListener,
+  ConsumerRecord,
+  MockConsumer,
+  OffsetResetStrategy
+}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
 import zio._
-import zio.kafka.consumer.{ ConsumerSettings, Subscription }
+import zio.kafka.ZIOSpecDefaultSlf4j
 import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
 import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
+import zio.kafka.consumer.{ ConsumerSettings, Subscription }
 import zio.metrics.{ MetricState, Metrics }
 import zio.stream.{ Take, ZStream }
 import zio.test.TestAspect.withLiveClock
 import zio.test._
 
+import java.util
 import scala.jdk.CollectionConverters._
 
-object RunloopSpec extends ZIOSpecDefault {
+object RunloopSpec extends ZIOSpecDefaultSlf4j {
 
   private type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]]
   private type PartitionsHub      = Hub[Take[Throwable, PartitionAssignment]]
@@ -93,6 +100,55 @@ object RunloopSpec extends ZIOSpecDefault {
           }
         }
       },
+      test(
+        "runloop continues polling after a lost partition"
+      ) {
+        Diagnostics.SlidingQueue.make(100).flatMap { diagnostics =>
+          var rebalanceListener: ConsumerRebalanceListener = null
+
+          // Captures the rebalance listener so that the test can invoke it directly
+          val mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {
+            override def subscribe(
+              topics: util.Collection[String],
+              listener: ConsumerRebalanceListener
+            ): Unit = {
+              rebalanceListener = listener
+              super.subscribe(topics, listener)
+            }
+          }
+
+          withRunloop(diagnostics, mockConsumer) { (mockConsumer, partitionsHub, runloop) =>
+            mockConsumer.schedulePollTask { () =>
+              mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L), tp11 -> Long.box(0L)).asJava)
+              mockConsumer.rebalance(Seq(tp10, tp11).asJava)
+            }
+            mockConsumer.schedulePollTask { () =>
+              rebalanceListener.onPartitionsLost(Seq(tp10, tp11).asJava)
+            }
+            mockConsumer.schedulePollTask { () =>
+              mockConsumer.rebalance(Seq.empty.asJava)
+              mockConsumer.rebalance(Seq(tp10, tp11).asJava)
+            }
+
+            for {
+              streamStream <- ZStream.fromHubScoped(partitionsHub)
+              _            <- runloop.addSubscription(Subscription.Topics(Set(tp10, tp11).map(_.topic())))
+              result <- streamStream
+                          .map(_.exit)
+                          .flattenExitOption
+                          .flattenChunks
+                          .take(3)
+                          .mapZIO { case (_, stream) =>
+                            stream.runHead
+                          }
+                          .runDrain
+                          .timeout(10.seconds)
+            } yield assertTrue(
+              result.isDefined
+            ) // Test will not finish if polling does not continue after the partitions were lost
+          }
+        }
+      },
       test("runloop retries poll upon AuthorizationException and AuthenticationException") {
         withRunloop() { (mockConsumer, partitionsHub, runloop) =>
           mockConsumer.schedulePollTask { () =>
@@ -131,11 +187,13 @@
       }
     ) @@ withLiveClock
 
-  private def withRunloop(diagnostics: Diagnostics = Diagnostics.NoOp)(
+  private def withRunloop(
+    diagnostics: Diagnostics = Diagnostics.NoOp,
+    mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST)
+  )(
     f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult]
   ): ZIO[Scope, Throwable, TestResult] =
     ZIO.scoped {
-      val mockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST)
       for {
         consumerAccess <- ConsumerAccess.make(mockConsumer)
         consumerScope  <- ZIO.scope
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 32d0b162c..ed06b70da 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -236,7 +236,7 @@ private[consumer] final class Runloop private (
           state <- currentStateRef.get
           lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
           _ <- ZIO.foreachDiscard(lostStreams)(_.lost)
-          _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps))
+          _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, lostStreams))
           _ <- ZIO.logTrace(s"onLost done")
         } yield ()
     )
@@ -560,6 +560,17 @@
                    ended = endedStreams.map(_.tp).toSet
                  )
                )
+      // Check that all assigned partitions have a stream and that no streams are present for unassigned partitions
+      _ <-
+        ZIO
+          .logWarning(
+            s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}"
+          )
+          .when(
+            currentAssigned != updatedAssignedStreams
+              .map(_.tp)
+              .toSet || currentAssigned.size != updatedAssignedStreams.size
+          )
     } yield Runloop.PollResult(
       records = polledRecords,
       ignoreRecordsForTps = ignoreRecordsForTps,
@@ -830,11 +841,12 @@
         endedStreams = this.endedStreams ++ endedStreams
       )
 
-    def onLost(lost: Set[TopicPartition]): RebalanceEvent =
+    def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent =
       copy(
         wasInvoked = true,
         assignedTps = assignedTps -- lost,
-        lostTps = lostTps ++ lost
+        lostTps = lostTps ++ lost,
+        endedStreams = this.endedStreams ++ endedStreams
       )
 
   }
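
For readers unfamiliar with the trick the new test uses: MockConsumer.rebalance() only simulates partition assignment and revocation; it never fires onPartitionsLost. The test therefore subclasses MockConsumer to capture the ConsumerRebalanceListener that the runloop passes to subscribe(), and later invokes onPartitionsLost on it by hand. Below is a minimal, self-contained sketch of that technique. It is an illustration, not part of the patch; the names ListenerCaptureSketch and losePartitions are invented for the example.

import java.util
import org.apache.kafka.clients.consumer.{ ConsumerRebalanceListener, MockConsumer, OffsetResetStrategy }
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

object ListenerCaptureSketch {
  // Illustrative only: holds the listener that the consumer under test registers.
  @volatile private var captured: ConsumerRebalanceListener = null

  // Subclassing MockConsumer lets us intercept subscribe() and keep the listener.
  val consumer: MockConsumer[Array[Byte], Array[Byte]] =
    new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.LATEST) {
      override def subscribe(topics: util.Collection[String], listener: ConsumerRebalanceListener): Unit = {
        captured = listener
        super.subscribe(topics, listener)
      }
    }

  // Simulates the "partitions lost" callback, which MockConsumer.rebalance() never triggers.
  def losePartitions(tps: Seq[TopicPartition]): Unit =
    captured.onPartitionsLost(tps.asJava)
}

Losing a partition differs from a revocation: on revocation the consumer may still commit offsets, whereas a lost partition has already been reassigned (for example after a session timeout), so no commit should be attempted and the affected streams must simply be ended. Threading lostStreams into RebalanceEvent.onLost, as the Runloop.scala changes above do, is what lets the poll handler treat those streams as ended.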