From 659129ccd81d979de77dd89a09807aac8097c633 Mon Sep 17 00:00:00 2001 From: Leon Burdinov Date: Sun, 23 Apr 2023 08:12:42 +0300 Subject: [PATCH] [greyhound] Cooperative Rebalance fix (#34153) * [greyhound] code cleanup #pr * fix for paused partitions unrevoked in cooperative GitOrigin-RevId: 7cf8fe80c2ac9108b3ebe72472355fe73158df9a --- .../greyhound/core/consumer/Dispatcher.scala | 11 ++++------ .../greyhound/core/consumer/EventLoop.scala | 20 +++++++++++-------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala index cbe8d883..b618d0ba 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala @@ -93,13 +93,10 @@ object Dispatcher { override def revoke(partitions: Set[TopicPartition]): URIO[GreyhoundMetrics, Unit] = workers .modify { workers => - partitions.foldLeft((List.empty[(TopicPartition, Worker)], workers)) { - case ((revoked, remaining), partition) => - remaining.get(partition) match { - case Some(worker) => ((partition, worker) :: revoked, remaining - partition) - case None => (revoked, remaining) - } - } + val revoked = workers.filterKeys(partitions.contains) + val remaining = workers -- partitions + + (revoked, remaining) } .flatMap(shutdownWorkers) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala index 5985f80d..0d6be6ee 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala @@ -170,12 +170,14 @@ object EventLoop { consumer: Consumer, partitions: Set[TopicPartition] )(implicit trace: Trace): URIO[GreyhoundMetrics, DelayedRebalanceEffect] = { - pausedPartitionsRef.set(Set.empty) *> - dispatcher.revoke(partitions).timeout(config.drainTimeout).flatMap { drained => - ZIO.when(drained.isEmpty)( - report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumer.config.consumerAttributes)) - ) - } *> commitOffsetsOnRebalance(consumer0, offsets) + for { + _ <- pausedPartitionsRef.update(_ -- partitions) + isRevokeTimedOut <- dispatcher.revoke(partitions).timeout(config.drainTimeout).map(_.isEmpty) + _ <- ZIO.when(isRevokeTimedOut)( + report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumer.config.consumerAttributes)) + ) + delayedRebalanceEffect <- commitOffsetsOnRebalance(consumer0, offsets) + } yield delayedRebalanceEffect } override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = @@ -258,7 +260,8 @@ case class EventLoopConfig( highWatermark: Int, rebalanceListener: RebalanceListener[Any], delayResumeOfPausedPartition: Long, - startPaused: Boolean + startPaused: Boolean, + cooperativeRebalanceEnabled: Boolean ) object EventLoopConfig { @@ -269,7 +272,8 @@ object EventLoopConfig { highWatermark = 256, rebalanceListener = RebalanceListener.Empty, delayResumeOfPausedPartition = 0, - startPaused = false + startPaused = false, + cooperativeRebalanceEnabled = false ) }