Skip to content

Commit

Permalink
[greyhound] parallel consumer - add disconnect on shutdown (#36660)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 84bff3d0d5156c9cd3315423becf3811fe374776
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 8, 2023
1 parent 4de42ee commit 155a9cc
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ object EventLoop {
for {
_ <- report(StoppingEventLoop(clientId, group, consumerAttributes))
_ <- running.set(ShuttingDown)
_ <- running.get.flatMap(currentState => report(EventLoopStateOnShutdown(clientId, group, currentState, consumerAttributes)))
drained <-
(joinFiberAndReport(group, clientId, consumerAttributes, fiber).interruptible *>
shutdownDispatcherAndReport(group, clientId, consumerAttributes, dispatcher))
shutdownDispatcherAndReport(group, clientId, consumerAttributes, dispatcher)).disconnect.interruptible
.timeout(config.drainTimeout)
_ <- ZIO.when(drained.isEmpty)(
report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumerAttributes)) *> fiber.interruptFork
Expand Down Expand Up @@ -202,8 +203,8 @@ object EventLoop {
_ <- ZIO.when(records.isEmpty)(ZIO.sleep(50.millis))
} yield true

case ShuttingDown => ZIO.succeed(false)
case Paused => ZIO.sleep(100.millis).as(true)
case ShuttingDown => report(PollOnceFiberShuttingDown(clientId, group, consumer.config.consumerAttributes)) *> ZIO.succeed(false)
case Paused => report(PollOnceFiberPaused(clientId, group, consumer.config.consumerAttributes)) *> ZIO.sleep(100.millis).as(true)
}

private def listener(
Expand Down Expand Up @@ -463,6 +464,18 @@ object EventLoopMetric {

case class StoppingEventLoop(clientId: ClientId, group: Group, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class EventLoopStateOnShutdown(
clientId: ClientId,
group: Group,
eventLoopState: EventLoopState,
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

case class PollOnceFiberShuttingDown(clientId: ClientId, group: Group, attributes: Map[String, String] = Map.empty)
extends EventLoopMetric

case class PollOnceFiberPaused(clientId: ClientId, group: Group, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class StoppedEventLoop(clientId: ClientId, group: Group, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class CommittingOffsetsAndGaps(
Expand Down

0 comments on commit 155a9cc

Please sign in to comment.