Skip to content

Commit

Permalink
[greyhound] parallel consumer - interrupt fiber on shutdown timeout (…
Browse files Browse the repository at this point in the history
…#36642)

GitOrigin-RevId: aaccb23bdc18584d42b2240e5b5e63c7d9432838
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 3, 2023
1 parent f61ce82 commit 94e8131
Showing 1 changed file with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ object EventLoop {
_ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes))
fiber <- pollOnce(running, consumer, dispatcher, pausedPartitionsRef, positionsRef, offsets, config, clientId, group, offsetsAndGaps)
.repeatWhile(_ == true)
.interruptible
.forkDaemon
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
partitions <- partitionsAssigned.await
Expand Down Expand Up @@ -142,7 +143,9 @@ object EventLoop {
(joinFiberAndReport(group, clientId, consumerAttributes, fiber).interruptible *>
shutdownDispatcherAndReport(group, clientId, consumerAttributes, dispatcher))
.timeout(config.drainTimeout)
_ <- ZIO.when(drained.isEmpty)(report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumerAttributes)))
_ <- ZIO.when(drained.isEmpty)(
report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumerAttributes)) *> fiber.interruptFork
)
_ <- if (config.consumePartitionInParallel) commitOffsetsAndGaps(consumer, offsetsAndGaps) else commitOffsets(consumer, offsets)
_ <- report(StoppedEventLoop(clientId, group, consumerAttributes))
} yield ()
Expand Down Expand Up @@ -356,16 +359,18 @@ object EventLoop {
consumer.commit(committable).catchAll { t => report(FailedToCommitOffsets(t, committable)) *> offsets.update(committable) }
}

private def commitOffsetsAndGaps(consumer: Consumer, offsetsAndGaps: OffsetsAndGaps): URIO[GreyhoundMetrics, Unit] =
private def commitOffsetsAndGaps(consumer: Consumer, offsetsAndGaps: OffsetsAndGaps): URIO[GreyhoundMetrics, Unit] = {
offsetsAndGaps.getCommittableAndClear.flatMap { committable =>
val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable)
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndGaps(committable))))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
report(CommittingOffsetsAndGaps(consumer.config.groupId, committable)) *>
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndGaps(committable))))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
}
}

private def commitOffsetsOnRebalance(
consumer: Consumer,
Expand Down Expand Up @@ -460,6 +465,12 @@ object EventLoopMetric {

case class StoppedEventLoop(clientId: ClientId, group: Group, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class CommittingOffsetsAndGaps(
groupId: Group,
offsetsAndGaps: Map[TopicPartition, OffsetAndGaps],
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

case class JoinedPollOnceFiberBeforeDispatcherShutdown(
clientId: ClientId,
group: Group,
Expand Down

0 comments on commit 94e8131

Please sign in to comment.