Skip to content

Provides JsonSchema-aware codecs for FS2 Kafka that integrate with Confluent's Schema Registry and implementation (Scala 3 only)

License

Notifications You must be signed in to change notification settings

kaizen-solutions/fs2-kafka-jsonschema

Repository files navigation

FS2 Kafka JsonSchema

Continuous Integration Maven Central

Provides FS2 Kafka Serializers and Deserializers that provide integration with Confluent Schema Registry for JSON messages with JSON Schemas.

Note: This library only works with Scala 3.3.x and above. For Scala 2.x, see here.

This functionality is backed by the following libraries:

Usage

Add the following to your build.sbt

resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven")
libraryDependencies += "io.kaizen-solutions" %% "fs2-kafka-jsonschema" % "<latest-version>"

Example

Define the datatype that you would like to send/receive over Kafka via the JSON + JSON Schema format. You do this by defining your datatype and providing a Pickler instance for it. The Pickler instance comes from the Tapir library.

import sttp.tapir.Schema.annotations.*
import sttp.tapir.json.pickler.*

final case class Book(
  @description("name of the book") name: String,
  @description("international standard book number") isbn: Int
)
object Book:
  given Pickler[Book] = Pickler.derived

Next, you can create a fs2 Kafka Serializer and Deserializer for this datatype and use it when building your FS2 Kafka producer/consumer.

import io.kaizensolutions.jsonschema.*
import cats.effect.*
import fs2.kafka.*

def bookSerializer[F[_]: Sync]: Resource[F, ValueSerializer[F, Book]] =
  JsonSchemaSerializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]

def bookDeserializer[F[_]: Sync]: Resource[F, ValueDeserializer[F, Book]] =
  JsonSchemaDeserializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]

About

Provides JsonSchema-aware codecs for FS2 Kafka that integrate with Confluent's Schema Registry and implementation (Scala 3 only)

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages