Skip to content

Commit

Permalink
[greyhound] parallel consumer - init gaps on every rebalance (#36375)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 5b55ff311413a0ca90f0c66b7b5be0f12c8d3582
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 3, 2023
1 parent d8420ab commit 3af93b9
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,30 +155,43 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
for {
r <- getShared
TestResources(kafka, producer) = r
topic <- kafka.createRandomTopic()
topic <- kafka.createRandomTopic(partitions = 1)
group <- randomGroup
cId <- clientId
tp = TopicPartition(topic, 0)

regularConfig = configFor(kafka, group, Set(topic))
parallelConfig = parallelConsumerConfig(kafka, topic, group, cId) // same group name for both consumers
queue <- Queue.unbounded[ConsumerRecord[String, String]]
regularHandler = RecordHandler((cr: ConsumerRecord[String, String]) => queue.offer(cr)).withDeserializers(StringSerde, StringSerde)
handledOffsets <- Ref.make[Seq[Long]](Seq.empty) // keep track of handled offsets to make sure no duplicates are processed
regularHandler = RecordHandler((cr: ConsumerRecord[String, String]) => handledOffsets.update(_ :+ cr.offset))
.withDeserializers(StringSerde, StringSerde)
longRunningHandler = RecordHandler((cr: ConsumerRecord[String, String]) =>
(if (cr.offset % 2 == 0) ZIO.sleep(2.seconds) else ZIO.unit) *> queue.offer(cr)
(if (cr.offset % 2 == 0) ZIO.sleep(2.seconds) else ZIO.unit) *> handledOffsets.update(_ :+ cr.offset)
).withDeserializers(StringSerde, StringSerde)

records1 = producerRecords(topic, "1", partitions, 20)
records2 = producerRecords(topic, "2", partitions, 20)
numMessages = records1.size + records2.size

_ <- RecordConsumer.make(regularConfig, regularHandler)
_ <- produceRecords(producer, records1)
_ <- eventuallyZ(queue.size)(_ == records1.size)
_ <- ZIO.sleep(10.seconds)
_ <- RecordConsumer.make(parallelConfig, longRunningHandler).delay(3.seconds)
_ <- produceRecords(producer, records2)
_ <- ZIO.sleep(10.seconds)
_ <- eventuallyZ(queue.size, timeout = 20.seconds)(_ == numMessages)
records1 = producerRecords(topic, "1", 1, 10)
records2 = producerRecords(topic, "2", 1, 10)
records3 = producerRecords(topic, "3", 1, 10)
records4 = producerRecords(topic, "4", 1, 10)

regularConsumer1 <- RecordConsumer.make(regularConfig, regularHandler)
_ <- produceRecords(producer, records1)
_ <- eventuallyZ(handledOffsets.get)(_.sorted == records1.indices.map(_.toLong).sorted)
_ <- regularConsumer1.shutdown()
parallelConsumer1 <- RecordConsumer.make(parallelConfig, longRunningHandler)
parallelConsumer2 <- RecordConsumer.make(parallelConfig, longRunningHandler)
_ <- produceRecords(producer, records2)
_ <- eventuallyZ(handledOffsets.get, timeout = 10.seconds)(_.sorted == (records1 ++ records2).indices.map(_.toLong).sorted)
parallelConsumer3 <- RecordConsumer.make(parallelConfig, longRunningHandler).delay(5.seconds)
_ <- parallelConsumer1.shutdown() zipPar parallelConsumer2.shutdown()
_ <- produceRecords(producer, records3)
_ <-
eventuallyZ(handledOffsets.get, timeout = 10.seconds)(_.sorted == (records1 ++ records2 ++ records3).indices.map(_.toLong).sorted)
_ <- produceRecords(producer, records4)
_ <- eventuallyZ(handledOffsets.get, timeout = 10.seconds)(
_.sorted == (records1 ++ records2 ++ records3 ++ records4).indices.map(_.toLong).sorted
)
_ <- eventuallyZ(parallelConsumer3.committedOffsetsAndGaps(Set(tp)), timeout = 10.seconds)(_.get(tp).exists(_.gaps.isEmpty))
} yield ok
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ trait Consumer {

def commitOnRebalance(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect]

/**
 * Looks up the committed offset and metadata for each of the given partitions.
 *
 * Unlike the neighbouring `commitOnRebalance` / `commitWithMetadataOnRebalance` members, this returns a plain
 * `Map` rather than a ZIO effect, so the lookup runs as part of the call itself.
 * Intended to be called only from a rebalance listener callback.
 */
def committedOffsetsAndMetadataOnRebalance(partitions: Set[TopicPartition])(implicit trace: Trace): Map[TopicPartition, OffsetAndMetadata]

def commitWithMetadataOnRebalance(offsets: Map[TopicPartition, OffsetAndMetadata])(
implicit trace: Trace
): RIO[GreyhoundMetrics, DelayedRebalanceEffect]
Expand Down Expand Up @@ -187,6 +189,20 @@ object Consumer {
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kOffsets)))
}

override def committedOffsetsAndMetadataOnRebalance(partitions: Set[TopicPartition])(
  implicit trace: Trace
): Map[TopicPartition, OffsetAndMetadata] = {
  // Unsafe: this talks to the underlying Kafka consumer directly (no effect wrapper),
  // so it should only be called from a RebalanceListener.
  val committedByKafkaPartition = consumer.committed(kafkaPartitions(partitions)).asScala
  // Typed patterns only match non-null keys and values, so entries holding null are left out.
  val converted = committedByKafkaPartition.collect {
    case (kafkaPartition: KafkaTopicPartition, committed: KafkaOffsetAndMetadata) =>
      TopicPartition(kafkaPartition) -> OffsetAndMetadata(committed.offset, committed.metadata)
  }
  converted.toMap
}

override def commitWithMetadataOnRebalance(
offsets: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,11 @@ object Dispatcher {
maxParallelism: Int = 1,
updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit,
currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, OffsetAndGaps]] = _ => ZIO.succeed(Map.empty),
gapsSizeLimit: Int = 500,
init: Promise[Nothing, Unit]
gapsSizeLimit: Int = 500
)(implicit trace: Trace): UIO[Dispatcher[R]] =
for {
p <- Promise.make[Nothing, Unit]
state <- Ref.make[DispatcherState](if (startPaused) DispatcherState.Paused(p) else DispatcherState.Running)
initState <-
Ref.make[DispatcherInitState](if (consumeInParallel) DispatcherInitState.NotInitialized else DispatcherInitState.Initialized)
workers <- Ref.make(Map.empty[TopicPartition, Worker])
} yield new Dispatcher[R] {
override def submit(record: Record): URIO[R with Env, SubmitResult] =
Expand All @@ -77,11 +74,6 @@ object Dispatcher {
override def submitBatch(records: Records): URIO[R with Env, SubmitResult] =
for {
_ <- report(SubmittingRecordBatch(group, clientId, records.size, records, consumerAttributes))
currentInitState <- initState.get
_ <- currentInitState match {
case DispatcherInitState.NotInitialized => init.await *> initState.set(DispatcherInitState.Initialized)
case _ => ZIO.unit
}
allSamePartition = records.map(r => RecordTopicPartition(r)).distinct.size == 1
submitResult <- if (allSamePartition) {
val partition = RecordTopicPartition(records.head)
Expand Down Expand Up @@ -225,16 +217,6 @@ object Dispatcher {

}

/**
 * Initialization state kept by the dispatcher.
 *
 * The dispatcher starts in `NotInitialized` when it consumes partitions in parallel and in `Initialized`
 * otherwise; `submitBatch` awaits the init promise while the state is `NotInitialized` and then switches it to
 * `Initialized`.
 */
sealed trait DispatcherInitState

object DispatcherInitState {

// Initialization has not been observed yet.
case object NotInitialized extends DispatcherInitState

// Initialization is done (or was never required).
case object Initialized extends DispatcherInitState

}

// A record paired with a `complete` effect.
// NOTE(review): presumably `complete` is run once the record has been handled — confirm against the worker code.
case class Task(record: Record, complete: UIO[Unit])

trait Worker {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ object EventLoop {
updateBatch = { records: Chunk[Record] => report(HandledBatch(records)) *> updateGapsByBatch(records, offsetsAndGaps) }
currentGaps = { partitions: Set[TopicPartition] => offsetsAndGaps.offsetsAndGapsForPartitions(partitions) }
_ <- report(CreatingDispatcher(clientId, group, consumerAttributes, config.startPaused))
offsetsAndGapsInit <- Promise.make[Nothing, Unit]
dispatcher <- Dispatcher.make(
group,
clientId,
Expand All @@ -62,8 +61,7 @@ object EventLoop {
config.maxParallelism,
updateBatch,
currentGaps,
config.gapsSizeLimit,
offsetsAndGapsInit
config.gapsSizeLimit
)
positionsRef <- Ref.make(Map.empty[TopicPartition, Offset])
pausedPartitionsRef <- Ref.make(Set.empty[TopicPartition])
Expand All @@ -90,19 +88,6 @@ object EventLoop {
.forkDaemon
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
partitions <- partitionsAssigned.await
_ <- if (config.consumePartitionInParallel) {
report(AwaitingOffsetsAndGapsInit(clientId, group, consumerAttributes)) *>
initializeOffsetsAndGaps( // we must perform init in the main thread and not in the rebalance listener as it involves calling SDK
offsetsAndGaps,
partitions,
consumer,
clientId,
group,
consumerAttributes,
offsetsAndGapsInit
) *> offsetsAndGapsInit.await

} else offsetsAndGapsInit.succeed()
env <- ZIO.environment[Env]
} yield (dispatcher, fiber, offsets, positionsRef, running, rebalanceListener.provideEnvironment(env))

Expand Down Expand Up @@ -224,8 +209,17 @@ object EventLoop {

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
partitionsAssigned.succeed(partitions).as(DelayedRebalanceEffect.unit)
): URIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
for {
delayedRebalanceEffect <-
if (useParallelConsumer)
initOffsetsAndGapsOnRebalance(partitions, consumer0, offsetsAndGaps).catchAll { t =>
report(FailedToUpdateGapsOnPartitionAssignment(partitions, t)).as(DelayedRebalanceEffect.unit)
}
else DelayedRebalanceEffect.zioUnit
_ <- partitionsAssigned.succeed(partitions)
} yield delayedRebalanceEffect
}
}
}

Expand Down Expand Up @@ -263,24 +257,24 @@ object EventLoop {
_ <- pausedRef.update(_ => pausedTopics)
} yield records

private def initializeOffsetsAndGaps(
offsetsAndGaps: OffsetsAndGaps,
private def initOffsetsAndGapsOnRebalance(
partitions: Set[TopicPartition],
consumer: Consumer,
clientId: ClientId,
group: Group,
attributes: Map[String, String],
offsetsAndGapsInit: Promise[Nothing, Unit]
) = for {
committedOffsetsAndMetadata <- consumer.committedOffsetsAndMetadata(partitions)
initialOffsetsAndGaps =
committedOffsetsAndMetadata.mapValues(om =>
OffsetsAndGaps.parseGapsString(om.metadata).fold(OffsetAndGaps(om.offset - 1, committable = false))(identity)
)
_ <- offsetsAndGaps.init(initialOffsetsAndGaps)
_ <- report(InitializedOffsetsAndGaps(clientId, group, initialOffsetsAndGaps, attributes))
_ <- offsetsAndGapsInit.succeed(())
} yield ()
offsetsAndGaps: OffsetsAndGaps
): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
ZIO.runtime[GreyhoundMetrics].map { rt =>
DelayedRebalanceEffect {
val committed = committedOffsetsAndGaps(consumer, partitions)
zio.Unsafe.unsafe { implicit s => rt.unsafe.run(offsetsAndGaps.init(committed)) }
}
}
}

/**
 * Reads the committed offsets and metadata for `partitions` through the rebalance-only consumer call and
 * converts every entry to an [[OffsetAndGaps]].
 *
 * When `OffsetsAndGaps.parseGapsString` yields nothing for the committed metadata, the entry falls back to
 * `OffsetAndGaps(offset - 1, committable = false)`.
 *
 * @param consumer   consumer whose `committedOffsetsAndMetadataOnRebalance` is queried
 * @param partitions partitions to look up
 * @return a fully materialized map from partition to its offset and gaps
 */
private def committedOffsetsAndGaps(consumer: Consumer, partitions: Set[TopicPartition]): Map[TopicPartition, OffsetAndGaps] =
  consumer
    .committedOffsetsAndMetadataOnRebalance(partitions)
    .map {
      case (partition, committed) =>
        // Strict `map` instead of `mapValues`: `mapValues` builds a lazy view that would re-parse the
        // metadata string on every lookup of the resulting map.
        partition -> OffsetsAndGaps
          .parseGapsString(committed.metadata)
          .getOrElse(OffsetAndGaps(committed.offset - 1, committable = false))
    }

private def submitRecordsSequentially[R2, R1](
consumer: Consumer,
Expand Down Expand Up @@ -450,13 +444,7 @@ object EventLoopMetric {

case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class FailedToUpdateGapsOnPartitionAssignment(
t: Throwable,
clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric
// Reported by the rebalance listener when initializing offsets and gaps for newly assigned partitions fails;
// carries the assigned partitions and the failure.
case class FailedToUpdateGapsOnPartitionAssignment(partitions: Set[TopicPartition], t: Throwable) extends EventLoopMetric

case class FailedToFetchCommittedGaps(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty)
extends EventLoopMetric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ trait RecordConsumer[-R] extends Resource[R] with RecordConsumerProperties[Recor

def committedOffsets(partitions: Set[TopicPartition]): RIO[Env, Map[TopicPartition, Offset]]

/**
 * Committed position of each given partition expressed as an [[OffsetAndGaps]]
 * (the committed offset together with any gaps recorded alongside it).
 */
def committedOffsetsAndGaps(partitions: Set[TopicPartition]): RIO[Env, Map[TopicPartition, OffsetAndGaps]]

def waitForCurrentRecordsCompletion: URIO[Any, Unit]

def offsetsForTimes(topicPartitionsOnTimestamp: Map[TopicPartition, Long]): RIO[Any, Map[TopicPartition, Offset]]
Expand Down Expand Up @@ -139,6 +141,15 @@ object RecordConsumer {
// Delegates straight to the underlying consumer's committed-offsets lookup.
override def committedOffsets(partitions: Set[TopicPartition]): RIO[Env, Map[TopicPartition, Offset]] =
consumer.committedOffsets(partitions)

/**
 * Fetches the committed offsets and metadata for `partitions` from the underlying consumer and converts every
 * entry to an [[OffsetAndGaps]].
 *
 * When `OffsetsAndGaps.parseGapsString` yields nothing for the committed metadata, the entry falls back to
 * `OffsetAndGaps(offset - 1, committable = false)`.
 */
override def committedOffsetsAndGaps(partitions: Set[TopicPartition]): RIO[Env, Map[TopicPartition, OffsetAndGaps]] =
  consumer
    .committedOffsetsAndMetadata(partitions)
    .map(
      // Strict `map` instead of `mapValues`: `mapValues` returns a lazy view, so the metadata would be
      // re-parsed on every access of the returned map.
      _.map {
        case (partition, om) =>
          partition -> OffsetsAndGaps
            .parseGapsString(om.metadata)
            .getOrElse(OffsetAndGaps(om.offset - 1, committable = false))
      }
    )

// Delegates to the event loop's own waitForCurrentRecordsCompletion.
override def waitForCurrentRecordsCompletion: URIO[Any, Unit] = eventLoop.waitForCurrentRecordsCompletion

override def state(implicit trace: Trace): UIO[RecordConsumerExposedState] = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume
}
.map(_._2)).provideEnvironment(r)

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[DelayedRebalanceEffect] =
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
(report(PartitionsAssigned(clientId, group, partitions, config.consumerAttributes)) *>
rebalanceListener.onPartitionsAssigned(consumer, partitions)).provideEnvironment(r)
}
Expand Down Expand Up @@ -109,7 +111,12 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume
} else DelayedRebalanceEffect.zioUnit
}

override def commitWithMetadataOnRebalance(offsets: Map[TopicPartition, OffsetAndMetadata]
// Plain pass-through to the wrapped `internal` consumer; no reporting is added around the call.
// NOTE(review): the parameter is typed `NonEmptySet[TopicPartition]` while the Consumer trait declares
// `Set[TopicPartition]` — presumably `NonEmptySet` is a project alias for `Set`; confirm.
override def committedOffsetsAndMetadataOnRebalance(partitions: NonEmptySet[TopicPartition])(
implicit trace: Trace
): Map[TopicPartition, OffsetAndMetadata] = internal.committedOffsetsAndMetadataOnRebalance(partitions)

override def commitWithMetadataOnRebalance(
offsets: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
ZIO.runtime[GreyhoundMetrics].flatMap { runtime =>
if (offsets.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ trait EmptyConsumer extends Consumer {
): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
DelayedRebalanceEffect.zioUnit

// Default implementation: reports no committed offsets for any partition.
override def committedOffsetsAndMetadataOnRebalance(partitions: Set[TopicPartition])(
implicit trace: Trace
): Map[TopicPartition, OffsetAndMetadata] = Map.empty

override def commitWithMetadataOnRebalance(offsets: Map[TopicPartition, OffsetAndMetadata])(
implicit trace: Trace
): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
Expand Down
Loading

0 comments on commit 3af93b9

Please sign in to comment.