diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala index 31b5d2bd..a5f0a98d 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala @@ -90,11 +90,13 @@ class OffsetsInitializer( } private def reportSkippedGaps(currentCommittedOffsets: Map[TopicPartition, Option[OffsetAndMetadata]]) = { - val skippedGaps = currentCommittedOffsets + val committedOffsetsAndGaps = currentCommittedOffsets .collect { case (tp, Some(om)) => tp -> om } .map(tpom => tpom._1 -> OffsetsAndGaps.parseGapsString(tpom._2.metadata)) - .collect { case (tp, Some(gaps)) => tp -> gaps } - reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps)) + .collect { case (tp, Some(offsetAndGaps)) => tp -> offsetAndGaps } + val skippedGaps = committedOffsetsAndGaps.collect { case (tp, offsetAndGaps) if offsetAndGaps.gaps.nonEmpty => tp -> offsetAndGaps } + + reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps, committedOffsetsAndGaps)) } private def fetchEndOffsets(seekToEndPartitions: Set[TopicPartition], timeout: Duration) = { diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala index 6202220d..da62f294 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala @@ -414,6 +414,11 @@ object ConsumerMetric { case class ClosedConsumer(group: Group, clientId: ClientId, result: MetricResult[Throwable, Unit]) extends ConsumerMetric - case class SkippedGapsOnInitialization(clientId: ClientId, group: Group, gaps: Map[TopicPartition, OffsetAndGaps]) extends ConsumerMetric + case class SkippedGapsOnInitialization( + clientId: ClientId, + group: Group, + skippedGaps: Map[TopicPartition, OffsetAndGaps], + currentCommittedOffsetsAndGaps: Map[TopicPartition, OffsetAndGaps] + ) extends ConsumerMetric }