From cb30bb4a98e41347d13ce67c7657297ba8289a0a Mon Sep 17 00:00:00 2001 From: Ben Wattelman <82799628+ben-wattelman@users.noreply.github.com> Date: Thu, 20 Jul 2023 15:21:44 +0300 Subject: [PATCH] [greyhound] parallel consumer - improve logging (#36009) * [greyhound] parallel consumer - improve logging * dummy commit GitOrigin-RevId: 60e02e82d457076a06b0c0f2a575130051db8c69 --- .../dst/greyhound/core/consumer/OffsetsInitializer.scala | 8 +++++--- .../dst/greyhound/core/consumer/ReportingConsumer.scala | 7 ++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala index 31b5d2bd..a5f0a98d 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala @@ -90,11 +90,13 @@ class OffsetsInitializer( } private def reportSkippedGaps(currentCommittedOffsets: Map[TopicPartition, Option[OffsetAndMetadata]]) = { - val skippedGaps = currentCommittedOffsets + val committedOffsetsAndGaps = currentCommittedOffsets .collect { case (tp, Some(om)) => tp -> om } .map(tpom => tpom._1 -> OffsetsAndGaps.parseGapsString(tpom._2.metadata)) - .collect { case (tp, Some(gaps)) => tp -> gaps } - reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps)) + .collect { case (tp, Some(offsetAndGaps)) => tp -> offsetAndGaps } + val skippedGaps = committedOffsetsAndGaps.collect { case (tp, offsetAndGaps) if offsetAndGaps.gaps.nonEmpty => tp -> offsetAndGaps } + + reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps, committedOffsetsAndGaps)) } private def fetchEndOffsets(seekToEndPartitions: Set[TopicPartition], timeout: Duration) = { diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala index 6202220d..da62f294 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala @@ -414,6 +414,11 @@ object ConsumerMetric { case class ClosedConsumer(group: Group, clientId: ClientId, result: MetricResult[Throwable, Unit]) extends ConsumerMetric - case class SkippedGapsOnInitialization(clientId: ClientId, group: Group, gaps: Map[TopicPartition, OffsetAndGaps]) extends ConsumerMetric + case class SkippedGapsOnInitialization( + clientId: ClientId, + group: Group, + skippedGaps: Map[TopicPartition, OffsetAndGaps], + currentCommittedOffsetsAndGaps: Map[TopicPartition, OffsetAndGaps] + ) extends ConsumerMetric }