Skip to content

Commit

Permalink
[greyhound] add shutdown visibility (#36571)
Browse files Browse the repository at this point in the history
[greyhound] add shutdown visibility #automerge

GitOrigin-RevId: a65aaecff072d8a19e9e70f68d4da713971bd538
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 3, 2023
1 parent a27d84e commit f61ce82
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ object Dispatcher {
case state => (ZIO.unit, state)
}.flatten

override def shutdown: URIO[GreyhoundMetrics, Unit] =
state.modify(state => (state, DispatcherState.ShuttingDown)).flatMap {
case DispatcherState.Paused(resume) => resume.succeed(()).unit
case _ => ZIO.unit
} *> workers.get.flatMap(shutdownWorkers).ignore
override def shutdown: URIO[GreyhoundMetrics, Unit] = {
report(ShuttingDownDispatcher(group, clientId, consumerAttributes)) *>
state.modify(state => (state, DispatcherState.ShuttingDown)).flatMap {
case DispatcherState.Paused(resume) => resume.succeed(()).unit
case _ => ZIO.unit
} *> workers.get.flatMap(shutdownWorkers).ignore
}

/**
* This implementation is not fiber-safe. Since the worker is used per partition, and all operations performed on a single partition
Expand Down Expand Up @@ -581,6 +583,8 @@ object DispatcherMetric {
attributes: Map[String, String]
) extends DispatcherMetric

case class ShuttingDownDispatcher(group: Group, clientId: ClientId, attributes: Map[String, String]) extends DispatcherMetric

case class WorkerStopped(group: Group, clientId: ClientId, partition: TopicPartition, durationMs: Long, attributes: Map[String, String])
extends DispatcherMetric

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,35 @@ object EventLoop {
for {
_ <- report(StoppingEventLoop(clientId, group, consumerAttributes))
_ <- running.set(ShuttingDown)
drained <- (fiber.join *> dispatcher.shutdown).timeout(config.drainTimeout)
drained <-
(joinFiberAndReport(group, clientId, consumerAttributes, fiber).interruptible *>
shutdownDispatcherAndReport(group, clientId, consumerAttributes, dispatcher))
.timeout(config.drainTimeout)
_ <- ZIO.when(drained.isEmpty)(report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumerAttributes)))
_ <- if (config.consumePartitionInParallel) commitOffsetsAndGaps(consumer, offsetsAndGaps) else commitOffsets(consumer, offsets)
_ <- report(StoppedEventLoop(clientId, group, consumerAttributes))
} yield ()

private def shutdownDispatcherAndReport[R](
group: Group,
clientId: ClientId,
consumerAttributes: Map[Group, Group],
dispatcher: Dispatcher[R]
) =
dispatcher.shutdown.timed
.map(_._1)
.flatMap(duration => report(DispatcherStopped(clientId, group, duration.toMillis, consumerAttributes)))

private def joinFiberAndReport[R](
group: Group,
clientId: ClientId,
consumerAttributes: Map[Group, Group],
fiber: Fiber.Runtime[Nothing, Boolean]
) =
fiber.join.timed
.map(_._1)
.flatMap(duration => report(JoinedPollOnceFiberBeforeDispatcherShutdown(clientId, group, duration.toMillis, consumerAttributes)))

private def updatePositions(
records: Consumer.Records,
positionsRef: Ref[Map[TopicPartition, Offset]],
Expand Down Expand Up @@ -434,6 +458,17 @@ object EventLoopMetric {

case class StoppingEventLoop(clientId: ClientId, group: Group, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class StoppedEventLoop(clientId: ClientId, group: Group, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class JoinedPollOnceFiberBeforeDispatcherShutdown(
clientId: ClientId,
group: Group,
durationMs: Long,
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

case class DispatcherStopped(group: Group, clientId: ClientId, durationMs: Long, attributes: Map[String, String]) extends EventLoopMetric

case class DrainTimeoutExceeded(clientId: ClientId, group: Group, timeoutMs: Long, attributes: Map[String, String] = Map.empty)
extends EventLoopMetric

Expand Down

0 comments on commit f61ce82

Please sign in to comment.