diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala index d8d0c15d..b1c872bb 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala @@ -171,15 +171,8 @@ object Consumer { .toMap ) - import java.time.format.DateTimeFormatter - import java.time.LocalDateTime - import java.time.ZoneOffset - private val podName = sys.env.get("POD_NAME") - private val dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss") - def metadata: Option[String] = if (config.enrichMetadata) podName.map(name => s">>> pod: $name, ts: ${dtf.format(LocalDateTime.now(ZoneOffset.UTC))}") else None - override def commit(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = { - withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, metadata.getOrElse(cfg.commitMetadataString))))) + withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString())))) } override def commitWithMetadata( @@ -191,7 +184,7 @@ object Consumer { override def commitOnRebalance( offsets: Map[TopicPartition, Offset] )(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = { - val kOffsets = kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString)) + val kOffsets = kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString())) // we can't actually call commit here, as it needs to be called from the same // thread, that triggered poll(), so we return the commit action as thunk ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kOffsets))) @@ -355,10 +348,9 @@ case class ConsumerConfig( initialSeek: InitialOffsetsSeek = InitialOffsetsSeek.default, consumerAttributes: Map[String, String] = Map.empty, decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor, - commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA, + commitMetadataString: Unit => Metadata = _ => OffsetAndMetadata.NO_METADATA, rewindUncommittedOffsetsByMillis: Long = 0L, useParallelConsumer: Boolean = false, - enrichMetadata: Boolean = false ) extends CommonGreyhoundConfig { override def kafkaProps: Map[String, String] = Map( diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala index d788c8ae..6078a784 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala @@ -229,7 +229,6 @@ object RecordConsumer { config.commitMetadataString, config.rewindUncommittedOffsetsBy.toMillis, config.eventLoopConfig.consumePartitionInParallel, - config.enrichMetadata ) } @@ -343,10 +342,9 @@ case class RecordConsumerConfig( consumerAttributes: Map[String, String] = Map.empty, decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor, retryProducerAttributes: Map[String, String] = Map.empty, - commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA, + commitMetadataString: Unit => Metadata = _ => OffsetAndMetadata.NO_METADATA, rewindUncommittedOffsetsBy: Duration = 0.millis, createRetryTopics: Boolean = true, - enrichMetadata: Boolean = true ) extends CommonGreyhoundConfig { override def kafkaProps: Map[String, String] = extraProperties diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala index ba414929..7d2839b3 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala @@ -146,8 +146,7 @@ object BatchConsumer { config.consumerAttributes, config.decryptor, config.commitMetadataString, - config.rewindUncommittedOffsetsBy.toMillis, - enrichMetadata = config.enrichMetadata + config.rewindUncommittedOffsetsBy.toMillis ) } @@ -184,9 +183,8 @@ case class BatchConsumerConfig( initialOffsetsSeek: InitialOffsetsSeek = InitialOffsetsSeek.default, consumerAttributes: Map[String, String] = Map.empty, decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor, - commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA, + commitMetadataString: Unit => Metadata = _ => OffsetAndMetadata.NO_METADATA, rewindUncommittedOffsetsBy: Duration = Duration.ZERO, - enrichMetadata: Boolean = false ) object BatchConsumerConfig {