From 5e37292c2f152e744a22c50da7b4e5aecbafd9aa Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sat, 16 Nov 2024 11:16:18 +0100 Subject: [PATCH 1/2] Prevent unlimited enqueueing of CommitAvailable commands --- .../zio/kafka/consumer/internal/Runloop.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 8d45f8e55..b017c7a3d 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -33,7 +33,8 @@ private[consumer] final class Runloop private ( maxStreamPullInterval: Duration, maxRebalanceDuration: Duration, currentStateRef: Ref[State], - committedOffsetsRef: Ref[CommitOffsets] + committedOffsetsRef: Ref[CommitOffsets], + commitAvailable: Ref[Boolean] ) { private val commitTimeout = settings.commitTimeout private val commitTimeoutNanos = settings.commitTimeout.toNanos @@ -327,10 +328,11 @@ private[consumer] final class Runloop private ( for { p <- Promise.make[Throwable, Unit] startTime = java.lang.System.nanoTime() - _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) + commitAvailable <- commitAvailable.getAndSet(true) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable).unless(commitAvailable) + _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) endTime = java.lang.System.nanoTime() latency = (endTime - startTime).nanoseconds _ <- consumerMetrics.observeCommit(latency) @@ -819,6 +821,7 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { + _ <- commitAvailable.set(false) commitCommands <- commitQueue.takeAll _ <- ZIO.logDebug( s"Processing ${commitCommands.size} commits," + @@ -958,6 +961,7 @@ object Runloop { lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) + commitAvailable <- Ref.make(false) committedOffsetsRef <- Ref.make(CommitOffsets.empty) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) executor <- ZIO.executor @@ -974,7 +978,8 @@ object Runloop { maxStreamPullInterval = maxStreamPullInterval, maxRebalanceDuration = maxRebalanceDuration, currentStateRef = currentStateRef, - committedOffsetsRef = committedOffsetsRef + committedOffsetsRef = committedOffsetsRef, + commitAvailable = commitAvailable ) _ <- ZIO.logDebug("Starting Runloop") From 6afa3efad87d702d10a707d225deb9b238b12a06 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 26 Nov 2024 21:13:06 +0100 Subject: [PATCH 2/2] Renames + comment --- .../zio/kafka/consumer/internal/Runloop.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 39570b7c7..6d8d9178f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -23,7 +23,7 @@ private[consumer] final class Runloop private ( sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, commandQueue: Queue[RunloopCommand], - commitAvailableQueue: Queue[Boolean], + commitAvailable: Queue[Boolean], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, @@ -489,7 +489,7 @@ private[consumer] final class Runloop private ( ZStream .fromQueue(commandQueue) - .merge(ZStream.fromQueue(commitAvailableQueue).as(RunloopCommand.CommitAvailable)) + .merge(ZStream.fromQueue(commitAvailable).as(RunloopCommand.CommitAvailable)) .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { @@ -583,10 +583,11 @@ object Runloop { partitionsHub: Hub[Take[Throwable, PartitionAssignment]] ): URIO[Scope, Runloop] = for { - _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - commitAvailableQueue <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) + // A one-element dropping queue used to signal between two fibers that new commits are pending and we should poll + commitAvailable <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) + lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) @@ -597,7 +598,7 @@ object Runloop { settings.commitTimeout, diagnostics, metrics, - commitAvailableQueue.offer(true).unit, + commitAvailable.offer(true).unit, sameThreadRuntime ) rebalanceCoordinator = new RebalanceCoordinator( @@ -614,7 +615,7 @@ object Runloop { sameThreadRuntime = sameThreadRuntime, consumer = consumer, commandQueue = commandQueue, - commitAvailableQueue = commitAvailableQueue, + commitAvailable = commitAvailable, partitionsHub = partitionsHub, diagnostics = diagnostics, maxStreamPullInterval = maxStreamPullInterval,