Non-effectful serialization and deserialization #1380

Draft · wants to merge 1 commit into base: master
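As a reading aid, here is a hedged sketch of what the new serde shapes appear to be, inferred from the call sites and test changes below; the exact traits, variance, and member sets in the PR may differ.

```scala
import org.apache.kafka.common.header.Headers
import zio._

// Inferred, not copied from the PR: serializers lose both the environment
// parameter and the effect; deserializers swap the environment R for a
// typed error E but stay effectful.
trait Serializer[-A] {
  def serialize(topic: String, headers: Headers, value: A): Array[Byte]
}

trait Deserializer[+E, +A] {
  def deserialize(topic: String, headers: Headers, data: Array[Byte]): IO[E, A]
}
```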
6 changes: 3 additions & 3 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
```diff
@@ -219,7 +219,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

       def consumeAndCommit(count: Long) =
         Consumer
-          .partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
+          .partitionedStream[Any, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
           .flatMapPar(partitionCount)(_._2)
           .take(count)
           .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
@@ -293,7 +293,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

       def consumeAndCommit(count: Long, topic: String, groupId: String) =
         Consumer
-          .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
+          .plainStream[Any, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
           .take(count)
           .foreach(_.offset.commit)
           .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
@@ -336,7 +336,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

       def consumeAndCommit(count: Long, topic: String, groupId: String) =
         Consumer
-          .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
+          .plainStream[Any, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
           .take(count)
           .foreach(_.offset.commit)
           .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
```
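In all three hunks the change is the same: the stream's first type parameter only ever carried the environment demanded by effectful serdes, so with pure serdes it collapses to `Any`, while `provideSomeLayer[Kafka]` keeps supplying the consumer itself. A minimal hedged sketch of the resulting shape (topic and count are placeholders):

```scala
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

object ConsumeSketch {
  // Sketch only: Serde.string needs no environment anymore, so `Any` is the
  // stream's first type argument; the Consumer itself still comes from the
  // environment and is provided separately, as in the spec above.
  val consumeTen =
    Consumer
      .plainStream[Any, String, String](Subscription.Topics(Set("some-topic")), Serde.string, Serde.string)
      .take(10L)
      .foreach(_.offset.commit)
}
```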
2 changes: 1 addition & 1 deletion zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala
```diff
@@ -25,7 +25,7 @@ object PopulateTopic extends ZIOAppDefault {
   override def run: ZIO[Any, Throwable, Unit] =
     dataStream(872000).map { case (k, v) =>
       new ProducerRecord("inputs-topic", null, null, k, v)
-    }.mapChunksZIO(Producer.produceChunkAsync[Any, String, String](_, Serde.string, Serde.string).map(Chunk(_)))
+    }.mapChunksZIO(Producer.produceChunkAsync[String, String](_, Serde.string, Serde.string).map(Chunk(_)))
       .mapZIOPar(5)(_.flatMap(chunk => Console.printLine(s"Wrote chunk of ${chunk.size}")))
       .runDrain
       .provide(
```
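On the producer side the serde environment parameter is dropped from the type argument list entirely. A hedged sketch of the new call, assuming the accessor keeps its nested-effect shape (outer effect enqueues, inner effect awaits broker acknowledgement, as the `.flatMap(chunk => ...)` above suggests):

```scala
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde

object ProduceChunkSketch {
  // Hedged sketch: produceChunkAsync with the new [K, V] arity. Flattening
  // the nested effect waits for the broker acks before yielding metadata.
  val send: ZIO[Producer, Throwable, Chunk[RecordMetadata]] =
    Producer
      .produceChunkAsync[String, String](
        Chunk(new ProducerRecord("inputs-topic", "key-1", "value-1")),
        Serde.string,
        Serde.string
      )
      .flatten
}
```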
zio-kafka-test/src/test/scala/zio/kafka/serde/DeserializerSpec.scala

```diff
@@ -2,13 +2,12 @@ package zio.kafka.serde

 import org.apache.kafka.common.header.internals.RecordHeaders
 import zio._
+import zio.kafka.ZIOSpecDefaultSlf4j
 import zio.test.Assertion._
 import zio.test._
-import zio.ZAny
-import zio.kafka.ZIOSpecDefaultSlf4j

 object DeserializerSpec extends ZIOSpecDefaultSlf4j {
-  override def spec: Spec[ZAny with Any, Throwable] = suite("Deserializer")(
+  override def spec: Spec[TestEnvironment with Scope, Any] = suite("Deserializer")(
     suite("asOption")(
       test("deserialize to None when value is null") {
         assertZIO(stringDeserializer.asOption.deserialize("topic1", new RecordHeaders, null))(isNone)
```
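The `asOption` null-to-`None` contract exercised here is unchanged; only the spec's type ascriptions move. A hedged sketch of the combinator's apparent new shape:

```scala
import org.apache.kafka.common.header.internals.RecordHeaders
import zio._
import zio.kafka.serde.{ Deserializer, Serde }

object AsOptionSketch {
  // Assumption: asOption now maps Deserializer[E, A] to Deserializer[E, Option[A]],
  // decoding a null payload to None (exactly what the test above asserts).
  val optString: Deserializer[Throwable, Option[String]] = Serde.string.asOption

  val decoded: IO[Throwable, Option[String]] =
    optString.deserialize("topic1", new RecordHeaders, null) // expected: None
}
```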
11 changes: 6 additions & 5 deletions zio-kafka-test/src/test/scala/zio/kafka/serde/SerdeSpec.scala
```diff
@@ -1,6 +1,7 @@
 package zio.kafka.serde

 import org.apache.kafka.common.header.internals.RecordHeaders
+import zio.Scope
 import zio.kafka.ZIOSpecDefaultSlf4j
 import zio.test.Assertion._
 import zio.test._
@@ -15,7 +16,7 @@ object SerdeSpec extends ZIOSpecDefaultSlf4j {

   private val anyBytes = Gen.listOf(Gen.byte).map(bytes => new org.apache.kafka.common.utils.Bytes(bytes.toArray))

-  override def spec: Spec[Any, Throwable] = suite("Serde")(
+  override def spec: Spec[TestEnvironment with Scope, Any] = suite("Serde")(
     testSerde(Serde.string, Gen.string),
     testSerde(Serde.int, Gen.int),
     testSerde(Serde.short, Gen.short),
@@ -27,20 +28,20 @@ object SerdeSpec extends ZIOSpecDefaultSlf4j {
     testSerde(Serde.byteArray, Gen.listOf(Gen.byte).map(_.toArray)),
     suite("asOption")(
       test("serialize and deserialize None values to null and visa versa") {
-        val serde = testDataStructureSerde.asOption
+        val serde      = testDataStructureSerde.asOption
+        val serialized = serde.serialize("topic1", new RecordHeaders, None)
         for {
-          serialized <- serde.serialize("topic1", new RecordHeaders, None)
           deserialized <- serde.deserialize("topic1", new RecordHeaders, serialized)
         } yield assert(serialized)(isNull) && assert(deserialized)(isNone)
       }
     )
   )

-  private def testSerde[R, A](serde: Serde[Any, A], gen: Gen[R, A])(implicit clsTag: ClassTag[A]) =
+  private def testSerde[R, A](serde: Serde[Throwable, A], gen: Gen[R, A])(implicit clsTag: ClassTag[A]) =
     test(s"serialize and deserialize ${clsTag.runtimeClass.getSimpleName}") {
       check(gen) { value =>
+        val serialized = serde.serialize("topic1", new RecordHeaders, value)
         for {
-          serialized <- serde.serialize("topic1", new RecordHeaders, value)
           deserialized <- serde.deserialize("topic1", new RecordHeaders, serialized)
         } yield assert(deserialized)(equalTo(value))
       }
```
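The reshuffled tests make the new split visible: `serialize` becomes a plain value binding, while `deserialize` stays inside the for-comprehension because it is still effectful. A hedged round-trip sketch outside the test harness:

```scala
import org.apache.kafka.common.header.internals.RecordHeaders
import zio._
import zio.kafka.serde.Serde

object RoundTripSketch extends ZIOAppDefault {
  override def run: ZIO[Any, Throwable, Unit] = {
    val headers = new RecordHeaders
    // Pure: no effect wrapper needed anymore.
    val bytes: Array[Byte] = Serde.string.serialize("topic1", headers, "hello")
    // Still effectful: decoding can fail with the serde's error type.
    Serde.string
      .deserialize("topic1", headers, bytes)
      .flatMap(s => Console.printLine(s"round-tripped: $s"))
  }
}
```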
zio-kafka-test/src/test/scala/zio/kafka/serde/SerializerSpec.scala

```diff
@@ -3,25 +3,27 @@ package zio.kafka.serde

 import org.apache.kafka.common.header.internals.RecordHeaders
 import zio.test.Assertion._
 import zio.test._
-import zio.ZAny
+import zio.{ ZAny, ZIO }
 import zio.kafka.ZIOSpecDefaultSlf4j

 object SerializerSpec extends ZIOSpecDefaultSlf4j {
   override def spec: Spec[ZAny with Any, Throwable] = suite("Serializer")(
     suite("asOption")(
       test("serialize None values to null") {
-        assertZIO(stringSerializer.asOption.serialize("topic1", new RecordHeaders, None))(isNull)
+        assert(stringSerializer.asOption.serialize("topic1", new RecordHeaders, None))(isNull)
       },
       test("serialize Some values") {
         check(Gen.string) { string =>
           assertZIO(
-            stringSerializer.asOption.serialize("topic1", new RecordHeaders, Some(string)).map(new String(_, "UTF-8"))
+            ZIO
+              .succeed(stringSerializer.asOption.serialize("topic1", new RecordHeaders, Some(string)))
+              .map(new String(_, "UTF-8"))
           )(
             equalTo(string)
           )
         }
       }
     )
   )
-  private lazy val stringSerializer: Serializer[Any, String] = Serde.string
+  private lazy val stringSerializer: Serializer[String] = Serde.string
 }
```
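Two consequences of the pure serializer show up here: the first test drops `assertZIO` for plain `assert`, and the second wraps the pure result in `ZIO.succeed` only to keep the surrounding `assertZIO` shape. A hedged sketch of the same property without the wrapper:

```scala
import org.apache.kafka.common.header.internals.RecordHeaders
import zio.Scope
import zio.kafka.serde.{ Serde, Serializer }
import zio.test.Assertion._
import zio.test._

object PureSerializeSketch extends ZIOSpecDefault {
  // Same alias the spec above uses, under the new single-parameter signature.
  private val stringSerializer: Serializer[String] = Serde.string

  override def spec: Spec[TestEnvironment with Scope, Any] = suite("pure serialize")(
    test("serializes Some values without an effect wrapper") {
      check(Gen.string) { string =>
        val bytes = stringSerializer.asOption.serialize("topic1", new RecordHeaders, Some(string))
        assert(new String(bytes, "UTF-8"))(equalTo(string))
      }
    }
  )
}
```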
KafkaTestUtils.scala

```diff
@@ -70,7 +70,7 @@ object KafkaTestUtils {
     key: String,
     message: String
   ): ZIO[Producer, Throwable, RecordMetadata] =
-    Producer.produce[Any, String, String](new ProducerRecord(topic, key, message), Serde.string, Serde.string)
+    Producer.produce[String, String](new ProducerRecord(topic, key, message), Serde.string, Serde.string)

   /**
    * Utility function to produce many messages in a given partition of a topic.
@@ -81,7 +81,7 @@ object KafkaTestUtils {
     kvs: Iterable[(String, String)]
   ): ZIO[Producer, Throwable, Chunk[RecordMetadata]] =
     Producer
-      .produceChunk[Any, String, String](
+      .produceChunk[String, String](
        Chunk.fromIterable(kvs.map { case (k, v) =>
          new ProducerRecord(topic, partition, null, k, v)
        }),
@@ -97,7 +97,7 @@ object KafkaTestUtils {
     kvs: Iterable[(String, String)]
   ): ZIO[Producer, Throwable, Chunk[RecordMetadata]] =
     Producer
-      .produceChunk[Any, String, String](
+      .produceChunk[String, String](
        Chunk.fromIterable(kvs.map { case (k, v) =>
          new ProducerRecord(topic, k, v)
        }),
```
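A hedged sketch of wiring the single-record helper style to an actual `Producer` layer; the broker address, topic, and settings constructor are assumptions about the surrounding zio-kafka API, not taken from this diff:

```scala
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.producer.{ Producer, ProducerSettings }
import zio.kafka.serde.Serde

object ProduceOneSketch extends ZIOAppDefault {
  // Assumed wiring: Producer.make + ZLayer.scoped is the usual zio-kafka
  // pattern; only the [String, String] arity comes from this PR.
  private val producerLayer: ZLayer[Any, Throwable, Producer] =
    ZLayer.scoped(Producer.make(ProducerSettings(List("localhost:9092"))))

  override def run: ZIO[Any, Throwable, RecordMetadata] =
    Producer
      .produce[String, String](new ProducerRecord("some-topic", "key", "value"), Serde.string, Serde.string)
      .provide(producerLayer)
}
```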
zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala

```diff
@@ -3,17 +3,17 @@ package zio.kafka.consumer

 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 import zio.kafka.serde.Deserializer
-import zio.{ RIO, Task }
+import zio.{ IO, Task }

 final case class CommittableRecord[K, V](
   record: ConsumerRecord[K, V],
   private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
   private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) {
-  def deserializeWith[R, K1, V1](
-    keyDeserializer: Deserializer[R, K1],
-    valueDeserializer: Deserializer[R, V1]
-  )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] =
+  def deserializeWith[E, K1, V1](
+    keyDeserializer: Deserializer[E, K1],
+    valueDeserializer: Deserializer[E, V1]
+  )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): IO[E, CommittableRecord[K1, V1]] =
     for {
       key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key())
       value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value())
```
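This is the core signature change the test updates above follow from: `deserializeWith` now propagates the deserializer's typed error `E` instead of threading an environment `R`. A hedged usage sketch, where `raw` stands for a fetched byte-array record:

```scala
import zio._
import zio.kafka.consumer.CommittableRecord
import zio.kafka.serde.Serde

object DeserializeWithSketch {
  // With Serde.string's error type apparently Throwable (see SerdeSpec above),
  // the decoded record comes back as IO[Throwable, _].
  def decode(
    raw: CommittableRecord[Array[Byte], Array[Byte]]
  ): IO[Throwable, CommittableRecord[String, String]] =
    raw.deserializeWith(Serde.string, Serde.string)
}
```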