Skip to content

Commit

Permalink
[greyhound] parallel consumer - fix update bug (#36161)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 5b75a27782dec7e534f72f57d3b3ba38c4194d1e
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 3, 2023
1 parent 9b676c9 commit 118de10
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
(cr.key match {
case Some(_) =>
fastMessagesLatch.countDown
case None =>
case None =>
// make sure the handler doesn't finish before the rebalance is done, including drain timeout
finishRebalance.await *> ZIO.sleep(drainTimeout + 5.second)
}) *> numProcessedMessages.update(_ + 1)
Expand Down Expand Up @@ -150,44 +150,38 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

// "migrate correctly from regular record consumer to parallel consumer - consume every record once" in {
// ZIO.scoped {
// for {
// r <- getShared
// TestResources(kafka, producer) = r
// topic <- kafka.createRandomTopic()
// group <- randomGroup
// cId <- clientId
//
// regularConfig = configFor(kafka, group, Set(topic))
// parallelConfig = parallelConsumerConfig(kafka, topic, group, cId) // same group name for both consumers
// queue <- Queue.unbounded[ConsumerRecord[String, String]]
// handler = RecordHandler((cr: ConsumerRecord[String, String]) => queue.offer(cr)).withDeserializers(StringSerde, StringSerde)
//
// records1 = producerRecords(topic, "1", partitions, 3)
// records2 = producerRecords(topic, "2", partitions, 3)
// _ <- ZIO.debug(s"records1:\n${records1.mkString("\n")}\nrecords2:\n${records2.mkString("\n")}")
// numMessages = records1.size + records2.size
//
// _ <- RecordConsumer.make(regularConfig, handler)
// _ <- produceRecords(producer, records1)
// _ <- ZIO.sleep(3.seconds)
// _ <- RecordConsumer.make(parallelConfig, handler).delay(3.seconds)
// _ <- produceRecords(producer, records2)
// _ <- ZIO.sleep(3.seconds)
// messagesOption <- RecordConsumer.make(parallelConfig, handler).flatMap { _ =>
// produceRecords(producer, records2) *> ZIO.sleep(3.seconds) *>
// queue
// .takeBetween(numMessages, numMessages)
// .timeout(60.seconds)
// .tap(o => ZIO.when(o.isEmpty)(Console.printLine("timeout waiting for messages!")))
// }
// messages <- ZIO.fromOption(messagesOption).orElseFail(TimedOutWaitingForMessages)
// } yield {
// messages must beRecordsWithKeysAndValues(records1 ++ records2)
// }
// }
// }
// Integration test: a regular (sequential) consumer and a parallel consumer share the same
// consumer group on the same topic; every produced record must be consumed exactly once
// across the handoff. The shared queue collects everything both handlers see, so the final
// size check (== numMessages) fails on both lost records and duplicates.
"migrate correctly from regular record consumer to parallel consumer - consume every record once" in {
ZIO.scoped {
for {
r <- getShared
TestResources(kafka, producer) = r
topic <- kafka.createRandomTopic()
group <- randomGroup
cId <- clientId

regularConfig = configFor(kafka, group, Set(topic))
parallelConfig = parallelConsumerConfig(kafka, topic, group, cId) // same group name for both consumers
queue <- Queue.unbounded[ConsumerRecord[String, String]]
regularHandler = RecordHandler((cr: ConsumerRecord[String, String]) => queue.offer(cr)).withDeserializers(StringSerde, StringSerde)
// Slow handler for the parallel consumer: even-offset records sleep 2s before enqueueing,
// simulating long-running work so in-flight records overlap the rebalance/migration window.
longRunningHandler = RecordHandler((cr: ConsumerRecord[String, String]) =>
(if (cr.offset % 2 == 0) ZIO.sleep(2.seconds) else ZIO.unit) *> queue.offer(cr)
).withDeserializers(StringSerde, StringSerde)

records1 = producerRecords(topic, "1", partitions, 20)
records2 = producerRecords(topic, "2", partitions, 20)
numMessages = records1.size + records2.size

// Phase 1: regular consumer alone must drain the first batch completely.
_ <- RecordConsumer.make(regularConfig, regularHandler)
_ <- produceRecords(producer, records1)
_ <- eventuallyZ(queue.size)(_ == records1.size)
// Sleep gives the regular consumer time to commit its offsets before the parallel
// consumer joins the group — otherwise batch 1 could be re-consumed (duplicates).
_ <- ZIO.sleep(10.seconds)
// Phase 2: parallel consumer joins the same group (triggers rebalance), then batch 2 is produced.
_ <- RecordConsumer.make(parallelConfig, longRunningHandler).delay(3.seconds)
_ <- produceRecords(producer, records2)
_ <- ZIO.sleep(10.seconds)
// Exactly-once across the migration: queue must hold both batches, no more, no less.
_ <- eventuallyZ(queue.size, timeout = 20.seconds)(_ == numMessages)
} yield ok
}
}

"migrate from parallel consumer with gaps to regular consumer - consume from latest and report non-consumed gaps" in {
ZIO.scoped {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,9 @@ object Dispatcher {
.foreachParDiscard(groupedRecords)(sameKeyRecords =>
ZIO.foreach(sameKeyRecords) { record =>
if (shouldRecordBeHandled(record, latestCommitGaps)) {
handle(record).interruptible.ignore *> updateBatch(sameKeyRecords).interruptible
handle(record).interruptible.ignore *> updateBatch(Chunk(record)).interruptible
} else
report(SkippedPreviouslyHandledRecord(record, group, clientId, consumerAttributes))

report(SkippedPreviouslyHandledRecord(record, group, clientId))
}
)
.withParallelism(maxParallelism)
Expand Down Expand Up @@ -675,8 +674,11 @@ object DispatcherMetric {

case class InvokingHandlersInParallel(partition: TopicPartition, numHandlers: Int) extends DispatcherMetric

case class SkippedPreviouslyHandledRecord(record: Record, group: Group, clientId: ClientId, attributes: Map[String, String])
extends DispatcherMetric
/**
 * Metric reported when the dispatcher skips a record because commit-gap tracking shows it was
 * already handled (e.g. re-delivered after a rebalance), identifying the record, its consumer
 * group, and the reporting client.
 */
case class SkippedPreviouslyHandledRecord(record: Record, group: Group, clientId: ClientId) extends DispatcherMetric

}

Expand Down

0 comments on commit 118de10

Please sign in to comment.