Skip to content

Commit

Permalink
[greyhound] parallel consumer - fix shutdown (#36489)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: eb0dc817fa50494422fba10994dd4382f1a58585
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 8, 2023
1 parent 5f11e80 commit c675fe4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.wixpress.dst.greyhound.core

import com.wixpress.dst.greyhound.core.Serdes._
import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.PollingFailed
import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.{CommittedOffsets, PollingFailed}
import com.wixpress.dst.greyhound.core.consumer.EventLoop.Handler
import com.wixpress.dst.greyhound.core.consumer.OffsetReset.{Earliest, Latest}
import com.wixpress.dst.greyhound.core.consumer._
Expand Down Expand Up @@ -318,13 +318,15 @@ class ConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
} yield test
}

s"wait until queues are drained${parallelConsumerString(useParallelConsumer)}" in {
s"wait until queues are drained and commit on shutdown${parallelConsumerString(useParallelConsumer)}" in {
for {
r <- getShared
TestResources(kafka, producer) = r
_ <- ZIO.debug(">>>> starting test: gracefulShutdownTest")
topic <- kafka.createRandomTopic(prefix = "core-wait-until")
topic <- kafka.createRandomTopic(partitions = 1, prefix = "core-wait-until")
group <- randomGroup
cId <- clientId
tp = TopicPartition(topic, 0)

ref <- Ref.make(0)
startedHandling <- Promise.make[Nothing, Unit]
Expand All @@ -340,15 +342,17 @@ class ConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
group,
topic,
mutateEventLoop = _.copy(consumePartitionInParallel = useParallelConsumer, maxParallelism = 8)
),
).copy(clientId = cId),
handler
)
.flatMap { _ => producer.produce(ProducerRecord(topic, Chunk.empty)) *> startedHandling.await }
)

handled <- ref.get
metrics <- TestMetrics.reported
} yield {
handled must equalTo(1)
(handled must equalTo(1)) and
(metrics must contain(CommittedOffsets(cId, group, Map(tp -> 1L), calledOnRebalance = false, Map.empty)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ object EventLoop {
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
partitions <- partitionsAssigned.await
env <- ZIO.environment[Env]
} yield (dispatcher, fiber, offsets, positionsRef, running, rebalanceListener.provideEnvironment(env))
} yield (dispatcher, fiber, offsets, offsetsAndGaps, positionsRef, running, rebalanceListener.provideEnvironment(env))

start
.map {
case (dispatcher, fiber, offsets, positionsRef, running, listener) =>
case (dispatcher, fiber, offsets, offsetsAndGaps, positionsRef, running, listener) =>
new EventLoop[GreyhoundMetrics] {

override def stop: URIO[GreyhoundMetrics, Any] =
stopLoop(group, consumer, clientId, consumerAttributes, config, running, fiber, offsets, dispatcher)
stopLoop(group, consumer, clientId, consumerAttributes, config, running, fiber, offsets, offsetsAndGaps, dispatcher)

override def pause(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] =
(report(PausingEventLoop(clientId, group, consumerAttributes)) *> running.set(Paused) *> dispatcher.pause).unit
Expand Down Expand Up @@ -132,14 +132,15 @@ object EventLoop {
running: Ref[EventLoopState],
fiber: Fiber.Runtime[Nothing, Boolean],
offsets: Offsets,
offsetsAndGaps: OffsetsAndGaps,
dispatcher: Dispatcher[R]
) =
for {
_ <- report(StoppingEventLoop(clientId, group, consumerAttributes))
_ <- running.set(ShuttingDown)
drained <- (fiber.join *> dispatcher.shutdown).timeout(config.drainTimeout)
_ <- ZIO.when(drained.isEmpty)(report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumerAttributes)))
_ <- commitOffsets(consumer, offsets)
_ <- if (config.consumePartitionInParallel) commitOffsetsAndGaps(consumer, offsetsAndGaps) else commitOffsets(consumer, offsets)
} yield ()

private def updatePositions(
Expand Down Expand Up @@ -327,7 +328,9 @@ object EventLoop {
}

private def commitOffsets(consumer: Consumer, offsets: Offsets): URIO[GreyhoundMetrics, Unit] =
offsets.committable.flatMap { committable => consumer.commit(committable).catchAll { _ => offsets.update(committable) } }
offsets.committable.flatMap { committable =>
consumer.commit(committable).catchAll { t => report(FailedToCommitOffsets(t, committable)) *> offsets.update(committable) }
}

private def commitOffsetsAndGaps(consumer: Consumer, offsetsAndGaps: OffsetsAndGaps): URIO[GreyhoundMetrics, Unit] =
offsetsAndGaps.getCommittableAndClear.flatMap { committable =>
Expand Down Expand Up @@ -465,7 +468,7 @@ object EventLoopMetric {
case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class AwaitingPartitionsAssignment(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class InitializedOffsetsAndGaps(
clientId: ClientId,
group: Group,
Expand All @@ -478,6 +481,8 @@ object EventLoopMetric {
case class FailedToCommitOffsetsAndMetadata(t: Throwable, offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata])
extends EventLoopMetric

case class FailedToCommitOffsets(t: Throwable, offsets: Map[TopicPartition, Offset]) extends EventLoopMetric

case class HandledBatch(records: Records) extends EventLoopMetric
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class DispatcherTest extends BaseTest[TestMetrics with TestClock] {
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
) // Will be dropped
_ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_.isEmpty)
_ <- ZIO.foreachDiscard(1 to 4)(_ => queue.take)
_ <- ZIO.foreachDiscard(1 to 5)(_ => queue.take)
_ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_ == Set(TopicPartition(topic, partition)))
} yield ok
)
Expand Down

0 comments on commit c675fe4

Please sign in to comment.