Skip to content

Commit

Permalink
moved enrich metadata out of GH core. add it to commit on rebalance (…
Browse files Browse the repository at this point in the history
…#36783)

moved enrich metadata out of GH core. add it to commit on rebalance #pr #skipreview

GitOrigin-RevId: bfdcccbc4c5210870118a1d049063db157d9f771
  • Loading branch information
leonbur authored and wix-oss committed Sep 3, 2023
1 parent c9d6404 commit cee1cac
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,8 @@ object Consumer {
.toMap
)

import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.time.ZoneOffset
private val podName = sys.env.get("POD_NAME")
private val dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")
def metadata: Option[String] = if (config.enrichMetadata) podName.map(name => s">>> pod: $name, ts: ${dtf.format(LocalDateTime.now(ZoneOffset.UTC))}") else None

override def commit(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, metadata.getOrElse(cfg.commitMetadataString)))))
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString()))))
}

override def commitWithMetadata(
Expand All @@ -191,7 +184,7 @@ object Consumer {
override def commitOnRebalance(
offsets: Map[TopicPartition, Offset]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
val kOffsets = kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString))
val kOffsets = kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString()))
// we can't actually call commit here, as it needs to be called from the same
// thread, that triggered poll(), so we return the commit action as thunk
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kOffsets)))
Expand Down Expand Up @@ -355,10 +348,9 @@ case class ConsumerConfig(
initialSeek: InitialOffsetsSeek = InitialOffsetsSeek.default,
consumerAttributes: Map[String, String] = Map.empty,
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
commitMetadataString: Unit => Metadata = _ => OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsByMillis: Long = 0L,
useParallelConsumer: Boolean = false,
enrichMetadata: Boolean = false
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ object RecordConsumer {
config.commitMetadataString,
config.rewindUncommittedOffsetsBy.toMillis,
config.eventLoopConfig.consumePartitionInParallel,
config.enrichMetadata
)
}

Expand Down Expand Up @@ -343,10 +342,9 @@ case class RecordConsumerConfig(
consumerAttributes: Map[String, String] = Map.empty,
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
retryProducerAttributes: Map[String, String] = Map.empty,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
commitMetadataString: Unit => Metadata = _ => OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsBy: Duration = 0.millis,
createRetryTopics: Boolean = true,
enrichMetadata: Boolean = true
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = extraProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ object BatchConsumer {
config.consumerAttributes,
config.decryptor,
config.commitMetadataString,
config.rewindUncommittedOffsetsBy.toMillis,
enrichMetadata = config.enrichMetadata
config.rewindUncommittedOffsetsBy.toMillis
)
}

Expand Down Expand Up @@ -184,9 +183,8 @@ case class BatchConsumerConfig(
initialOffsetsSeek: InitialOffsetsSeek = InitialOffsetsSeek.default,
consumerAttributes: Map[String, String] = Map.empty,
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
commitMetadataString: Unit => Metadata = _ => OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsBy: Duration = Duration.ZERO,
enrichMetadata: Boolean = false
)

object BatchConsumerConfig {
Expand Down

0 comments on commit cee1cac

Please sign in to comment.