Skip to content

Commit

Permalink
[greyhound] parallel cosnumer - add init log (#36421)
Browse files Browse the repository at this point in the history
[greyhound] parallel cosnumer - add init log #automerge

GitOrigin-RevId: 8e6b0c2f3a877fe4b9805f60a98a814a144b2ae6
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 8, 2023
1 parent 09dce31 commit 5f11e80
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ object EventLoop {
for {
delayedRebalanceEffect <-
if (useParallelConsumer)
initOffsetsAndGapsOnRebalance(partitions, consumer0, offsetsAndGaps).catchAll { t =>
initOffsetsAndGapsOnRebalance(partitions, consumer0, offsetsAndGaps, clientId, group).catchAll { t =>
report(FailedToUpdateGapsOnPartitionAssignment(partitions, t)).as(DelayedRebalanceEffect.unit)
}
else DelayedRebalanceEffect.zioUnit
Expand Down Expand Up @@ -260,12 +260,19 @@ object EventLoop {
private def initOffsetsAndGapsOnRebalance(
partitions: Set[TopicPartition],
consumer: Consumer,
offsetsAndGaps: OffsetsAndGaps
offsetsAndGaps: OffsetsAndGaps,
clientId: ClientId,
group: Group
): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
ZIO.runtime[GreyhoundMetrics].map { rt =>
DelayedRebalanceEffect {
val committed = committedOffsetsAndGaps(consumer, partitions)
zio.Unsafe.unsafe { implicit s => rt.unsafe.run(offsetsAndGaps.init(committed)) }
zio.Unsafe.unsafe { implicit s =>
rt.unsafe.run(
offsetsAndGaps.init(committed) *>
report(InitializedOffsetsAndGaps(clientId, group, committed, consumer.config.consumerAttributes))
)
}
}
}
}
Expand Down Expand Up @@ -458,9 +465,7 @@ object EventLoopMetric {
case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class AwaitingPartitionsAssignment(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class AwaitingOffsetsAndGapsInit(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric


case class InitializedOffsetsAndGaps(
clientId: ClientId,
group: Group,
Expand Down

0 comments on commit 5f11e80

Please sign in to comment.