diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala index 909682b7..54f21aa6 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala @@ -6,7 +6,7 @@ import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.ClosedConsumer import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, Decryptor, NoOpDecryptor, RecordTopicPartition} import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics._ -import org.apache.kafka.clients.consumer.{ConsumerConfig => KafkaConsumerConfig, ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata => KafkaOffsetAndMetadata} +import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, ConsumerConfig => KafkaConsumerConfig, OffsetAndMetadata => KafkaOffsetAndMetadata} import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.{TopicPartition => KafkaTopicPartition} import zio.ZIO.attemptBlocking @@ -99,11 +99,12 @@ object Consumer { // we commit missing offsets to current position on assign - otherwise messages may be lost, in case of `OffsetReset.Latest`, // if a partition with no committed offset is revoked during processing // we also may want to seek forward to some given initial offsets + unsafeOffsetOperations = UnsafeOffsetOperations.make(consumer) offsetsInitializer <- OffsetsInitializer .make( cfg.clientId, cfg.groupId, - UnsafeOffsetOperations.make(consumer), + unsafeOffsetOperations, timeout = 10.seconds, timeoutIfSeek = 10.seconds, initialSeek = cfg.initialSeek, @@ -116,13 +117,13 @@ object Consumer { override def subscribePattern[R1](topicStartsWith: Pattern, rebalanceListener: RebalanceListener[R1])( implicit trace: Trace ): RIO[GreyhoundMetrics with R1, Unit] = - listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener) + listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener, unsafeOffsetOperations) .flatMap(lis => withConsumer(_.subscribe(topicStartsWith, lis))) override def subscribe[R1](topics: Set[Topic], rebalanceListener: RebalanceListener[R1])( implicit trace: Trace ): RIO[GreyhoundMetrics with R1, Unit] = - listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener) + listener(this, offsetsInitializer.initializeOffsets, config.additionalListener *> rebalanceListener, unsafeOffsetOperations) .flatMap(lis => withConsumerBlocking(_.subscribe(topics.asJava, lis))) override def poll(timeout: Duration)(implicit trace: Trace): RIO[Any, Records] = @@ -170,8 +171,14 @@ object Consumer { .toMap ) + import java.time.format.DateTimeFormatter + import java.time.LocalDateTime + private val podName = sys.env.get("POD_NAME") + private val dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss") + def metadata: Option[String] = if (config.enrichMetadata) podName.map(name => s">>> pod: $name, ts: ${dtf.format(LocalDateTime.now())}") else None + override def commit(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = { - withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString)))) + withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, metadata.getOrElse(cfg.commitMetadataString))))) } override def commitWithMetadata( @@ -279,9 +286,17 @@ object Consumer { } } - private def listener[R1](consumer: Consumer, onAssignFirstDo: Set[TopicPartition] => Unit, rebalanceListener: RebalanceListener[R1]) = + case class InitialOffsetsAndMetadata(offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]) extends com.wixpress.dst.greyhound.core.metrics.GreyhoundMetric + + private def listener[R1](consumer: Consumer, onAssignFirstDo: Set[TopicPartition] => Unit, rebalanceListener: RebalanceListener[R1], unsafeOffsetOperations: UnsafeOffsetOperations) = ZIO.runtime[R1].map { runtime => new ConsumerRebalanceListener { + + def reportInitialOffsetsAndMetadata(partitions: Set[TopicPartition]) = { + val offsetsAndMetadata = unsafeOffsetOperations.committedWithMetadata(partitions, 10.seconds) + report(InitialOffsetsAndMetadata(offsetsAndMetadata)).provide(GreyhoundMetrics.liveLayer) + } + override def onPartitionsRevoked(partitions: util.Collection[KafkaTopicPartition]): Unit = { zio.Unsafe.unsafe { implicit s => runtime.unsafe @@ -302,7 +317,7 @@ object Consumer { zio.Unsafe.unsafe { implicit s => runtime.unsafe .run( - rebalanceListener.onPartitionsAssigned(consumer, assigned) + reportInitialOffsetsAndMetadata(assigned) *> rebalanceListener.onPartitionsAssigned(consumer, assigned) ) .getOrThrowFiberFailure() .run() @@ -341,7 +356,8 @@ case class ConsumerConfig( decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor, commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA, rewindUncommittedOffsetsByMillis: Long = 0L, - useParallelConsumer: Boolean = false + useParallelConsumer: Boolean = false, + enrichMetadata: Boolean = false ) extends CommonGreyhoundConfig { override def kafkaProps: Map[String, String] = Map( diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala index ed3028f0..d788c8ae 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala @@ -228,7 +228,8 @@ object RecordConsumer { config.decryptor, config.commitMetadataString, config.rewindUncommittedOffsetsBy.toMillis, - config.eventLoopConfig.consumePartitionInParallel + config.eventLoopConfig.consumePartitionInParallel, + config.enrichMetadata ) } @@ -344,7 +345,8 @@ case class RecordConsumerConfig( retryProducerAttributes: Map[String, String] = Map.empty, commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA, rewindUncommittedOffsetsBy: Duration = 0.millis, - createRetryTopics: Boolean = true + createRetryTopics: Boolean = true, + enrichMetadata: Boolean = true ) extends CommonGreyhoundConfig { override def kafkaProps: Map[String, String] = extraProperties diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala index 3d92e85a..ba414929 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala @@ -146,7 +146,8 @@ object BatchConsumer { config.consumerAttributes, config.decryptor, config.commitMetadataString, - config.rewindUncommittedOffsetsBy.toMillis + config.rewindUncommittedOffsetsBy.toMillis, + enrichMetadata = config.enrichMetadata ) } @@ -184,7 +185,8 @@ case class BatchConsumerConfig( consumerAttributes: Map[String, String] = Map.empty, decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor, commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA, - rewindUncommittedOffsetsBy: Duration = Duration.ZERO + rewindUncommittedOffsetsBy: Duration = Duration.ZERO, + enrichMetadata: Boolean = false ) object BatchConsumerConfig {