From fb984bb18161d7dffb9e9ae0d89d9437dd9aed2c Mon Sep 17 00:00:00 2001
From: Yisrael Union
Date: Thu, 7 Sep 2023 10:33:20 -0400
Subject: [PATCH 1/2] Update ZIO-Kafka version

This upgrades the Kafka client version and removes a CVE affecting
earlier Kafka clients.
---
 project/Versions.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/Versions.scala b/project/Versions.scala
index a0fd5da..bec34a6 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -17,6 +17,6 @@ object Versions {
   val skunk = "0.6.0"
   val embeddedPostgres = "2.0.4"
   val scribe = "3.11.5"
-  val zioKafka = "2.4.0"
+  val zioKafka = "2.4.2"
   val kafkaEmbedded = "3.5.0"
 }

From 9dd5c9d1807b223eeb636013fcce753299f010c3 Mon Sep 17 00:00:00 2001
From: yunion
Date: Thu, 7 Sep 2023 17:19:36 -0400
Subject: [PATCH 2/2] Modify implementation of traced producer

The Producer implementation changed in the underlying zio-kafka library,
so the traced producer is reworked to match the new interface.
---
 .../extras/ziokafka/KafkaProducerTracer.scala | 164 ++++++++++++------
 1 file changed, 109 insertions(+), 55 deletions(-)

diff --git a/zio-kafka/src/main/scala/io/kaizensolutions/trace4cats/zio/extras/ziokafka/KafkaProducerTracer.scala b/zio-kafka/src/main/scala/io/kaizensolutions/trace4cats/zio/extras/ziokafka/KafkaProducerTracer.scala
index b8a14c4..74e5964 100644
--- a/zio-kafka/src/main/scala/io/kaizensolutions/trace4cats/zio/extras/ziokafka/KafkaProducerTracer.scala
+++ b/zio-kafka/src/main/scala/io/kaizensolutions/trace4cats/zio/extras/ziokafka/KafkaProducerTracer.scala
@@ -8,7 +8,7 @@ import org.apache.kafka.common.header.internals.RecordHeader
 import org.apache.kafka.common.{Metric, MetricName}
 import trace4cats.{AttributeValue, ToHeaders}
 import trace4cats.model.SpanKind
-import zio.{Chunk, RIO, Task, ZIO, ZLayer}
+import zio.{Chunk, RIO, Task, UIO, ZIO, ZLayer}
 import zio.kafka.producer.Producer
 import zio.kafka.serde.Serializer
@@ -21,56 +21,88 @@ object KafkaProducerTracer {
     tracer: ZTracer,
     underlying: Producer,
     toHeaders: ToHeaders = ToHeaders.all
-  ): Producer = new Producer {
-    def produce[R, K, V](
-      record: ProducerRecord[K, V],
-      keySerializer: Serializer[R, K],
-      valueSerializer: Serializer[R, V]
-    ): RIO[R, RecordMetadata] =
-      produceChunkAsync(Chunk.single(record), keySerializer, valueSerializer).flatten.map(_.head)
-
-    def produce[R, K, V](
-      topic: String,
-      key: K,
-      value: V,
-      keySerializer: Serializer[R, K],
-      valueSerializer: Serializer[R, V]
-    ): RIO[R, RecordMetadata] =
-      produce(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)
-
-    def produceAsync[R, K, V](
-      record: ProducerRecord[K, V],
-      keySerializer: Serializer[R, K],
-      valueSerializer: Serializer[R, V]
-    ): RIO[R, Task[RecordMetadata]] =
-      produceChunkAsync(Chunk.single(record), keySerializer, valueSerializer).map(_.map(_.head))
-
-    def produceAsync[R, K, V](
-      topic: String,
-      key: K,
-      value: V,
-      keySerializer: Serializer[R, K],
-      valueSerializer: Serializer[R, V]
-    ): RIO[R, Task[RecordMetadata]] =
-      produceAsync(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)
-
-    def produceChunkAsync[R, K, V](
-      records: Chunk[ProducerRecord[K, V]],
-      keySerializer: Serializer[R, K],
-      valueSerializer: Serializer[R, V]
-    ): RIO[R, Task[Chunk[RecordMetadata]]] =
-      tracedProduceChunkAsync(tracer, underlying, toHeaders, keySerializer, valueSerializer)(records)
-
-    def produceChunk[R, K, V](
-      records: Chunk[ProducerRecord[K, V]],
-      keySerializer: Serializer[R, K],
-      valueSerializer: Serializer[R, V]
-    ): RIO[R, Chunk[RecordMetadata]] =
-      produceChunkAsync(records, keySerializer, valueSerializer).flatten
-
-    def flush: Task[Unit] = underlying.flush
-
-    def metrics: Task[Map[MetricName, Metric]] = underlying.metrics
+  ): Producer = {
+
+    new Producer {
+      def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] =
+        produceChunkAsync(Chunk.single(record), Serializer.byteArray, Serializer.byteArray).flatten.map(_.head)
+
+      def produce[R, K, V](
+        record: ProducerRecord[K, V],
+        keySerializer: Serializer[R, K],
+        valueSerializer: Serializer[R, V]
+      ): RIO[R, RecordMetadata] =
+        produceChunkAsync(Chunk.single(record), keySerializer, valueSerializer).flatten.map(_.head)
+
+      def produce[R, K, V](
+        topic: String,
+        key: K,
+        value: V,
+        keySerializer: Serializer[R, K],
+        valueSerializer: Serializer[R, V]
+      ): RIO[R, RecordMetadata] =
+        produce(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)
+
+      def produceAsync(
+        record: ProducerRecord[Array[Byte], Array[Byte]]
+      ): Task[Task[RecordMetadata]] =
+        produceChunkAsync(Chunk.single(record), Serializer.byteArray, Serializer.byteArray).map(_.map(_.head))
+
+      def produceAsync[R, K, V](
+        record: ProducerRecord[K, V],
+        keySerializer: Serializer[R, K],
+        valueSerializer: Serializer[R, V]
+      ): RIO[R, Task[RecordMetadata]] =
+        produceChunkAsync(Chunk.single(record), keySerializer, valueSerializer).map(_.map(_.head))
+
+      def produceAsync[R, K, V](
+        topic: String,
+        key: K,
+        value: V,
+        keySerializer: Serializer[R, K],
+        valueSerializer: Serializer[R, V]
+      ): RIO[R, Task[RecordMetadata]] =
+        produceAsync(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)
+
+      def produceChunkAsync(
+        records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
+      ): Task[Task[Chunk[RecordMetadata]]] =
+        produceChunkAsync(records, Serializer.byteArray, Serializer.byteArray)
+
+      def produceChunkAsync[R, K, V](
+        records: Chunk[ProducerRecord[K, V]],
+        keySerializer: Serializer[R, K],
+        valueSerializer: Serializer[R, V]
+      ): RIO[R, Task[Chunk[RecordMetadata]]] =
+        tracedProduceChunkAsync(tracer, underlying, toHeaders, keySerializer, valueSerializer)(records)
+          .map(_.flatMap { chunk =>
+            val (errors, successes) = chunk.partitionMap(identity)
+            if (errors.isEmpty) ZIO.succeed(successes)
+            else ZIO.fail(errors.head)
+          })
+
+      def produceChunkAsyncWithFailures(
+        records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
+      ): UIO[UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
+        tracedProduceChunkAsync(tracer, underlying, toHeaders, Serializer.byteArray, Serializer.byteArray)(records)
+          .catchAll(e => ZIO.succeed(ZIO.succeed(Chunk.single(Left(e)))))
+
+      def produceChunk(
+        records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
+      ): Task[Chunk[RecordMetadata]] =
+        produceChunkAsync(records, Serializer.byteArray, Serializer.byteArray).flatten
+
+      def produceChunk[R, K, V](
+        records: Chunk[ProducerRecord[K, V]],
+        keySerializer: Serializer[R, K],
+        valueSerializer: Serializer[R, V]
+      ): RIO[R, Chunk[RecordMetadata]] =
+        produceChunkAsync(records, keySerializer, valueSerializer).flatten
+
+      def flush: Task[Unit] = underlying.flush
+
+      def metrics: Task[Map[MetricName, Metric]] = underlying.metrics
+    }
   }
 
   private def addHeaders[A, B](headers: List[Header], record: ProducerRecord[A, B]): Task[ProducerRecord[A, B]] =
@@ -88,7 +120,7 @@ object KafkaProducerTracer {
     headers: ToHeaders,
     keySerializer: Serializer[R, K],
     valueSerializer: Serializer[R, V]
-  )(records: Chunk[ProducerRecord[K, V]]): RIO[R, Task[Chunk[RecordMetadata]]] =
+  )(records: Chunk[ProducerRecord[K, V]]): ZIO[R, Throwable, UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
     tracer.withSpan("kafka-producer-send-buffer", kind = SpanKind.Producer) { span =>
       tracer
         .extractHeaders(headers)
@@ -100,8 +132,11 @@ object KafkaProducerTracer {
               .ignore
           kafkaTraceHeaders =
             traceHeaders.values.map { case (k, v) => new RecordHeader(k.toString, v.getBytes) }.toList
-          recordsWithHeaders <- ZIO.foreach(records)(addHeaders(kafkaTraceHeaders, _))
-          waitForAck <- underlying.produceChunkAsync(recordsWithHeaders, keySerializer, valueSerializer)
+          recordsWithHeaders <-
+            ZIO.foreach(records)(
+              addHeaders(kafkaTraceHeaders, _).flatMap(serialize(_, keySerializer, valueSerializer))
+            )
+          waitForAck <- underlying.produceChunkAsyncWithFailures(recordsWithHeaders)
         } yield waitForAck
 
         enrichSpanWithError(
@@ -117,7 +152,16 @@ object KafkaProducerTracer {
           kind = SpanKind.Producer,
          name = "kafka-producer-broker-ack"
         ) { span =>
-          enrichSpanWithError("error.message-broker-ack", "error.cause-broker-ack", span, ack)
+          ack.tap(chunk =>
+            ZIO.foreach(chunk)(elem =>
+              enrichSpanWithError(
+                "error.message-broker-ack",
+                "error.cause-broker-ack",
+                span,
+                ZIO.fromEither[Throwable, RecordMetadata](elem)
+              ).either
+            )
+          )
         }
       )
     }
@@ -132,4 +176,14 @@ object KafkaProducerTracer {
     in
       .tapError(e => span.put(errorKey, AttributeValue.StringValue(e.getLocalizedMessage)).when(span.isSampled))
       .tapDefect(c => span.put(causeKey, AttributeValue.StringValue(c.prettyPrint)).when(span.isSampled))
+
+  private def serialize[R, K, V](
+    r: ProducerRecord[K, V],
+    keySerializer: Serializer[R, K],
+    valueSerializer: Serializer[R, V]
+  ): RIO[R, ProducerRecord[Array[Byte], Array[Byte]]] =
+    for {
+      key   <- keySerializer.serialize(r.topic, r.headers, r.key())
+      value <- valueSerializer.serialize(r.topic, r.headers, r.value())
+    } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)
 }
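
Usage sketch (for context, not part of the patches): how the traced producer
might be wired and used after this change. It assumes the enclosing method in
the second hunk is KafkaProducerTracer.trace (the method name sits above the
hunk's context lines) and that ZTracer lives at
io.kaizensolutions.trace4cats.zio.extras.ZTracer; the names
tracedProducerLayer and produceOne are illustrative only.

    import io.kaizensolutions.trace4cats.zio.extras.ZTracer
    import io.kaizensolutions.trace4cats.zio.extras.ziokafka.KafkaProducerTracer
    import zio._
    import zio.kafka.producer.Producer
    import zio.kafka.serde.Serde

    // Wrap an existing zio-kafka Producer so every produced chunk is traced.
    // `trace` is assumed to be the method whose parameters appear in the hunk.
    val tracedProducerLayer: URLayer[ZTracer with Producer, Producer] =
      ZLayer.fromFunction((tracer: ZTracer, underlying: Producer) =>
        KafkaProducerTracer.trace(tracer, underlying)
      )

    // Callers are unaffected by the rework: the typed produce overloads still
    // fail the effect on the first error, while produceChunkAsyncWithFailures
    // surfaces per-record Either[Throwable, RecordMetadata] values.
    val produceOne: RIO[Producer, Unit] =
      ZIO.serviceWithZIO[Producer] { producer =>
        producer
          .produce("my-topic", "key", "value", Serde.string, Serde.string)
          .unit
      }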