diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala index e99efe0b4..28defb207 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala @@ -219,7 +219,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { def consumeAndCommit(count: Long) = Consumer - .partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string) + .partitionedStream[Any, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string) .flatMapPar(partitionCount)(_._2) .take(count) .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20)) @@ -293,7 +293,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { def consumeAndCommit(count: Long, topic: String, groupId: String) = Consumer - .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string) + .plainStream[Any, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string) .take(count) .foreach(_.offset.commit) .provideSomeLayer[Kafka](consumer(topic, Some(groupId))) @@ -336,7 +336,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { def consumeAndCommit(count: Long, topic: String, groupId: String) = Consumer - .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string) + .plainStream[Any, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string) .take(count) .foreach(_.offset.commit) .provideSomeLayer[Kafka](consumer(topic, Some(groupId))) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala b/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala index ab5ffa0ef..cb2a3f9d3 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala @@ -25,7 +25,7 @@ object PopulateTopic extends ZIOAppDefault { override def run: ZIO[Any, Throwable, Unit] = dataStream(872000).map { case (k, v) => new ProducerRecord("inputs-topic", null, null, k, v) - }.mapChunksZIO(Producer.produceChunkAsync[Any, String, String](_, Serde.string, Serde.string).map(Chunk(_))) + }.mapChunksZIO(Producer.produceChunkAsync[String, String](_, Serde.string, Serde.string).map(Chunk(_))) .mapZIOPar(5)(_.flatMap(chunk => Console.printLine(s"Wrote chunk of ${chunk.size}"))) .runDrain .provide( diff --git a/zio-kafka-test/src/test/scala/zio/kafka/serde/DeserializerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/serde/DeserializerSpec.scala index 0b5367744..579abe861 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/serde/DeserializerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/serde/DeserializerSpec.scala @@ -2,13 +2,12 @@ package zio.kafka.serde import org.apache.kafka.common.header.internals.RecordHeaders import zio._ +import zio.kafka.ZIOSpecDefaultSlf4j import zio.test.Assertion._ import zio.test._ -import zio.ZAny -import zio.kafka.ZIOSpecDefaultSlf4j object DeserializerSpec extends ZIOSpecDefaultSlf4j { - override def spec: Spec[ZAny with Any, Throwable] = suite("Deserializer")( + override def spec: Spec[TestEnvironment with Scope, Any] = suite("Deserializer")( suite("asOption")( test("deserialize to None when value is null") { assertZIO(stringDeserializer.asOption.deserialize("topic1", new RecordHeaders, null))(isNone) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala 
b/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala index 57e1f2e64..6f8f32011 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala @@ -1,6 +1,7 @@ package zio.kafka.serde import org.apache.kafka.common.header.internals.RecordHeaders +import zio.Scope import zio.kafka.ZIOSpecDefaultSlf4j import zio.test.Assertion._ import zio.test._ @@ -15,7 +16,7 @@ object SerdeSpec extends ZIOSpecDefaultSlf4j { private val anyBytes = Gen.listOf(Gen.byte).map(bytes => new org.apache.kafka.common.utils.Bytes(bytes.toArray)) - override def spec: Spec[Any, Throwable] = suite("Serde")( + override def spec: Spec[TestEnvironment with Scope, Any] = suite("Serde")( testSerde(Serde.string, Gen.string), testSerde(Serde.int, Gen.int), testSerde(Serde.short, Gen.short), @@ -27,20 +28,20 @@ object SerdeSpec extends ZIOSpecDefaultSlf4j { testSerde(Serde.byteArray, Gen.listOf(Gen.byte).map(_.toArray)), suite("asOption")( test("serialize and deserialize None values to null and visa versa") { - val serde = testDataStructureSerde.asOption + val serde = testDataStructureSerde.asOption + val serialized = serde.serialize("topic1", new RecordHeaders, None) for { - serialized <- serde.serialize("topic1", new RecordHeaders, None) deserialized <- serde.deserialize("topic1", new RecordHeaders, serialized) } yield assert(serialized)(isNull) && assert(deserialized)(isNone) } ) ) - private def testSerde[R, A](serde: Serde[Any, A], gen: Gen[R, A])(implicit clsTag: ClassTag[A]) = + private def testSerde[R, A](serde: Serde[Throwable, A], gen: Gen[R, A])(implicit clsTag: ClassTag[A]) = test(s"serialize and deserialize ${clsTag.runtimeClass.getSimpleName}") { check(gen) { value => + val serialized = serde.serialize("topic1", new RecordHeaders, value) for { - serialized <- serde.serialize("topic1", new RecordHeaders, value) deserialized <- serde.deserialize("topic1", new RecordHeaders, serialized) } yield assert(deserialized)(equalTo(value)) } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/serde/SerializerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/serde/SerializerSpec.scala index 20b0a5aa6..52cd9eb72 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/serde/SerializerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/serde/SerializerSpec.scala @@ -3,19 +3,21 @@ package zio.kafka.serde import org.apache.kafka.common.header.internals.RecordHeaders import zio.test.Assertion._ import zio.test._ -import zio.ZAny +import zio.{ ZAny, ZIO } import zio.kafka.ZIOSpecDefaultSlf4j object SerializerSpec extends ZIOSpecDefaultSlf4j { override def spec: Spec[ZAny with Any, Throwable] = suite("Serializer")( suite("asOption")( test("serialize None values to null") { - assertZIO(stringSerializer.asOption.serialize("topic1", new RecordHeaders, None))(isNull) + assert(stringSerializer.asOption.serialize("topic1", new RecordHeaders, None))(isNull) }, test("serialize Some values") { check(Gen.string) { string => assertZIO( - stringSerializer.asOption.serialize("topic1", new RecordHeaders, Some(string)).map(new String(_, "UTF-8")) + ZIO + .succeed(stringSerializer.asOption.serialize("topic1", new RecordHeaders, Some(string))) + .map(new String(_, "UTF-8")) )( equalTo(string) ) @@ -23,5 +25,5 @@ object SerializerSpec extends ZIOSpecDefaultSlf4j { } ) ) - private lazy val stringSerializer: Serializer[Any, String] = Serde.string + private lazy val stringSerializer: Serializer[String] = Serde.string } diff --git 
a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 9d6ad7a8a..3c6ba71da 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -70,7 +70,7 @@ object KafkaTestUtils { key: String, message: String ): ZIO[Producer, Throwable, RecordMetadata] = - Producer.produce[Any, String, String](new ProducerRecord(topic, key, message), Serde.string, Serde.string) + Producer.produce[String, String](new ProducerRecord(topic, key, message), Serde.string, Serde.string) /** * Utility function to produce many messages in give Partition of a Topic. @@ -81,7 +81,7 @@ object KafkaTestUtils { kvs: Iterable[(String, String)] ): ZIO[Producer, Throwable, Chunk[RecordMetadata]] = Producer - .produceChunk[Any, String, String]( + .produceChunk[String, String]( Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(topic, partition, null, k, v) }), @@ -97,7 +97,7 @@ object KafkaTestUtils { kvs: Iterable[(String, String)] ): ZIO[Producer, Throwable, Chunk[RecordMetadata]] = Producer - .produceChunk[Any, String, String]( + .produceChunk[String, String]( Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(topic, k, v) }), diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index d138ea6d8..2b94e143b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -3,17 +3,17 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata } import org.apache.kafka.common.TopicPartition import zio.kafka.serde.Deserializer -import zio.{ RIO, Task } +import zio.{ IO, Task } final case class CommittableRecord[K, V]( record: ConsumerRecord[K, V], private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], private val consumerGroupMetadata: Option[ConsumerGroupMetadata] ) { - def deserializeWith[R, K1, V1]( - keyDeserializer: Deserializer[R, K1], - valueDeserializer: Deserializer[R, V1] - )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] = + def deserializeWith[E, K1, V1]( + keyDeserializer: Deserializer[E, K1], + valueDeserializer: Deserializer[E, V1] + )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): IO[E, CommittableRecord[K1, V1]] = for { key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index fb43e0cbb..067431538 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -64,11 +64,11 @@ trait Consumer { * On completion of the stream, the consumer is unsubscribed. In case of multiple subscriptions, the total consumer * subscription is changed to exclude this subscription. 
*/ - def partitionedAssignmentStream[R, K, V]( + def partitionedAssignmentStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V] + ): Stream[Throwable, Chunk[(TopicPartition, ZStream[Any, Throwable, CommittableRecord[K, V]])]] /** * Create a stream with messages on the subscribed topic-partitions by topic-partition @@ -87,11 +87,11 @@ trait Consumer { * On completion of the stream, the consumer is unsubscribed. In case of multiple subscriptions, the total consumer * subscription is changed to exclude this subscription. */ - def partitionedStream[R, K, V]( + def partitionedStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V] + ): Stream[Throwable, (TopicPartition, ZStream[Any, Throwable, CommittableRecord[K, V]])] /** * Create a stream with all messages on the subscribed topic-partitions @@ -111,12 +111,12 @@ trait Consumer { * On completion of the stream, the consumer is unsubscribed. In case of multiple subscriptions, the total consumer * subscription is changed to exclude this subscription. */ - def plainStream[R, K, V]( + def plainStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V], + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V], bufferSize: Int = 4 - ): ZStream[R, Throwable, CommittableRecord[K, V]] + ): ZStream[Any, Throwable, CommittableRecord[K, V]] /** * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit @@ -127,14 +127,14 @@ trait Consumer { /** * See [[Consumer.consumeWith]]. */ - def consumeWith[R: EnvironmentTag, R1: EnvironmentTag, K, V]( + def consumeWith[E, R1: EnvironmentTag, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V], + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V], commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) )( f: ConsumerRecord[K, V] => URIO[R1, Unit] - ): ZIO[R & R1, Throwable, Unit] + ): ZIO[R1, Throwable, Unit] /** * Look up the offsets for the given partitions by timestamp. 
The returned offset for each partition is the earliest @@ -293,36 +293,36 @@ object Consumer { /** * Accessor method */ - def partitionedAssignmentStream[R, K, V]( + def partitionedAssignmentStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V] - ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V] + ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[Any, Throwable, CommittableRecord[K, V]])]] = ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer)) /** * Accessor method */ - def partitionedStream[R, K, V]( + def partitionedStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V] + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V] ): ZStream[ Consumer, Throwable, - (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]]) + (TopicPartition, ZStream[Any, Throwable, CommittableRecord[K, V]]) ] = ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer)) /** * Accessor method */ - def plainStream[R, K, V]( + def plainStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V], + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V], bufferSize: Int = 4 - ): ZStream[R & Consumer, Throwable, CommittableRecord[K, V]] = + ): ZStream[Consumer, Throwable, CommittableRecord[K, V]] = ZStream.serviceWithStream[Consumer]( _.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize) ) @@ -376,8 +376,8 @@ object Consumer { * @param f * Function that returns the effect to execute for each message. It is passed the * [[org.apache.kafka.clients.consumer.ConsumerRecord]]. - * @tparam R - * Environment for the consuming effect + * @tparam E + * Error channel of deserializers * @tparam R1 * Environment for the deserializers * @tparam K @@ -387,17 +387,17 @@ object Consumer { * @return * Effect that completes with a unit value only when interrupted. May fail when the [[Consumer]] fails. 
*/ - def consumeWith[R: EnvironmentTag, R1: EnvironmentTag, K, V]( + def consumeWith[E, R1: EnvironmentTag, K, V]( settings: ConsumerSettings, subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V], + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V], commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) - )(f: ConsumerRecord[K, V] => URIO[R1, Unit]): RIO[R & R1, Unit] = - ZIO.scoped[R & R1] { + )(f: ConsumerRecord[K, V] => URIO[R1, Unit]): RIO[R1, Unit] = + ZIO.scoped[R1] { Consumer .make(settings) - .flatMap(_.consumeWith[R, R1, K, V](subscription, keyDeserializer, valueDeserializer, commitRetryPolicy)(f)) + .flatMap(_.consumeWith[E, R1, K, V](subscription, keyDeserializer, valueDeserializer, commitRetryPolicy)(f)) } /** @@ -463,6 +463,8 @@ object Consumer { case object Latest extends AutoOffsetStrategy case object None extends AutoOffsetStrategy } + + case class DeserializationError(e: Any) extends Exception(s"Deserialization error: ${e}") } private[consumer] final class ConsumerLive private[consumer] ( @@ -504,11 +506,11 @@ private[consumer] final class ConsumerLive private[consumer] ( override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) - override def partitionedAssignmentStream[R, K, V]( + override def partitionedAssignmentStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = { + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V] + ): Stream[Throwable, Chunk[(TopicPartition, ZStream[Any, Throwable, CommittableRecord[K, V]])]] = { val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray) ZStream.unwrapScoped { @@ -520,10 +522,13 @@ private[consumer] final class ConsumerLive private[consumer] ( .map { _.collect { case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) => - val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = + val stream: ZStream[Any, Throwable, CommittableRecord[K, V]] = if (onlyByteArraySerdes) - partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] - else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) + partitionStream.asInstanceOf[ZStream[Any, Throwable, CommittableRecord[K, V]]] + else + partitionStream.mapChunksZIO( + _.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer).mapError(DeserializationError(_))) + ) tp -> stream } @@ -531,19 +536,19 @@ private[consumer] final class ConsumerLive private[consumer] ( } } - override def partitionedStream[R, K, V]( + override def partitionedStream[E, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V] - ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] = + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V] + ): ZStream[Any, Throwable, (TopicPartition, ZStream[Any, Throwable, CommittableRecord[K, V]])] = partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks - override def plainStream[R, K, V]( + override def plainStream[E, 
K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V], + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V], bufferSize: Int - ): ZStream[R, Throwable, CommittableRecord[K, V]] = + ): ZStream[Any, Throwable, CommittableRecord[K, V]] = partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar( n = Int.MaxValue, bufferSize = bufferSize @@ -553,16 +558,16 @@ private[consumer] final class ConsumerLive private[consumer] ( ZIO.logDebug("stopConsumption called") *> runloopAccess.stopConsumption - override def consumeWith[R: EnvironmentTag, R1: EnvironmentTag, K, V]( + override def consumeWith[E, R1: EnvironmentTag, K, V]( subscription: Subscription, - keyDeserializer: Deserializer[R, K], - valueDeserializer: Deserializer[R, V], + keyDeserializer: Deserializer[E, K], + valueDeserializer: Deserializer[E, V], commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) )( f: ConsumerRecord[K, V] => URIO[R1, Unit] - ): ZIO[R & R1, Throwable, Unit] = + ): ZIO[R1, Throwable, Unit] = for { - r <- ZIO.environment[R & R1] + r <- ZIO.environment[R1] _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer) .flatMapPar(Int.MaxValue) { case (_, partitionStream) => partitionStream.mapChunksZIO(_.mapZIO((c: CommittableRecord[K, V]) => f(c.record).as(c.offset))) diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index bc77c6469..43e4ba7dd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -15,42 +15,42 @@ import scala.util.control.{ NoStackTrace, NonFatal } trait Producer { /** - * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version - * that allows to avoid round-trip-time penalty for each record. + * Produces a single record and await broker acknowledgement. See [[produceAsync[K,V](topic:String*]] for version that + * allows to avoid round-trip-time penalty for each record. */ def produce( record: ProducerRecord[Array[Byte], Array[Byte]] ): Task[RecordMetadata] /** - * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version - * that allows to avoid round-trip-time penalty for each record. + * Produces a single record and await broker acknowledgement. See [[produceAsync[K,V](topic:String*]] for version that + * allows to avoid round-trip-time penalty for each record. */ - def produce[R, K, V]( + def produce[K, V]( record: ProducerRecord[K, V], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, RecordMetadata] + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[RecordMetadata] /** - * Produces a single record and await broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for version - * that allows to avoid round-trip-time penalty for each record. + * Produces a single record and await broker acknowledgement. See [[produceAsync[K,V](topic:String*]] for version that + * allows to avoid round-trip-time penalty for each record. 
*/ - def produce[R, K, V]( + def produce[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, RecordMetadata] + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[RecordMetadata] /** * A stream pipeline that produces all records from the stream. */ - final def produceAll[R, K, V]( - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): ZPipeline[R, Throwable, ProducerRecord[K, V], RecordMetadata] = + final def produceAll[K, V]( + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): ZPipeline[Any, Throwable, ProducerRecord[K, V], RecordMetadata] = ZPipeline.mapChunksZIO(records => produceChunk(records, keySerializer, valueSerializer)) /** @@ -61,7 +61,7 @@ trait Producer { * * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases - * throughput. See [[produce[R,K,V](record*]] for version that awaits broker acknowledgement. + * throughput. See [[produce[K,V](record*]] for version that awaits broker acknowledgement. */ def produceAsync( record: ProducerRecord[Array[Byte], Array[Byte]] @@ -75,13 +75,13 @@ trait Producer { * * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases - * throughput. See [[produce[R,K,V](record*]] for version that awaits broker acknowledgement. + * throughput. See [[produce[K,V](record*]] for version that awaits broker acknowledgement. */ - def produceAsync[R, K, V]( + def produceAsync[K, V]( record: ProducerRecord[K, V], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Task[RecordMetadata]] + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Task[RecordMetadata]] /** * Produces a single record. The effect returned from this method has two layers and describes the completion of two @@ -91,15 +91,15 @@ trait Producer { * * It is usually recommended to not await the inner layer of every individual record, but enqueue a batch of records * and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases - * throughput. See [[produce[R,K,V](topic*]] for version that awaits broker acknowledgement. + * throughput. See [[produce[K,V](topic*]] for version that awaits broker acknowledgement. */ - def produceAsync[R, K, V]( + def produceAsync[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Task[RecordMetadata]] + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Task[RecordMetadata]] /** * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time @@ -119,11 +119,11 @@ trait Producer { * When publishing any of the records fails, the whole batch fails even though some records might have been published. * Use [[produceChunkAsyncWithFailures]] to get results per record. 
*/ - def produceChunk[R, K, V]( + def produceChunk[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Chunk[RecordMetadata]] + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Chunk[RecordMetadata]] /** * Produces a chunk of records. The effect returned from this method has two layers and describes the completion of @@ -155,11 +155,11 @@ trait Producer { * When publishing any of the records fails, the whole batch fails even though some records might have been published. * Use [[produceChunkAsyncWithFailures]] to get results per record. */ - def produceChunkAsync[R, K, V]( + def produceChunkAsync[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Task[Chunk[RecordMetadata]]] + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Task[Chunk[RecordMetadata]]] /** * Produces a chunk of records. The effect returned from this method has two layers and describes the completion of @@ -257,34 +257,34 @@ object Producer { ZIO.serviceWithZIO[Producer](_.produce(record)) /** - * Accessor method for [[Producer!.produce[R,K,V](record*]] + * Accessor method for [[Producer!.produce[K,V](record*]] */ - def produce[R, K, V]( + def produce[K, V]( record: ProducerRecord[K, V], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R & Producer, RecordMetadata] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): RIO[Producer, RecordMetadata] = ZIO.serviceWithZIO[Producer](_.produce(record, keySerializer, valueSerializer)) /** - * Accessor method for [[Producer!.produce[R,K,V](topic*]] + * Accessor method for [[Producer!.produce[K,V](topic*]] */ - def produce[R, K, V]( + def produce[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R & Producer, RecordMetadata] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): RIO[Producer, RecordMetadata] = ZIO.serviceWithZIO[Producer](_.produce(topic, key, value, keySerializer, valueSerializer)) /** * A stream pipeline that produces all records from the stream. 
*/ - def produceAll[R, K, V]( - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): ZPipeline[R & Producer, Throwable, ProducerRecord[K, V], RecordMetadata] = + def produceAll[K, V]( + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): ZPipeline[Producer, Throwable, ProducerRecord[K, V], RecordMetadata] = ZPipeline.mapChunksZIO(records => produceChunk(records, keySerializer, valueSerializer)) /** @@ -298,23 +298,23 @@ object Producer { /** * Accessor method */ - def produceAsync[R, K, V]( + def produceAsync[K, V]( record: ProducerRecord[K, V], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R & Producer, Task[RecordMetadata]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): RIO[Producer, Task[RecordMetadata]] = ZIO.serviceWithZIO[Producer](_.produceAsync(record, keySerializer, valueSerializer)) /** * Accessor method */ - def produceAsync[R, K, V]( + def produceAsync[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R & Producer, Task[RecordMetadata]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): RIO[Producer, Task[RecordMetadata]] = ZIO.serviceWithZIO[Producer](_.produceAsync(topic, key, value, keySerializer, valueSerializer)) /** @@ -328,11 +328,11 @@ object Producer { /** * Accessor method */ - def produceChunkAsync[R, K, V]( + def produceChunkAsync[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R & Producer, Task[Chunk[RecordMetadata]]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): RIO[Producer, Task[Chunk[RecordMetadata]]] = ZIO.serviceWithZIO[Producer](_.produceChunkAsync(records, keySerializer, valueSerializer)) /** @@ -354,11 +354,11 @@ object Producer { /** * Accessor method */ - def produceChunk[R, K, V]( + def produceChunk[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R & Producer, Chunk[RecordMetadata]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): RIO[Producer, Chunk[RecordMetadata]] = ZIO.serviceWithZIO[Producer](_.produceChunk(records, keySerializer, valueSerializer)) /** @@ -388,20 +388,20 @@ private[producer] final class ProducerLive( override def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] = produceAsync(record).flatten - override def produce[R, K, V]( + override def produce[K, V]( record: ProducerRecord[K, V], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, RecordMetadata] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[RecordMetadata] = produceAsync(record, keySerializer, valueSerializer).flatten - override def produce[R, K, V]( + override def produce[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, RecordMetadata] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[RecordMetadata] = produce(new ProducerRecord(topic, key, value), keySerializer, valueSerializer) // noinspection YieldingZIOEffectInspection @@ -411,30 +411,30 @@ private[producer] final class ProducerLive( _ <- sendQueue.offer((Chunk.single(record), done)) } yield done.await.flatMap(result => ZIO.fromEither(result.head)) - override def produceAsync[R, K, V]( + override def produceAsync[K, V]( record: ProducerRecord[K, V], 
- keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Task[RecordMetadata]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Task[RecordMetadata]] = serialize(record, keySerializer, valueSerializer).flatMap(produceAsync) - override def produceAsync[R, K, V]( + override def produceAsync[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Task[RecordMetadata]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Task[RecordMetadata]] = produceAsync(new ProducerRecord(topic, key, value), keySerializer, valueSerializer) override def produceChunk(records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]): Task[Chunk[RecordMetadata]] = produceChunkAsync(records).flatten - override def produceChunk[R, K, V]( + override def produceChunk[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Chunk[RecordMetadata]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Chunk[RecordMetadata]] = produceChunkAsync(records, keySerializer, valueSerializer).flatten // noinspection YieldingZIOEffectInspection @@ -449,11 +449,11 @@ private[producer] final class ProducerLive( } }) - override def produceChunkAsync[R, K, V]( + override def produceChunkAsync[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, Task[Chunk[RecordMetadata]]] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[Task[Chunk[RecordMetadata]]] = ZIO .foreach(records)(serialize(_, keySerializer, valueSerializer)) .flatMap(produceChunkAsync) @@ -541,13 +541,13 @@ private[producer] final class ProducerLive( } .runDrain - private def serialize[R, K, V]( + private def serialize[K, V]( r: ProducerRecord[K, V], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V] - ): RIO[R, ByteRecord] = + keySerializer: Serializer[K], + valueSerializer: Serializer[V] + ): Task[ByteRecord] = for { - key <- keySerializer.serialize(r.topic, r.headers, r.key()) - value <- valueSerializer.serialize(r.topic, r.headers, r.value()) + key <- ZIO.succeed(keySerializer.serialize(r.topic, r.headers, r.key())) + value = valueSerializer.serialize(r.topic, r.headers, r.value()) } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers) } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala index b08b3f36e..c10f0cd22 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala @@ -4,38 +4,38 @@ import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata } import zio.kafka.consumer.{ Offset, OffsetBatch } import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort } import zio.kafka.serde.Serializer -import zio.{ Chunk, IO, RIO, Ref, UIO, ZIO } +import zio.{ Chunk, IO, Ref, Task, UIO, ZIO } trait Transaction { - def produce[R, K, V]( + def produce[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offset: Option[Offset] - ): RIO[R, RecordMetadata] + ): Task[RecordMetadata] - def produce[R, K, V]( + def produce[K, V]( producerRecord: ProducerRecord[K, V], - 
keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offset: Option[Offset] - ): RIO[R, RecordMetadata] + ): Task[RecordMetadata] - def produceChunk[R, K, V]( + def produceChunk[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offset: Option[Offset] - ): RIO[R, Chunk[RecordMetadata]] + ): Task[Chunk[RecordMetadata]] - def produceChunkBatch[R, K, V]( + def produceChunkBatch[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offsets: OffsetBatch - ): RIO[R, Chunk[RecordMetadata]] + ): Task[Chunk[RecordMetadata]] def abort: IO[TransactionalProducer.UserInitiatedAbort.type, Nothing] } @@ -45,45 +45,45 @@ private[producer] final class TransactionImpl( private[producer] val offsetBatchRef: Ref[OffsetBatch], closed: Ref[Boolean] ) extends Transaction { - def produce[R, K, V]( + def produce[K, V]( topic: String, key: K, value: V, - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offset: Option[Offset] - ): RIO[R, RecordMetadata] = + ): Task[RecordMetadata] = produce(new ProducerRecord[K, V](topic, key, value), keySerializer, valueSerializer, offset) - def produce[R, K, V]( + def produce[K, V]( producerRecord: ProducerRecord[K, V], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offset: Option[Offset] - ): RIO[R, RecordMetadata] = + ): Task[RecordMetadata] = haltIfClosed *> ZIO.whenCase(offset) { case Some(offset) => offsetBatchRef.update(_ add offset) } *> - producer.produce[R, K, V](producerRecord, keySerializer, valueSerializer) + producer.produce[K, V](producerRecord, keySerializer, valueSerializer) - def produceChunk[R, K, V]( + def produceChunk[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offset: Option[Offset] - ): RIO[R, Chunk[RecordMetadata]] = + ): Task[Chunk[RecordMetadata]] = haltIfClosed *> ZIO.whenCase(offset) { case Some(offset) => offsetBatchRef.update(_ add offset) } *> - producer.produceChunk[R, K, V](records, keySerializer, valueSerializer) + producer.produceChunk[K, V](records, keySerializer, valueSerializer) - def produceChunkBatch[R, K, V]( + def produceChunkBatch[K, V]( records: Chunk[ProducerRecord[K, V]], - keySerializer: Serializer[R, K], - valueSerializer: Serializer[R, V], + keySerializer: Serializer[K], + valueSerializer: Serializer[V], offsets: OffsetBatch - ): RIO[R, Chunk[RecordMetadata]] = + ): Task[Chunk[RecordMetadata]] = haltIfClosed *> offsetBatchRef.update(_ merge offsets) *> - producer.produceChunk[R, K, V](records, keySerializer, valueSerializer) + producer.produceChunk[K, V](records, keySerializer, valueSerializer) def abort: IO[TransactionalProducer.UserInitiatedAbort.type, Nothing] = ZIO.fail(UserInitiatedAbort) diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala index 3218664c4..9acc41f09 100644 --- a/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala +++ 
b/zio-kafka/src/main/scala/zio/kafka/serde/Deserializer.scala @@ -1,11 +1,7 @@ package zio.kafka.serde import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.serialization.{ Deserializer => KafkaDeserializer } -import zio.{ RIO, Task, ZIO } - -import scala.jdk.CollectionConverters._ -import scala.util.{ Failure, Success, Try } +import zio.{ IO, ZIO } /** * Deserializer from byte array to a value of some type T @@ -15,48 +11,40 @@ import scala.util.{ Failure, Success, Try } * @tparam T * Value type */ -trait Deserializer[-R, +T] { - def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] - - /** - * Returns a new deserializer that executes its deserialization function on the blocking threadpool. - */ - def blocking: Deserializer[R, T] = - Deserializer((topic, headers, data) => ZIO.blocking(deserialize(topic, headers, data))) +trait Deserializer[+E, T] { + def deserialize(topic: String, headers: Headers, data: Array[Byte]): IO[E, T] /** * Create a deserializer for a type U based on the deserializer for type T and a mapping function */ - def map[U](f: T => U): Deserializer[R, U] = Deserializer(deserialize(_, _, _).map(f)) + def map[U](f: T => U): Deserializer[E, U] = Deserializer(deserialize(_, _, _).map(f)) /** * Create a deserializer for a type U based on the deserializer for type T and an effectful mapping function */ - def mapM[R1 <: R, U](f: T => RIO[R1, U]): Deserializer[R1, U] = Deserializer(deserialize(_, _, _).flatMap(f)) + def mapM[E1 >: E, U](f: T => IO[E1, U]): Deserializer[E1, U] = Deserializer(deserialize(_, _, _).flatMap(f)) /** * When this serializer fails, attempt to deserialize with the alternative * * If both deserializers fail, the error will be the last deserializer's exception. */ - def orElse[R1 <: R, U >: T](alternative: Deserializer[R1, U]): Deserializer[R1, U] = + def orElse[E1 >: E, U >: T](alternative: Deserializer[E1, U]): Deserializer[E1, U] = Deserializer { (topic, headers, data) => deserialize(topic, headers, data) orElse alternative.deserialize(topic, headers, data) } - /** - * Serde that handles deserialization failures by returning a Task - * - * This is useful for explicitly handling deserialization failures. - */ - def asTry: Deserializer[R, Try[T]] = - Deserializer(deserialize(_, _, _).fold(e => Failure(e), v => Success(v))) - /** * Returns a new deserializer that deserializes values as Option values, mapping null data to None values. 
*/ - def asOption: Deserializer[R, Option[T]] = + def asOption: Deserializer[E, Option[T]] = Deserializer((topic, headers, data) => ZIO.foreach(Option(data))(deserialize(topic, headers, _))) + + def mapError[E1](f: E => E1): Deserializer[E1, T] = + Deserializer((topic, headers, data) => deserialize(topic, headers, data).mapError(f)) + + def catchAll[E1 >: E](f: E => IO[E1, T]): Deserializer[E1, T] = + Deserializer((topic, headers, data) => deserialize(topic, headers, data).catchAll(f)) } object Deserializer extends Serdes { @@ -64,23 +52,6 @@ object Deserializer extends Serdes { /** * Create a deserializer from a function */ - def apply[R, T](deser: (String, Headers, Array[Byte]) => RIO[R, T]): Deserializer[R, T] = + def apply[E, T](deser: (String, Headers, Array[Byte]) => IO[E, T]): Deserializer[E, T] = (topic: String, headers: Headers, data: Array[Byte]) => deser(topic, headers, data) - - /** - * Create a Deserializer from a Kafka Deserializer - */ - def fromKafkaDeserializer[T]( - deserializer: KafkaDeserializer[T], - props: Map[String, AnyRef], - isKey: Boolean - ): Task[Deserializer[Any, T]] = - ZIO - .attempt(deserializer.configure(props.asJava, isKey)) - .as( - new Deserializer[Any, T] { - override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] = - ZIO.attempt(deserializer.deserialize(topic, headers, data)) - } - ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala index a6bd842ad..e1c70aca1 100644 --- a/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala +++ b/zio-kafka/src/main/scala/zio/kafka/serde/Serde.scala @@ -1,11 +1,7 @@ package zio.kafka.serde import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.serialization.{ Serde => KafkaSerde } -import zio.{ RIO, Task, ZIO } - -import scala.jdk.CollectionConverters._ -import scala.util.Try +import zio.IO /** * A serializer and deserializer for values of type T @@ -15,31 +11,25 @@ import scala.util.Try * @tparam T * Value type */ -trait Serde[-R, T] extends Deserializer[R, T] with Serializer[R, T] { +trait Serde[+E, T] extends Deserializer[E, T] with Serializer[T] { /** * Creates a new Serde that uses optional values. Null data will be mapped to None values. */ - override def asOption: Serde[R, Option[T]] = + override def asOption: Serde[E, Option[T]] = Serde(super[Deserializer].asOption)(super[Serializer].asOption) - /** - * Creates a new Serde that executes its serialization and deserialization functions on the blocking threadpool. 
- */ - override def blocking: Serde[R, T] = - Serde(super[Deserializer].blocking)(super[Serializer].blocking) - /** * Converts to a Serde of type U with pure transformations */ - def inmap[U](f: T => U)(g: U => T): Serde[R, U] = + def inmap[U](f: T => U)(g: U => T): Serde[E, U] = Serde(map(f))(contramap(g)) /** * Convert to a Serde of type U with effectful transformations */ - def inmapM[R1 <: R, U](f: T => RIO[R1, U])(g: U => RIO[R1, T]): Serde[R1, U] = - Serde(mapM(f))(contramapM(g)) + def inmapM[E1 >: E, U](f: T => IO[E1, U])(g: U => T): Serde[E1, U] = + Serde(mapM(f))(contramap(g)) } object Serde extends Serdes { @@ -49,46 +39,24 @@ object Serde extends Serdes { * * The (de)serializer functions can returned a failure ZIO with a Throwable to indicate (de)serialization failure */ - def apply[R, T]( - deser: (String, Headers, Array[Byte]) => RIO[R, T] - )(ser: (String, Headers, T) => RIO[R, Array[Byte]]): Serde[R, T] = - new Serde[R, T] { - override final def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]] = + def apply[E, T]( + deser: (String, Headers, Array[Byte]) => IO[E, T] + )(ser: (String, Headers, T) => Array[Byte]): Serde[E, T] = + new Serde[E, T] { + override final def serialize(topic: String, headers: Headers, value: T): Array[Byte] = ser(topic, headers, value) - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] = + override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): IO[E, T] = deser(topic, headers, data) } /** * Create a Serde from a deserializer and serializer function */ - def apply[R, T](deser: Deserializer[R, T])(ser: Serializer[R, T]): Serde[R, T] = - new Serde[R, T] { - override final def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]] = + def apply[E, T](deser: Deserializer[E, T])(ser: Serializer[T]): Serde[E, T] = + new Serde[E, T] { + override final def serialize(topic: String, headers: Headers, value: T): Array[Byte] = ser.serialize(topic, headers, value) - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] = + override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): IO[E, T] = deser.deserialize(topic, headers, data) } - - /** - * Create a Serde from a Kafka Serde - */ - def fromKafkaSerde[T](serde: KafkaSerde[T], props: Map[String, AnyRef], isKey: Boolean): Task[Serde[Any, T]] = - ZIO - .attempt(serde.configure(props.asJava, isKey)) - .as( - new Serde[Any, T] { - private final val serializer = serde.serializer() - private final val deserializer = serde.deserializer() - - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] = - ZIO.attempt(deserializer.deserialize(topic, headers, data)) - - override final def serialize(topic: String, headers: Headers, value: T): Task[Array[Byte]] = - ZIO.attempt(serializer.serialize(topic, headers, value)) - } - ) - - implicit def deserializerWithError[R, T](implicit deser: Deserializer[R, T]): Deserializer[R, Try[T]] = - deser.asTry } diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala index f02577f72..df579cee7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala +++ b/zio-kafka/src/main/scala/zio/kafka/serde/Serdes.scala @@ -3,21 +3,21 @@ package zio.kafka.serde import org.apache.kafka.common.header.Headers import org.apache.kafka.common.serialization.{ Serde => KafkaSerde, Serdes => KafkaSerdes } import 
org.apache.kafka.common.utils.Bytes -import zio.{ RIO, ZIO } +import zio.{ IO, ZIO } import java.nio.ByteBuffer import java.util.UUID private[zio] trait Serdes { - val long: Serde[Any, Long] = convertPrimitiveSerde(KafkaSerdes.Long()).inmap(Long2long)(long2Long) - val int: Serde[Any, Int] = convertPrimitiveSerde(KafkaSerdes.Integer()).inmap(Integer2int)(int2Integer) - val short: Serde[Any, Short] = convertPrimitiveSerde(KafkaSerdes.Short()).inmap(Short2short)(short2Short) - val float: Serde[Any, Float] = convertPrimitiveSerde(KafkaSerdes.Float()).inmap(Float2float)(float2Float) - val double: Serde[Any, Double] = convertPrimitiveSerde(KafkaSerdes.Double()).inmap(Double2double)(double2Double) - val string: Serde[Any, String] = convertPrimitiveSerde(KafkaSerdes.String()) - val bytes: Serde[Any, Bytes] = convertPrimitiveSerde(KafkaSerdes.Bytes()) - val byteBuffer: Serde[Any, ByteBuffer] = convertPrimitiveSerde(KafkaSerdes.ByteBuffer()) - val uuid: Serde[Any, UUID] = convertPrimitiveSerde(KafkaSerdes.UUID()) + val long: Serde[Throwable, Long] = convertPrimitiveSerde(KafkaSerdes.Long()).inmap(Long2long)(long2Long) + val int: Serde[Throwable, Int] = convertPrimitiveSerde(KafkaSerdes.Integer()).inmap(Integer2int)(int2Integer) + val short: Serde[Throwable, Short] = convertPrimitiveSerde(KafkaSerdes.Short()).inmap(Short2short)(short2Short) + val float: Serde[Throwable, Float] = convertPrimitiveSerde(KafkaSerdes.Float()).inmap(Float2float)(float2Float) + val double: Serde[Throwable, Double] = convertPrimitiveSerde(KafkaSerdes.Double()).inmap(Double2double)(double2Double) + val string: Serde[Throwable, String] = convertPrimitiveSerde(KafkaSerdes.String()) + val bytes: Serde[Throwable, Bytes] = convertPrimitiveSerde(KafkaSerdes.Bytes()) + val byteBuffer: Serde[Throwable, ByteBuffer] = convertPrimitiveSerde(KafkaSerdes.ByteBuffer()) + val uuid: Serde[Throwable, UUID] = convertPrimitiveSerde(KafkaSerdes.UUID()) /** * Optimisation @@ -27,24 +27,24 @@ private[zio] trait Serdes { * * That allows us to use [[ZIO.succeed]] instead of [[ZIO.attempt]]. 
*/ - val byteArray: Serde[Any, Array[Byte]] = - new Serde[Any, Array[Byte]] { - override final def serialize(topic: String, headers: Headers, value: Array[Byte]): RIO[Any, Array[Byte]] = - ZIO.succeed(value) + val byteArray: Serde[Nothing, Array[Byte]] = + new Serde[Nothing, Array[Byte]] { + override final def serialize(topic: String, headers: Headers, value: Array[Byte]): Array[Byte] = + value - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, Array[Byte]] = + override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): IO[Nothing, Array[Byte]] = ZIO.succeed(data) } - private[this] def convertPrimitiveSerde[T](serde: KafkaSerde[T]): Serde[Any, T] = - new Serde[Any, T] { + private[this] def convertPrimitiveSerde[T](serde: KafkaSerde[T]): Serde[Throwable, T] = + new Serde[Throwable, T] { private final val serializer = serde.serializer() private final val deserializer = serde.deserializer() - override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, T] = + override final def deserialize(topic: String, headers: Headers, data: Array[Byte]): IO[Throwable, T] = ZIO.attempt(deserializer.deserialize(topic, headers, data)) - override final def serialize(topic: String, headers: Headers, value: T): RIO[Any, Array[Byte]] = - ZIO.attempt(serializer.serialize(topic, headers, value)) + override final def serialize(topic: String, headers: Headers, value: T): Array[Byte] = + serializer.serialize(topic, headers, value) } } diff --git a/zio-kafka/src/main/scala/zio/kafka/serde/Serializer.scala b/zio-kafka/src/main/scala/zio/kafka/serde/Serializer.scala index 9d984707f..d97d283b0 100644 --- a/zio-kafka/src/main/scala/zio/kafka/serde/Serializer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/serde/Serializer.scala @@ -1,46 +1,31 @@ package zio.kafka.serde import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.serialization.{ Serializer => KafkaSerializer } -import zio.{ RIO, Task, ZIO } - -import scala.jdk.CollectionConverters._ /** * Serializer from values of some type T to a byte array * - * @tparam R - * Environment available to the serializer + * Serialization may not fail and any value can be serialized. If effectful operations are needed for serialization, + * this must be done before passing a serialized value to the Producer. Use [[Serdes.byteArray]] + * * @tparam T */ -trait Serializer[-R, -T] { - def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]] +trait Serializer[T] { + def serialize(topic: String, headers: Headers, value: T): Array[Byte] /** * Create a serializer for a type U based on the serializer for type T and a mapping function */ - def contramap[U](f: U => T): Serializer[R, U] = + def contramap[U](f: U => T): Serializer[U] = Serializer((topic, headers, u) => serialize(topic, headers, f(u))) - /** - * Create a serializer for a type U based on the serializer for type T and an effectful mapping function - */ - def contramapM[R1 <: R, U](f: U => RIO[R1, T]): Serializer[R1, U] = - Serializer((topic, headers, u) => f(u).flatMap(serialize(topic, headers, _))) - - /** - * Returns a new serializer that executes its serialization function on the blocking threadpool. - */ - def blocking: Serializer[R, T] = - Serializer((topic, headers, t) => ZIO.blocking(serialize(topic, headers, t))) - /** * Returns a new serializer that handles optional values and serializes them as nulls. 
*/ - def asOption[U <: T]: Serializer[R, Option[U]] = + def asOption[U <: T]: Serializer[Option[U]] = Serializer { (topic, headers, valueOpt) => valueOpt match { - case None => ZIO.succeed(null) + case None => null case Some(value) => serialize(topic, headers, value) } } @@ -51,24 +36,6 @@ object Serializer extends Serdes { /** * Create a serializer from a function */ - def apply[R, T](ser: (String, Headers, T) => RIO[R, Array[Byte]]): Serializer[R, T] = + def apply[T](ser: (String, Headers, T) => Array[Byte]): Serializer[T] = (topic: String, headers: Headers, value: T) => ser(topic, headers, value) - - /** - * Create a Serializer from a Kafka Serializer - */ - def fromKafkaSerializer[T]( - serializer: KafkaSerializer[T], - props: Map[String, AnyRef], - isKey: Boolean - ): Task[Serializer[Any, T]] = - ZIO - .attempt(serializer.configure(props.asJava, isKey)) - .as( - new Serializer[Any, T] { - override def serialize(topic: String, headers: Headers, value: T): Task[Array[Byte]] = - ZIO.attempt(serializer.serialize(topic, headers, value)) - } - ) - }
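
Below the patch, a few usage sketches against the new serde API may help reviewers; none of this code is part of the change. First, defining a custom serde: Deserializer[E, T] now exposes deserialize as IO[E, T] with a typed error channel, and Serializer[T] is a pure function to Array[Byte]. The User and UserDecodeError types here are hypothetical.

import zio._
import zio.kafka.serde.{ Deserializer, Serde, Serializer }

final case class User(name: String)               // hypothetical domain type
final case class UserDecodeError(message: String) // hypothetical typed error

// Deserialization is an IO with a typed error channel instead of RIO[R, T].
val userDeserializer: Deserializer[UserDecodeError, User] =
  Deserializer { (_, _, data) =>
    ZIO
      .attempt(new String(data, "UTF-8"))
      .mapBoth(e => UserDecodeError(e.getMessage), User(_))
  }

// Serialization is now a plain function to Array[Byte].
val userSerializer: Serializer[User] =
  Serializer((_, _, user) => user.name.getBytes("UTF-8"))

// Serde[E, T] combines both halves.
val userSerde: Serde[UserDecodeError, User] =
  Serde(userDeserializer)(userSerializer)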
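
The patch also adds mapError and catchAll to Deserializer. A small sketch of adapting the error channel of a built-in serde; the String error type is only for illustration.

import zio._
import zio.kafka.serde.{ Deserializer, Serde }

// Translate the Throwable error of a built-in serde into a domain-specific error.
val stringWithTypedError: Deserializer[String, String] =
  Serde.string.mapError(e => s"string decode failed: ${e.getMessage}")

// Recover from decode failures with a fallback value.
val stringWithFallback: Deserializer[Throwable, String] =
  Serde.string.catchAll(_ => ZIO.succeed(""))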
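
On the consumer side, plainStream, partitionedStream and consumeWith no longer thread the deserializer's environment through the stream type; a deserialization failure now fails the stream with the new Consumer.DeserializationError. A sketch, assuming a topic named "orders" and an already-provided Consumer layer:

import zio._
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

// The stream environment is just Consumer; Serde.string's Throwable error channel
// is widened to Any in the type argument, as in the updated tests above.
// A record that fails to decode would fail the stream with Consumer.DeserializationError.
val consumeOrders: ZIO[Consumer, Throwable, Unit] =
  Consumer
    .plainStream[Any, String, String](Subscription.Topics(Set("orders")), Serde.string, Serde.string)
    .tap(_.offset.commit)
    .runDrain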
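
Finally, producing: because serializers are now pure, any effectful encoding has to happen before the Producer call, as the updated Serializer scaladoc notes; pre-serialized records can then go through the byte-array overloads. The topic names and the encode helper below are hypothetical.

import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde

// With the built-in serdes the typed overload stays a one-liner.
def publishString(topic: String, key: String, value: String): RIO[Producer, Unit] =
  Producer
    .produce[String, String](new ProducerRecord(topic, key, value), Serde.string, Serde.string)
    .unit

// Hypothetical effectful encoder, run before the producer is involved.
def encode(s: String): Task[Array[Byte]] = ZIO.attempt(s.getBytes("UTF-8"))

def publishEncoded(topic: String, key: String, value: String): RIO[Producer, Unit] =
  for {
    k <- encode(key)
    v <- encode(value)
    _ <- Producer.produce(new ProducerRecord(topic, k, v)) // record is already Array[Byte] / Array[Byte]
  } yield ()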