Skip to content

Commit

Permalink
[greyhound] Cooperative Rebalance fix (#34153)
Browse files Browse the repository at this point in the history
* [greyhound] code cleanup #pr

* fix for paused partitions unrevoked in cooperative

GitOrigin-RevId: 7cf8fe80c2ac9108b3ebe72472355fe73158df9a
  • Loading branch information
leonbur authored and wix-oss committed Sep 3, 2023
1 parent 693ab11 commit 659129c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,10 @@ object Dispatcher {
override def revoke(partitions: Set[TopicPartition]): URIO[GreyhoundMetrics, Unit] =
workers
.modify { workers =>
partitions.foldLeft((List.empty[(TopicPartition, Worker)], workers)) {
case ((revoked, remaining), partition) =>
remaining.get(partition) match {
case Some(worker) => ((partition, worker) :: revoked, remaining - partition)
case None => (revoked, remaining)
}
}
val revoked = workers.filterKeys(partitions.contains)
val remaining = workers -- partitions

(revoked, remaining)
}
.flatMap(shutdownWorkers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ object EventLoop {
consumer: Consumer,
partitions: Set[TopicPartition]
)(implicit trace: Trace): URIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
pausedPartitionsRef.set(Set.empty) *>
dispatcher.revoke(partitions).timeout(config.drainTimeout).flatMap { drained =>
ZIO.when(drained.isEmpty)(
report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumer.config.consumerAttributes))
)
} *> commitOffsetsOnRebalance(consumer0, offsets)
for {
_ <- pausedPartitionsRef.update(_ -- partitions)
isRevokeTimedOut <- dispatcher.revoke(partitions).timeout(config.drainTimeout).map(_.isEmpty)
_ <- ZIO.when(isRevokeTimedOut)(
report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumer.config.consumerAttributes))
)
delayedRebalanceEffect <- commitOffsetsOnRebalance(consumer0, offsets)
} yield delayedRebalanceEffect
}

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
Expand Down Expand Up @@ -258,7 +260,8 @@ case class EventLoopConfig(
highWatermark: Int,
rebalanceListener: RebalanceListener[Any],
delayResumeOfPausedPartition: Long,
startPaused: Boolean
startPaused: Boolean,
cooperativeRebalanceEnabled: Boolean
)

object EventLoopConfig {
Expand All @@ -269,7 +272,8 @@ object EventLoopConfig {
highWatermark = 256,
rebalanceListener = RebalanceListener.Empty,
delayResumeOfPausedPartition = 0,
startPaused = false
startPaused = false,
cooperativeRebalanceEnabled = false
)
}

Expand Down

0 comments on commit 659129c

Please sign in to comment.