diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala index cbe8d883..b618d0ba 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala @@ -93,13 +93,10 @@ object Dispatcher { override def revoke(partitions: Set[TopicPartition]): URIO[GreyhoundMetrics, Unit] = workers .modify { workers => - partitions.foldLeft((List.empty[(TopicPartition, Worker)], workers)) { - case ((revoked, remaining), partition) => - remaining.get(partition) match { - case Some(worker) => ((partition, worker) :: revoked, remaining - partition) - case None => (revoked, remaining) - } - } + val revoked = workers.filterKeys(partitions.contains) + val remaining = workers -- partitions + + (revoked, remaining) } .flatMap(shutdownWorkers) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala index 5985f80d..0d6be6ee 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala @@ -170,12 +170,14 @@ object EventLoop { consumer: Consumer, partitions: Set[TopicPartition] )(implicit trace: Trace): URIO[GreyhoundMetrics, DelayedRebalanceEffect] = { - pausedPartitionsRef.set(Set.empty) *> - dispatcher.revoke(partitions).timeout(config.drainTimeout).flatMap { drained => - ZIO.when(drained.isEmpty)( - report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumer.config.consumerAttributes)) - ) - } *> commitOffsetsOnRebalance(consumer0, offsets) + for { + _ <- pausedPartitionsRef.update(_ -- partitions) + isRevokeTimedOut <- dispatcher.revoke(partitions).timeout(config.drainTimeout).map(_.isEmpty) + _ <- ZIO.when(isRevokeTimedOut)( + report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumer.config.consumerAttributes)) + ) + delayedRebalanceEffect <- commitOffsetsOnRebalance(consumer0, offsets) + } yield delayedRebalanceEffect } override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = @@ -258,7 +260,8 @@ case class EventLoopConfig( highWatermark: Int, rebalanceListener: RebalanceListener[Any], delayResumeOfPausedPartition: Long, - startPaused: Boolean + startPaused: Boolean, + cooperativeRebalanceEnabled: Boolean ) object EventLoopConfig { @@ -269,7 +272,8 @@ object EventLoopConfig { highWatermark = 256, rebalanceListener = RebalanceListener.Empty, delayResumeOfPausedPartition = 0, - startPaused = false + startPaused = false, + cooperativeRebalanceEnabled = false ) }