Skip to content

Commit

Permalink
Metadata to commit log (#36749)
Browse files Browse the repository at this point in the history
* enrich commit metadata with pod and timestamp

* fix for metrics layer

GitOrigin-RevId: 8f4f026e95f2008cfe9e958a518381f31630e6c1
  • Loading branch information
leonbur authored and wix-oss committed Sep 3, 2023
1 parent 41ade44 commit 785d0b9
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,12 @@ object Consumer {
case class InitialOffsetsAndMetadata(offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]) extends com.wixpress.dst.greyhound.core.metrics.GreyhoundMetric

private def listener[R1](consumer: Consumer, onAssignFirstDo: Set[TopicPartition] => Unit, rebalanceListener: RebalanceListener[R1], unsafeOffsetOperations: UnsafeOffsetOperations) =
ZIO.runtime[R1].map { runtime =>
ZIO.runtime[R1 with GreyhoundMetrics].map { runtime =>
new ConsumerRebalanceListener {

def reportInitialOffsetsAndMetadata(partitions: Set[TopicPartition]) = {
val offsetsAndMetadata = unsafeOffsetOperations.committedWithMetadata(partitions, 10.seconds)
report(InitialOffsetsAndMetadata(offsetsAndMetadata)).provide(GreyhoundMetrics.liveLayer)
report(InitialOffsetsAndMetadata(offsetsAndMetadata))
}

override def onPartitionsRevoked(partitions: util.Collection[KafkaTopicPartition]): Unit = {
Expand Down

0 comments on commit 785d0b9

Please sign in to comment.