Skip to content

Commit

Permalink
enrich commit metadata with pod and timestamp (#36679)
Browse files Browse the repository at this point in the history
* enrich commit metadata with pod and timestamp

GitOrigin-RevId: 2669ad3b6fdb31749768eda02cf4cf0872933e74
Loading branch information
leonbur authored and wix-oss committed Sep 3, 2023
1 parent 461ccba commit 41ade44
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.ClosedConsumer
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, Decryptor, NoOpDecryptor, RecordTopicPartition}
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics._
import org.apache.kafka.clients.consumer.{ConsumerConfig => KafkaConsumerConfig, ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata => KafkaOffsetAndMetadata}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, ConsumerConfig => KafkaConsumerConfig, OffsetAndMetadata => KafkaOffsetAndMetadata}
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.{TopicPartition => KafkaTopicPartition}
import zio.ZIO.attemptBlocking
Expand Down Expand Up @@ -99,11 +99,12 @@ object Consumer {
// we commit missing offsets to current position on assign - otherwise messages may be lost, in case of `OffsetReset.Latest`,
// if a partition with no committed offset is revoked during processing
// we also may want to seek forward to some given initial offsets
unsafeOffsetOperations = UnsafeOffsetOperations.make(consumer)
offsetsInitializer <- OffsetsInitializer
.make(
cfg.clientId,
cfg.groupId,
UnsafeOffsetOperations.make(consumer),
unsafeOffsetOperations,
timeout = 10.seconds,
timeoutIfSeek = 10.seconds,
initialSeek = cfg.initialSeek,
Expand All @@ -116,13 +117,13 @@ object Consumer {
override def subscribePattern[R1](topicStartsWith: Pattern, rebalanceListener: RebalanceListener[R1])(
implicit trace: Trace
): RIO[GreyhoundMetrics with R1, Unit] =
listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener)
listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener, unsafeOffsetOperations)
.flatMap(lis => withConsumer(_.subscribe(topicStartsWith, lis)))

override def subscribe[R1](topics: Set[Topic], rebalanceListener: RebalanceListener[R1])(
implicit trace: Trace
): RIO[GreyhoundMetrics with R1, Unit] =
listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener)
listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener, unsafeOffsetOperations)
.flatMap(lis => withConsumerBlocking(_.subscribe(topics.asJava, lis)))

override def poll(timeout: Duration)(implicit trace: Trace): RIO[Any, Records] =
Expand Down Expand Up @@ -170,8 +171,14 @@ object Consumer {
.toMap
)

// Optional enrichment of offset-commit metadata: records which pod committed and when.
// NOTE(review): these imports sit mid-object — consider moving them to the top-of-file import block.
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
// Pod identity supplied by the deployment environment; None when POD_NAME is not set.
private val podName = sys.env.get("POD_NAME")
// DateTimeFormatter is immutable and thread-safe, so sharing one instance is safe.
private val dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")
// Metadata string attached to offset commits when `config.enrichMetadata` is enabled.
// Returns None when enrichment is off or POD_NAME is absent; the commit path then falls
// back to `cfg.commitMetadataString`. NOTE(review): LocalDateTime.now() uses the JVM
// default time zone — confirm whether UTC is expected when comparing across pods.
def metadata: Option[String] = if (config.enrichMetadata) podName.map(name => s">>> pod: $name, ts: ${dtf.format(LocalDateTime.now())}") else None

override def commit(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString))))
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, metadata.getOrElse(cfg.commitMetadataString)))))
}

override def commitWithMetadata(
Expand Down Expand Up @@ -279,9 +286,17 @@ object Consumer {
}
}

private def listener[R1](consumer: Consumer, onAssignFirstDo: Set[TopicPartition] => Unit, rebalanceListener: RebalanceListener[R1]) =
case class InitialOffsetsAndMetadata(offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]) extends com.wixpress.dst.greyhound.core.metrics.GreyhoundMetric

private def listener[R1](consumer: Consumer, onAssignFirstDo: Set[TopicPartition] => Unit, rebalanceListener: RebalanceListener[R1], unsafeOffsetOperations: UnsafeOffsetOperations) =
ZIO.runtime[R1].map { runtime =>
new ConsumerRebalanceListener {

// Fetches the committed offsets+metadata for the given partitions and reports them as an
// InitialOffsetsAndMetadata metric.
// NOTE(review): `committedWithMetadata` executes eagerly when this def is invoked — it is
// NOT suspended inside the returned effect, so the (up to 10s) blocking fetch runs on the
// Kafka rebalance-listener callback thread; confirm that blocking there is acceptable.
def reportInitialOffsetsAndMetadata(partitions: Set[TopicPartition]) = {
val offsetsAndMetadata = unsafeOffsetOperations.committedWithMetadata(partitions, 10.seconds)
report(InitialOffsetsAndMetadata(offsetsAndMetadata)).provide(GreyhoundMetrics.liveLayer)
}

override def onPartitionsRevoked(partitions: util.Collection[KafkaTopicPartition]): Unit = {
zio.Unsafe.unsafe { implicit s =>
runtime.unsafe
Expand All @@ -302,7 +317,7 @@ object Consumer {
zio.Unsafe.unsafe { implicit s =>
runtime.unsafe
.run(
rebalanceListener.onPartitionsAssigned(consumer, assigned)
reportInitialOffsetsAndMetadata(assigned) *> rebalanceListener.onPartitionsAssigned(consumer, assigned)
)
.getOrThrowFiberFailure()
.run()
Expand Down Expand Up @@ -341,7 +356,8 @@ case class ConsumerConfig(
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsByMillis: Long = 0L,
useParallelConsumer: Boolean = false
useParallelConsumer: Boolean = false,
enrichMetadata: Boolean = false
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ object RecordConsumer {
config.decryptor,
config.commitMetadataString,
config.rewindUncommittedOffsetsBy.toMillis,
config.eventLoopConfig.consumePartitionInParallel
config.eventLoopConfig.consumePartitionInParallel,
config.enrichMetadata
)
}

Expand Down Expand Up @@ -344,7 +345,8 @@ case class RecordConsumerConfig(
retryProducerAttributes: Map[String, String] = Map.empty,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsBy: Duration = 0.millis,
createRetryTopics: Boolean = true
createRetryTopics: Boolean = true,
enrichMetadata: Boolean = true
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = extraProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ object BatchConsumer {
config.consumerAttributes,
config.decryptor,
config.commitMetadataString,
config.rewindUncommittedOffsetsBy.toMillis
config.rewindUncommittedOffsetsBy.toMillis,
enrichMetadata = config.enrichMetadata
)
}

Expand Down Expand Up @@ -184,7 +185,8 @@ case class BatchConsumerConfig(
consumerAttributes: Map[String, String] = Map.empty,
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsBy: Duration = Duration.ZERO
rewindUncommittedOffsetsBy: Duration = Duration.ZERO,
enrichMetadata: Boolean = false
)

object BatchConsumerConfig {
Expand Down

0 comments on commit 41ade44

Please sign in to comment.